Source code for caproto._broadcaster

# This module contains only the Broadcaster object, encapsulating the state of
# one Channel Access UDP connection, intended to be used as a companion to a
# UDP socket provided by a client or server implementation.
import itertools
import logging

from ._constants import (DEFAULT_PROTOCOL_VERSION, MAX_ID)
from ._utils import (CLIENT, SERVER, CaprotoValueError, LocalProtocolError)
from ._state import get_exception
from ._commands import (RepeaterConfirmResponse, RepeaterRegisterRequest,
                        SearchRequest, SearchResponse, VersionRequest,
                        read_datagram,
                        )


__all__ = ('Broadcaster',)


[docs]class Broadcaster: """ An object encapsulating the state of one CA UDP connection. It is a companion to a UDP socket managed by a client or server implementation. All data received over the socket should be passed to :meth:`recv`. Any data sent over the socket should first be passed through :meth:`send`. Parameters ---------- our_role : CLIENT or SERVER protocol_version : integer Default is ``DEFAULT_PROTOCOL_VERSION``. """ def __init__(self, our_role, protocol_version=DEFAULT_PROTOCOL_VERSION): if our_role not in (SERVER, CLIENT): raise CaprotoValueError("role must be caproto.SERVER or " "caproto.CLIENT") self.our_role = our_role if our_role is CLIENT: self.their_role = SERVER else: self.their_role = CLIENT self.protocol_version = protocol_version self.unanswered_searches = {} # map search id (cid) to name # Unlike VirtualCircuit and Channel, there is very little state to # track for the Broadcaster. We don't need a full state machine, just # one flag to check whether we have yet registered with a repeater. self._registered = False self._attempted_registration = False self._search_id_counter = itertools.count(0) self.log = logging.getLogger(f"caproto.bcast")
[docs] def send(self, *commands): """ Convert one or more high-level Commands into bytes that may be broadcast together in one UDP datagram. Update our internal state machine. Parameters ---------- *commands : any number of :class:`Message` objects Returns ------- bytes_to_send : bytes bytes to send over a socket """ bytes_to_send = b'' self.log.debug("Serializing %d commands into one datagram", len(commands)) history = [] for i, command in enumerate(commands): self.log.debug("%d of %d %r", 1 + i, len(commands), command) self._process_command(self.our_role, command, history=history) bytes_to_send += bytes(command) return bytes_to_send
[docs] def recv(self, byteslike, address): """ Parse commands from a UDP datagram. When the caller is ready to process the commands, each command should first be passed to :meth:`Broadcaster.process_command` to validate it against the protocol and update the Broadcaster's state. Parameters ---------- byteslike : bytes-like address : tuple ``(host, port)`` as a string and an integer respectively Returns ------- commands : list """ self.log.debug("Received datagram from %r with %d bytes.", address, len(byteslike)) commands = read_datagram(byteslike, address, self.their_role) for command in commands: self.log.debug("%d bytes -> %r", len(command), command) return commands
[docs] def process_commands(self, commands): """ Update internal state machine and raise if protocol is violated. Received commands should be passed through here before any additional processing by a server or client layer. """ history = [] for command in commands: self._process_command(self.their_role, command, history)
def _process_command(self, role, command, history): """ All comands go through here. Parameters ---------- role : ``CLIENT`` or ``SERVER`` command : Message history : list This input will be mutated: command will be appended at the end. """ # All commands go through here. if isinstance(command, RepeaterRegisterRequest): self._attempted_registration = True elif isinstance(command, RepeaterConfirmResponse): self._registered = True elif (role is CLIENT and self.our_role is CLIENT and not self._attempted_registration): raise LocalProtocolError("Client must send a " "RegisterRepeaterRequest before any " "other commands (saw: {})" "".format(type(command).__name__)) elif isinstance(command, SearchRequest): if VersionRequest not in map(type, history): err = get_exception(self.our_role, command) raise err("A broadcasted SearchResponse must be preceded by a " "VersionResponse in the same datagram.") self.unanswered_searches[command.cid] = command.name elif isinstance(command, SearchResponse): # TODO Do all versions of Rsrv respect this? Unclear why softIoc # seems to sometimes violate this part of the protocol. # if VersionResponse not in map(type, history): # err = get_exception(self.our_role, command) # raise err("A broadcasted SearchResponse must be preceded by " # "a VersionResponse in the same datagram.") self.unanswered_searches.pop(command.cid, None) history.append(command) # CONVENIENCE METHODS
[docs] def new_search_id(self): # Return the next sequential unused id. Wrap back to 0 on overflow. while True: i = next(self._search_id_counter) if i in self.unanswered_searches: continue if i == MAX_ID: self._search_id_counter = itertools.count(0) continue return i
[docs] def search(self, name, *, cid=None): """ Generate a valid :class:`VersionRequest` and :class:`SearchRequest`. The protocol requires that these be transmitted together as part of one datagram. Parameters ---------- name : string Channnel name (PV) Returns ------- (VersionRequest, SearchRequest) """ if cid is None: # TODO all client implementations want to handle cids on their own. cid = self.new_search_id() commands = (VersionRequest(0, self.protocol_version), SearchRequest(name, cid, self.protocol_version)) return commands
[docs] def register(self, ip='0.0.0.0'): """ Generate a valid :class:`RepeaterRegisterRequest`. Parameters ---------- ip : string, optional Our IP address. Defaults is '0.0.0.0', which ends up being converted by the repeater to the IP from which it receives the packet. Returns ------- RepeaterRegisterRequest """ if ip is None: ip = '0.0.0.0' command = RepeaterRegisterRequest(ip) # NOTE: consider this enough for a registration attempt. # TODO this is required for moving forward with the threading client, # and may be implemented better in the future self._attempted_registration = True return command
def disconnect(self): self._registered = False self._attempted_registration = False @property def registered(self): return self._registered @property def attempted_registration(self): return self._attempted_registration