# This module contains only the Broadcaster object, encapsulating the state of
# one Channel Access UDP connection, intended to be used as a companion to a
# UDP socket provided by a client or server implementation.
import logging

from ._commands import (Beacon, RepeaterConfirmResponse,
                        RepeaterRegisterRequest, SearchRequest, SearchResponse,
                        read_datagram)
from ._constants import DEFAULT_PROTOCOL_VERSION
from ._utils import CLIENT, SERVER, CaprotoValueError, RemoteProtocolError

__all__ = ('Broadcaster',)

class Broadcaster:
    """
    An object encapsulating the state of one CA UDP connection.

    It is a companion to a UDP socket managed by a client or server
    implementation. All data received over the socket should be passed to
    :meth:`recv`. Any data sent over the socket should first be passed through
    :meth:`send`.

    Parameters
    ----------
    our_role : CLIENT or SERVER
    protocol_version : integer
        Default is ``DEFAULT_PROTOCOL_VERSION``.

    Raises
    ------
    CaprotoValueError
        If ``our_role`` is neither ``CLIENT`` nor ``SERVER``.
    """

    def __init__(self, our_role, protocol_version=DEFAULT_PROTOCOL_VERSION):
        if our_role not in (SERVER, CLIENT):
            raise CaprotoValueError("role must be caproto.SERVER or "
                                    "caproto.CLIENT")
        self.our_role = our_role
        self.their_role = SERVER if our_role is CLIENT else CLIENT

        # Whereas VirtualCircuit has one client address and one server address,
        # the Broadcaster has multiple addresses on the server side (one per
        # interface that it listens on) and one on the client side.
        # We also provide the properties `our_addresses` and `their_addresses`,
        # whose meaning depends on our_role. Whichever one corresponds to the
        # client role will have a length of one.
        self.server_addresses = []
        self.client_address = None
        self.protocol_version = protocol_version

        # Unlike VirtualCircuit and Channel, there is very little state to
        # track for the Broadcaster. We don't need a full state machine, just
        # one flag to check whether we have yet registered with a repeater.
        self._registered = False

        # All three loggers live under the 'caproto.bcast' hierarchy so that
        # beacon and search traffic can be filtered independently of the rest.
        self.log = logging.getLogger("caproto.bcast")
        self.beacon_log = logging.getLogger('caproto.bcast.beacon')
        self.search_log = logging.getLogger('caproto.bcast.search')

    @property
    def our_addresses(self):
        """List of addresses belonging to our side of the connection."""
        if self.our_role is CLIENT:
            return [self.client_address]  # always return a list
        return self.server_addresses

    @property
    def their_addresses(self):
        """List of addresses belonging to the peer side of the connection."""
        if self.their_role is CLIENT:
            return [self.client_address]  # always return a list
        return self.server_addresses

    def send(self, *commands):
        """
        Convert one or more high-level Commands into bytes that may be
        broadcast together in one UDP datagram. Update our internal
        state machine.

        Parameters
        ----------
        *commands :
            any number of :class:`Message` objects

        Returns
        -------
        bytes_to_send : bytes
            bytes to send over a socket
        """
        total_commands = len(commands)
        tags = {'role': repr(self.our_role)}
        chunks = []
        for i, command in enumerate(commands):
            # 1-based position of this command within the datagram.
            tags['counter'] = (1 + i, total_commands)
            if isinstance(command, (SearchRequest, SearchResponse)):
                self.search_log.debug("%r", command, extra=tags)
            else:
                self.log.debug("%r", command, extra=tags)
            # Update state before serializing, one command at a time.
            self._process_command(self.our_role, command)
            chunks.append(bytes(command))
        return b''.join(chunks)

    def recv(self, byteslike, address):
        """
        Parse commands from a UDP datagram.

        When the caller is ready to process the commands, they should first
        be passed to :meth:`Broadcaster.process_commands` to validate them
        against the protocol and update the Broadcaster's state.

        Parameters
        ----------
        byteslike : bytes-like
        address : tuple
            ``(host, port)`` as a string and an integer respectively

        Returns
        -------
        commands : list

        Raises
        ------
        RemoteProtocolError
            If the datagram cannot be parsed; the underlying exception is
            chained as the cause.
        """
        try:
            commands = read_datagram(byteslike, address, self.their_role)
        except Exception as ex:
            raise RemoteProtocolError(f'Broadcaster malformed packet received:'
                                      f' {ex.__class__.__name__} {ex}') from ex

        tags = {'their_address': address,
                'direction': '<<<---',
                'role': repr(self.our_role)}
        for command in commands:
            tags['bytesize'] = len(command)
            # Beacons are routed to their own logger so they can be silenced.
            if isinstance(command, Beacon):
                log = self.beacon_log
            else:
                log = self.log
            # One log record per local address; a distinct loop name keeps
            # the sender's ``address`` parameter from being shadowed.
            for our_address in self.our_addresses:
                tags['our_address'] = our_address
                log.debug("%r", command, extra=tags)
        return commands

    def process_commands(self, commands):
        """
        Update internal state machine and raise if protocol is violated.

        Received commands should be passed through here before any additional
        processing by a server or client layer.

        Parameters
        ----------
        commands : iterable of Message
        """
        for command in commands:
            self._process_command(self.their_role, command)

    def _process_command(self, role, command):
        """
        All commands go through here.

        Parameters
        ----------
        role : ``CLIENT`` or ``SERVER``
            The role of the side that issued the command. Currently unused.
        command : Message
        """
        # The only state tracked is whether a repeater has confirmed us.
        if isinstance(command, RepeaterConfirmResponse):
            self._registered = True

    # CONVENIENCE METHODS

    def register(self, ip=''):
        """
        Generate a valid :class:`RepeaterRegisterRequest`.

        Parameters
        ----------
        ip : string, optional
            Our IP address. Default is '', which ends up being converted by
            the repeater to the IP from which it receives the packet.
            ``None`` is treated the same as ''.

        Returns
        -------
        RepeaterRegisterRequest
        """
        if ip is None:
            ip = ''
        return RepeaterRegisterRequest(ip)

    def disconnect(self):
        """Forget any prior repeater registration."""
        self._registered = False

    @property
    def registered(self):
        """Whether a :class:`RepeaterConfirmResponse` has been processed."""
        return self._registered