Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .utils import *
39 changes: 39 additions & 0 deletions examples/cameraDiscoveryExample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from context_logger import get_logger, setup_logging

from examples import setup_shutdown
from hello import Hello, Group, ServiceQuery, DiscoveryEvent

setup_logging('hello')

log = get_logger('CameraDiscovery')


def main() -> None:
shutdown_event = setup_shutdown()

# Define the group to discover camera services
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')

# Define the query to discover camera services
query = ServiceQuery(name='.+', role='camera')

# Use a discoverer to find camera services
with Hello.builder().discoverer().default() as discoverer:
# Define an event handler to process discovery events
def process_event(event: DiscoveryEvent) -> None:
log.info('Service discovery event', type=event.type.name, service=event.service)

# Register the event handler to process discovery events
discoverer.register(process_event)

# Start the discoverer with the specified group
discoverer.start(group)

# Send the service query
discoverer.discover(query)

shutdown_event.wait()


if __name__ == '__main__':
main()
38 changes: 38 additions & 0 deletions examples/cameraServiceExample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from context_logger import get_logger, setup_logging

from examples import setup_shutdown
from hello import ServiceInfo, Hello, Group

setup_logging('hello')

log = get_logger('CameraService')


def main() -> None:
shutdown_event = setup_shutdown()

# Define the group to advertise the camera service
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')

# Define the service information for the camera
info = ServiceInfo(name='er-sniper-camera-1', role='camera', urls={
'device-api': 'grpc://er-sniper-camera-1/device',
'video-stream': 'blob:http://er-sniper-camera-1/video'
})

# Use a scheduled advertizer to periodically announce the camera service
with Hello.builder().advertizer().scheduled() as advertizer:
# Start the advertizer with the specified group
advertizer.start(group)

# Immediately advertise the service information
advertizer.advertise(info)

# Schedule periodic advertisements every 10 seconds
advertizer.schedule(interval=10)

shutdown_event.wait()


if __name__ == '__main__':
main()
15 changes: 15 additions & 0 deletions examples/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from signal import signal, SIGINT, SIGTERM
from threading import Event
from typing import Any


def setup_shutdown() -> Event:
shutdown_event = Event()

def handler(signum: int, frame: Any) -> None:
shutdown_event.set()

signal(SIGINT, handler)
signal(SIGTERM, handler)

return shutdown_event
1 change: 1 addition & 0 deletions hello/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .group import *
from .sender import *
from .receiver import *
from .scheduler import *
from .service import *
from .advertizer import *
from .discoverer import *
Expand Down
43 changes: 14 additions & 29 deletions hello/advertizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from common_utility import IReusableTimer
from context_logger import get_logger

from hello import ServiceInfo, Group, Sender, GroupAccess, Receiver, ServiceMatcher, ServiceQuery
from hello import ServiceInfo, Group, Sender, Receiver, ServiceMatcher, ServiceQuery, DefaultScheduler

log = get_logger('Advertizer')


class Advertizer:

def start(self, address: str, group: Group, info: ServiceInfo | None = None) -> None:
def start(self, group: Group, info: ServiceInfo | None = None) -> None:
raise NotImplementedError()

def stop(self) -> None:
Expand All @@ -35,8 +35,8 @@ def __enter__(self) -> Advertizer:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, address: str, group: Group, info: ServiceInfo | None = None) -> None:
self._sender.start(GroupAccess(address, group.hello()))
def start(self, group: Group, info: ServiceInfo | None = None) -> None:
self._sender.start(group.hello())
self._group = group
self._info = info

Expand All @@ -62,9 +62,9 @@ def __init__(self, sender: Sender, receiver: Receiver, max_response_delay: float
self._receiver = receiver
self._max_delay = max_response_delay

def start(self, address: str, group: Group, info: ServiceInfo | None = None) -> None:
super().start(address, group, info)
self._receiver.start(GroupAccess(address, group.query()))
def start(self, group: Group, info: ServiceInfo | None = None) -> None:
super().start(group, info)
self._receiver.start(group.query())
self._receiver.register(self._handle_message)

def stop(self) -> None:
Expand All @@ -89,42 +89,27 @@ def _handle_query(self, query: ServiceQuery, info: ServiceInfo) -> None:
self.advertise(info)


class ScheduledAdvertizer(Advertizer):

def schedule(self, info: ServiceInfo | None = None, interval: float = 10, one_shot: bool = False) -> None:
raise NotImplementedError()


class DefaultScheduledAdvertizer(ScheduledAdvertizer):
class ScheduledAdvertizer(DefaultScheduler[ServiceInfo], Advertizer):

def __init__(self, advertizer: Advertizer, timer: IReusableTimer) -> None:
super().__init__(timer)
self._advertizer = advertizer
self._timer = timer

def __enter__(self) -> ScheduledAdvertizer:
def __enter__(self) -> 'ScheduledAdvertizer':
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, address: str, group: Group, info: ServiceInfo | None = None) -> None:
self._advertizer.start(address, group, info)
def start(self, group: Group, info: ServiceInfo | None = None) -> None:
self._advertizer.start(group, info)

def stop(self) -> None:
self._timer.cancel()
super().stop()
self._advertizer.stop()

def advertise(self, info: ServiceInfo | None = None) -> None:
self._advertizer.advertise(info)

def schedule(self, info: ServiceInfo | None = None, interval: float = 60, one_shot: bool = False) -> None:
if one_shot:
self._timer.start(interval, self.advertise, [info])
log.info('One-shot service advertisement scheduled', service=info, interval=interval)
else:
self._timer.start(interval, self._advertise_and_restart, [info])
log.info('Periodic service advertisement scheduled', service=info, interval=interval)

def _advertise_and_restart(self, info: ServiceInfo | None = None) -> None:
def _execute(self, info: ServiceInfo | None = None) -> None:
self.advertise(info)
self._timer.restart()
97 changes: 69 additions & 28 deletions hello/api.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,85 @@
from dataclasses import dataclass
from typing import Any

from common_utility import ReusableTimer
from zmq import Context

from hello import Advertizer, Discoverer, RadioSender, DishReceiver, DefaultAdvertizer, DefaultDiscoverer, \
ScheduledAdvertizer, RespondingAdvertizer, DefaultScheduledAdvertizer
from hello import RadioSender, DishReceiver, DefaultAdvertizer, DefaultDiscoverer, \
RespondingAdvertizer, ScheduledAdvertizer, ScheduledDiscoverer


class Hello:
@dataclass
class HelloConfig:
context: Context[Any] = Context()
receiver_max_workers: int = 1
receiver_poll_timeout: float = 0.1
advertizer_responder: bool = True
advertizer_max_delay: float = 0.1

def default_advertizer(self, respond: bool = True, delay: float = 0.1) -> Advertizer:
raise NotImplementedError()

def scheduled_advertizer(self, respond: bool = True, delay: float = 0.1) -> ScheduledAdvertizer:
raise NotImplementedError()
class Hello(object):

def discoverer(self) -> Discoverer:
raise NotImplementedError()
@classmethod
def default_advertizer(cls, config: HelloConfig) -> DefaultAdvertizer:
sender = RadioSender(config.context)
if config.advertizer_responder:
receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout)
return RespondingAdvertizer(sender, receiver, config.advertizer_max_delay)
else:
return DefaultAdvertizer(sender)

@classmethod
def scheduled_advertizer(cls, config: HelloConfig) -> ScheduledAdvertizer:
advertizer = cls.default_advertizer(config)
return ScheduledAdvertizer(advertizer, ReusableTimer())

class DefaultHello(Hello):
@classmethod
def default_discoverer(cls, config: HelloConfig) -> DefaultDiscoverer:
sender = RadioSender(config.context)
receiver = DishReceiver(config.context, config.receiver_max_workers, config.receiver_poll_timeout)
return DefaultDiscoverer(sender, receiver)

def __init__(self, context: Context[Any] | None = None, max_workers: int = 1, poll_timeout: float = 0.1) -> None:
self._context = context if context else Context()
self._max_workers = max_workers
self._poll_timeout = poll_timeout
@classmethod
def scheduled_discoverer(cls, config: HelloConfig) -> ScheduledDiscoverer:
discoverer = cls.default_discoverer(config)
return ScheduledDiscoverer(discoverer, ReusableTimer())

def default_advertizer(self, respond: bool = True, delay: float = 0.1) -> Advertizer:
sender = RadioSender(self._context)
if respond:
receiver = DishReceiver(self._context, self._max_workers, self._poll_timeout)
return RespondingAdvertizer(sender, receiver, delay)
else:
return DefaultAdvertizer(sender)
@classmethod
def builder(cls, config: HelloConfig | None = None) -> 'HelloBuilder':
return HelloBuilder(config if config else HelloConfig())

def scheduled_advertizer(self, respond: bool = True, delay: float = 0.1) -> ScheduledAdvertizer:
advertizer = self.default_advertizer(respond, delay)
return DefaultScheduledAdvertizer(advertizer, ReusableTimer())

def discoverer(self) -> Discoverer:
sender = RadioSender(self._context)
receiver = DishReceiver(self._context, self._max_workers, self._poll_timeout)
return DefaultDiscoverer(sender, receiver)
class AdvertizerBuilder(object):

def __init__(self, config: HelloConfig) -> None:
self._config = config

def default(self) -> DefaultAdvertizer:
return Hello.default_advertizer(self._config)

def scheduled(self) -> ScheduledAdvertizer:
return Hello.scheduled_advertizer(self._config)


class DiscovererBuilder(object):

def __init__(self, config: HelloConfig) -> None:
self._config = config

def default(self) -> DefaultDiscoverer:
return Hello.default_discoverer(self._config)

def scheduled(self) -> ScheduledDiscoverer:
return Hello.scheduled_discoverer(self._config)


class HelloBuilder(object):

def __init__(self, config: HelloConfig) -> None:
self._config = config

def advertizer(self) -> AdvertizerBuilder:
return AdvertizerBuilder(self._config)

def discoverer(self) -> DiscovererBuilder:
return DiscovererBuilder(self._config)
49 changes: 44 additions & 5 deletions hello/discoverer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from enum import Enum
from typing import Any, Protocol

from common_utility import IReusableTimer
from context_logger import get_logger

from hello import Group, ServiceQuery, Sender, Receiver, GroupAccess, ServiceInfo, ServiceMatcher
from hello import Group, ServiceQuery, Sender, Receiver, ServiceInfo, ServiceMatcher, DefaultScheduler

log = get_logger('Discoverer')

Expand All @@ -26,7 +27,7 @@ def __call__(self, event: DiscoveryEvent) -> None: ...

class Discoverer:

def start(self, address: str, group: Group, query: ServiceQuery | None = None) -> None:
def start(self, group: Group, query: ServiceQuery | None = None) -> None:
raise NotImplementedError()

def stop(self) -> None:
Expand Down Expand Up @@ -64,13 +65,13 @@ def __enter__(self) -> Discoverer:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, address: str, group: Group, query: ServiceQuery | None = None) -> None:
def start(self, group: Group, query: ServiceQuery | None = None) -> None:
self._group = group
if query:
self._matcher = ServiceMatcher(query)
self._sender.start(GroupAccess(address, group.query()))
self._sender.start(group.query())
self._receiver.register(self._handle_message)
self._receiver.start(GroupAccess(address, group.hello()))
self._receiver.start(group.hello())

def stop(self) -> None:
self._group = None
Expand Down Expand Up @@ -132,3 +133,41 @@ def _handle_event(self, event: DiscoveryEvent) -> None:
callback(event)
except Exception as error:
log.warn('Error in event handler execution', event=event, error=error)


class ScheduledDiscoverer(DefaultScheduler[ServiceQuery], Discoverer):

def __init__(self, discoverer: Discoverer, timer: IReusableTimer) -> None:
super().__init__(timer)
self._discoverer = discoverer

def __enter__(self) -> 'ScheduledDiscoverer':
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.stop()

def start(self, group: Group, query: ServiceQuery | None = None) -> None:
self._discoverer.start(group, query)

def stop(self) -> None:
super().stop()
self._discoverer.stop()

def discover(self, query: ServiceQuery | None = None) -> None:
self._discoverer.discover(query)

def get_services(self) -> dict[str, ServiceInfo]:
return self._discoverer.get_services()

def register(self, handler: OnDiscoveryEvent) -> None:
self._discoverer.register(handler)

def deregister(self, handler: OnDiscoveryEvent) -> None:
self._discoverer.deregister(handler)

def get_handlers(self) -> list[OnDiscoveryEvent]:
return self._discoverer.get_handlers()

def _execute(self, query: ServiceQuery | None = None) -> None:
self.discover(query)
Loading
Loading