Source code for caproto.pva._circuit

# A connection in caproto V3 is referred to as a VirtualCircuit. Let's just
# copy that nomenclature for now.
import logging
import typing
from typing import Dict, List, Optional, Sequence, Tuple, Type, Union

from .. import pva
from .._log import ComposableLogAdapter
from .._utils import ThreadsafeCounter
from ._core import SYS_ENDIAN
from ._data import FieldDescAndData, PVRequest
from ._functools_compat import singledispatchmethod
from ._messages import (AcknowledgeMarker, ApplicationCommand,
                        ChannelDestroyRequest, ChannelDestroyResponse,
                        ChannelFieldInfoRequest, ChannelFieldInfoResponse,
                        ChannelGetRequest, ChannelGetResponse,
                        ChannelMonitorRequest, ChannelMonitorResponse,
                        ChannelProcessRequest, ChannelProcessResponse,
                        ChannelPutGetRequest, ChannelPutGetResponse,
                        ChannelPutRequest, ChannelPutResponse,
                        ChannelRequestCancel, ChannelRequestDestroy,
                        ChannelRpcRequest, ChannelRpcResponse,
                        ConnectionValidatedResponse,
                        ConnectionValidationRequest,
                        ConnectionValidationResponse, CreateChannelRequest,
                        CreateChannelResponse, EndianSetting, Message,
                        MessageFlags, MessageHeaderBE, MessageHeaderLE,
                        MonitorSubcommand, SetByteOrder, SetMarker, Status,
                        Subcommand, _StatusOK, messages, read_from_bytestream)
from ._state import (ChannelState, CircuitState, MonitorRequestState,
                     RequestState, get_exception)
from ._utils import (CLEAR_SEGMENTS, CLIENT, DISCONNECTED, NEED_DATA, SERVER,
                     CaprotoError, CaprotoRuntimeError, ConnectionState, Role)

ChannelMessage = Union[
    CreateChannelRequest, CreateChannelResponse, ChannelFieldInfoRequest,
    ChannelFieldInfoResponse, ChannelDestroyRequest, ChannelDestroyResponse,
    ChannelGetRequest, ChannelGetResponse, ChannelMonitorRequest,
    ChannelMonitorResponse, ChannelPutRequest, ChannelPutResponse,
    ChannelProcessRequest, ChannelProcessResponse, ChannelPutGetRequest,
    ChannelPutGetResponse
]

Channel = typing.TypeVar('Channel', 'ServerChannel', 'ClientChannel')


[docs]class VirtualCircuit: """ An object encapulating the state of one CA client--server connection. It is a companion to a TCP socket managed by the user. All data received over the socket should be passed to :meth:`recv`. Any data sent over the socket should first be passed through :meth:`send`. Parameters ---------- our_role : CLIENT or SERVER address : tuple ``(host, port)`` as a string and an integer respectively priority : integer or None QOS priority """ def __init__(self, our_role, address, priority): self.log = logging.getLogger('caproto.circ') self.our_role = our_role if our_role is CLIENT: self.their_role = SERVER else: self.their_role = CLIENT self.our_address = None self.address = address self._priority = None self.channels = {} # map cid to Channel self.channels_sid = {} # map sid to Channel self.states = CircuitState(self.channels) self._data = bytearray() self._segment_state = [] self.ioids = {} # map ioid to Channel # There are only used by the convenience methods, to auto-generate ids. if our_role is CLIENT: self._channel_id_counter = ThreadsafeCounter( dont_clash_with=self.channels ) else: self._channel_id_counter = ThreadsafeCounter( dont_clash_with=self.channels_sid ) self._ioid_counter = ThreadsafeCounter(dont_clash_with=self.ioids) self._sub_counter = ThreadsafeCounter() # A fixed byte order, required by the server self.our_order = SYS_ENDIAN self.their_order = None self.fixed_recv_order = None self.messages = None if priority is not None: self.priority = priority self.cache = pva.CacheContext() @property def priority(self): return self._priority @priority.setter def priority(self, priority): if self._priority is not None: raise CaprotoRuntimeError('Cannot update priority after already set') # A server-side circuit does not get to know its priority until after # instantiation, so we need a setter. self._priority = priority @property def host(self) -> str: '''Peer host name''' return self.address[0] @property def port(self) -> int: '''Port number''' return self.address[1] def __repr__(self): return (f"<VirtualCircuit host={self.host} port={self.port} " f"our_role={self.our_role}> logger_name={self.log.name}>") def __eq__(self, other): return hash(self) == hash(other) def __hash__(self): return hash((self.address, self.our_role))
[docs] def send(self, *messages, extra: Optional[Dict] = None) -> List[Union[bytes, memoryview]]: """ Convert one or more high-level Commands into buffers of bytes that may be broadcast together in one TCP packet. Update our internal state machine. Parameters ---------- *messages : any number of :class:`Message` objects extra : dict or None Used for logging purposes. This is merged into the ``extra`` parameter passed to the logger to provide information like ``'pv'`` to the logger. Returns ------- buffers_to_send : list list of buffers to send over a socket """ buffers_to_send: List[Union[bytes, memoryview]] = [] tags = {'their_address': self.address, 'our_address': self.our_address, 'direction': '--->>>', 'role': repr(self.our_role)} tags.update(extra or {}) for message in messages: self.log.debug("Serializing %r", message, extra=tags) if self.our_role == CLIENT: role_flag = pva.MessageFlags.FROM_CLIENT else: role_flag = pva.MessageFlags.FROM_SERVER if isinstance(message, (SetByteOrder, SetMarker, AcknowledgeMarker)): # Control messages handled here message.flags = message.flags | role_flag buffers_to_send.append(memoryview(message)) else: if message._ENDIAN == pva.LITTLE_ENDIAN: header_cls = MessageHeaderLE endian_flag = pva.MessageFlags.LITTLE_ENDIAN else: header_cls = MessageHeaderBE endian_flag = pva.MessageFlags.BIG_ENDIAN payload = memoryview(message.serialize(cache=self.cache)) header = header_cls( flags=(pva.MessageFlags.APP_MESSAGE | role_flag | endian_flag), command=message.ID, payload_size=len(payload) ) message.header = header buffers_to_send.append(header) buffers_to_send.append(payload) self._process_message(message, self.our_role) # Run the circuit's state machine. self.states.process_command_type(self.our_role, message) self.states.process_command_type(self.their_role, message) return buffers_to_send
[docs] def recv(self, *buffers ) -> typing.Generator[Tuple[Union[Message, ConnectionState], Optional[int]], None, None]: """ Parse messages from buffers received over TCP. When the caller is ready to process the messages, each message should first be passed to :meth:`VirtualCircuit.process_command` to validate it against the protocol and update the VirtualCircuit's state. Parameters ---------- *buffers : any number of bytes-like buffers Yields ------ message : Message The message/message. num_bytes_needed : int Number of bytes needed for the next message. """ total_received = sum(len(byteslike) for byteslike in buffers) if total_received == 0: self.log.debug('Zero-length recv; sending disconnect notification') yield DISCONNECTED, None return self.log.debug("Received %d bytes.", total_received) self._data += b''.join(buffers) while True: decoded, num_bytes_needed, segmented = read_from_bytestream( self._data, self.their_role, segment_state=self._segment_state, byte_order=self.fixed_recv_order, cache=self.cache) len_data = len(self._data) message, self._data, bytes_consumed = decoded if isinstance(self._data, memoryview): self._data = bytearray(self._data) if segmented is CLEAR_SEGMENTS: self._segment_state.clear() elif segmented is not None: self._segment_state.append(segmented) continue if message is NEED_DATA: self.log.debug( "%d bytes are cached. Need %d more bytes to parse next " "message.", len_data, num_bytes_needed) break self.log.debug("%d bytes -> %r", bytes_consumed, message) yield message, None
[docs] def process_command(self, message: Message): """ Update internal state machine and raise if protocol is violated. **Received** messages should be passed through here before any additional processing by a server or client layer. """ if message is DISCONNECTED: self.states.disconnect() return self._process_message(message, self.their_role) status = getattr(message, 'status', None) if status is not None and (status.message or status.call_tree): self.log.debug('Command %s status: %s', type(message).__name__, status) # Run the circuit's state machine. self.states.process_command_type(self.our_role, message) self.states.process_command_type(self.their_role, message)
def _get_channel_from_message(self, message: ChannelMessage) -> Channel: """ Get the :class:`Channel` instance associated with the given message. Parameters ---------- message : Message """ cid = getattr(message, 'client_chid', None) if cid is not None: return self.channels[cid] sid = getattr(message, 'server_chid', None) if sid is not None: return self.channels_sid[sid] ioid = getattr(message, 'ioid', None) if ioid is not None: return self.ioids[ioid]['channel'] raise get_exception(self.our_role, message)( 'Unable to determine channel information' ) @singledispatchmethod def _process_message(self, message: Message, role: Role): """ Single dispatch method for all messages. Parameters ---------- message : Message The message to process. role : ``CLIENT`` or ``SERVER`` The role of the sender. """ # Filter for Commands that are pertinent to a specific Channel, as # opposed to the Circuit as a whole: if any((hasattr(message, 'server_chid'), hasattr(message, 'client_chid'), hasattr(message, 'ioid'))): raise RuntimeError(f'Channel-specific message fell through: ' f'TODO {message}') self.log.warning('Unhandled message in circuit: %s (%s)', message, role) @_process_message.register def _(self, message: SetByteOrder, role: Role): fixed = (message.byte_order_setting == EndianSetting.use_server_byte_order) if self.our_role == SERVER: self.our_order = message.byte_order self.their_order = None self.fixed_recv_order = None self.messages = messages[(self.our_order, MessageFlags.FROM_SERVER)] else: self.our_order = message.byte_order self.their_order = message.byte_order self.fixed_recv_order = (self.their_order if fixed else None) if fixed: self.log.debug('Using fixed byte order for server messages' ': %s', self.fixed_recv_order.name) else: self.log.debug('Using byte order from individual messages.') self.messages = messages[(self.our_order, MessageFlags.FROM_CLIENT)] @_process_message.register(ConnectionValidationRequest) @_process_message.register(ConnectionValidationResponse) @_process_message.register(ConnectionValidatedResponse) def _(self, message, role: Role): ... @_process_message.register def _process_channel_creation(self, message: CreateChannelRequest, role: Role): """ Separately process channel creation, as it may possibly work on more than one channel at a time. """ channels = [] for request in message.channels: cid = request['id'] try: chan = self.channels[cid] except KeyError: chan = self.create_channel(request['channel_name'], cid=cid) channels.append((request, chan)) for request, channel in channels: # transitions = channel.process_command(message) channel.process_command(message, role) self.channels[request['id']] = channel # for transition in transitions: # chan.state_changed(*transition) @_process_message.register(ChannelDestroyRequest) @_process_message.register(ChannelDestroyResponse) @_process_message.register(ChannelFieldInfoRequest) @_process_message.register(ChannelFieldInfoResponse) @_process_message.register(ChannelGetRequest) @_process_message.register(ChannelGetResponse) @_process_message.register(ChannelRpcRequest) @_process_message.register(ChannelRpcResponse) @_process_message.register(ChannelMonitorRequest) @_process_message.register(ChannelMonitorResponse) @_process_message.register(ChannelPutRequest) @_process_message.register(ChannelPutResponse) @_process_message.register(ChannelRequestCancel) @_process_message.register(ChannelRequestDestroy) @_process_message.register(CreateChannelResponse) def _(self, message, role: Role): channel = self._get_channel_from_message(message) channel.process_command(message, role)
[docs] def disconnect(self): """ Notify all channels on this circuit that they are disconnected. Clients should call this method when a TCP connection is lost. """ return DISCONNECTED
[docs] def new_channel_id(self): "Return a valid value for a cid or sid." # Return the next sequential unused id. Wrap back to 0 on overflow. return self._channel_id_counter()
[docs] def new_subscriptionid(self): """ This is used by the convenience methods to obtain an unused integer ID. It does not update any important state. """ # Return the next sequential unused id. Wrap back to 0 on overflow. return self._sub_counter()
[docs] def new_ioid(self): """ This is used by the convenience methods to obtain an unused integer ID. It does not update any important state. """ # Return the next sequential unused id. Wrap back to 0 on overflow. return self._ioid_counter()
[docs] def set_byte_order(self, endian_setting: EndianSetting) -> SetByteOrder: """ Generate a valid :class:`SetByteOrder`. Returns ------- SetByteOrder """ return SetByteOrder( endian_setting, flags=pva.MessageFlags.CONTROL_MESSAGE, )
[docs] def acknowledge_marker(self) -> AcknowledgeMarker: """ Generate a valid :class:`AcknowledgeMarker`. Returns ------- AcknowledgeMarker """ return AcknowledgeMarker( flags=pva.MessageFlags.APP_MESSAGE, message=AcknowledgeMarker.ID, payload_size=0, )
[docs] def create_channel(self, name: str, cid: int = None) -> Channel: """ Create a ClientChannel or ServerChannel, depending on the role. Parameters ---------- name : str The channel name. cid : int The client channel ID. """ cls = {CLIENT: ClientChannel, SERVER: ServerChannel}[self.our_role] return cls(name, self, cid=cid)
[docs]class ClientVirtualCircuit(VirtualCircuit):
[docs] def validate_connection(self, buffer_size: int, registry_size: int, connection_qos: int, auth_nz: str = 'ca', data=None, ) -> ConnectionValidationResponse: """ Generate a valid :class:`_ConnectionValidationResponse`. Parameters ---------- buffer_size : int Client buffer size. registry_size : int Client registry size. connection_qos : int Connection QOS value. auth_nz : str, optional Authorization string, defaults to 'ca'. Caller must confirm that the server supports the given authorization method prior to specifying it. Returns ------- ConnectionValidationResponse """ cls = self.messages[ApplicationCommand.CONNECTION_VALIDATION] data = FieldDescAndData(data=data) return cls(client_buffer_size=buffer_size, client_registry_size=registry_size, connection_qos=connection_qos, auth_nz=auth_nz, data=data, )
[docs]class ServerVirtualCircuit(VirtualCircuit): def __init__(self, our_role, address, priority): super().__init__(our_role, address, priority) # TODO rethink? self.messages = messages[(self.our_order, MessageFlags.FROM_SERVER)]
[docs] def validate_connection(self, buffer_size: int, registry_size: int, authorization_options: Sequence, ) -> ConnectionValidationRequest: """ Generate a valid :class:`_ConnectionValidationRequest`. Parameters ---------- buffer_size : int Server buffer size. registry_size : int Server registry size. authorization_options : Sequence Authorization options, such as 'ca' or 'anonymous'. Returns ------- ConnectionValidationRequest """ cls = self.messages[ApplicationCommand.CONNECTION_VALIDATION] return cls(server_buffer_size=buffer_size, server_registry_size=registry_size, auth_nz=authorization_options, )
[docs] def validated_connection(self) -> ConnectionValidatedResponse: """Generate a valid :class:`_ConnectionValidatedResponse`.""" cls = self.messages[ApplicationCommand.CONNECTION_VALIDATED] return cls()
class _BaseChannel: # Base class for ClientChannel and ServerChannel, which add convenience # methods for composing requests and responses, respectively. def __init__(self, name, circuit, cid=None): tags = {'pv': name, 'their_address': circuit.address, 'role': repr(circuit.our_role)} self.log = ComposableLogAdapter(logging.getLogger('caproto.ch'), tags) self.endian = None # TODO self.name = name self.circuit = circuit if cid is None: cid = self.circuit.new_channel_id() self.cid = cid self.circuit.channels[self.cid] = self self.states = ChannelState(self.circuit.states) # These will be set when the circuit processes CreateChannelResponse.. self.interface = None self.sid = None self.ioids = {} # self.access_rights = None @property def subscriptions(self): """ Get cached EventAdd messages for this channel's active subscriptions. """ def state_changed(self, role, old_state, new_state, message): '''State changed callback for subclass usage''' pass def process_command(self, message: ChannelMessage, role: Role): # Update the state machine of the pertinent Channel. If this is # not a valid message, the state machine will raise here. Stash the # state transitions in a local var, run the callbacks at the end. ioid_info = self._ioid_info_from_message(message) subcommand = getattr(message, 'subcommand', None) if subcommand is not None and ioid_info is not None: ioid_state = ioid_info['state'] ioid_state.process_subcommand(subcommand) self._process_message(message, role, ioid_info) if (role == Role.SERVER and subcommand and Subcommand.DESTROY in Subcommand(subcommand)): # Only destroy when the server acknowledges it self._destroy_ioid(ioid_info) transitions = [] for role in (self.circuit.our_role, self.circuit.their_role): initial_state = self.states[role] try: self.states.process_command_type(role, message) except CaprotoError as ex: ex.channel = self raise new_state = self.states[role] # Assemble arguments needed by state_changed, to be called later. if initial_state is not new_state: transition = (role, initial_state, new_state, message) transitions.append(transition) # We are done. Run the Channel state change callbacks. for transition in transitions: self.state_changed(*transition) return transitions @singledispatchmethod def _process_message(self, message, role: Role, ioid_info: dict): self.circuit.log.warning( 'Unhandled channel message %s role=%s ioid_info=%s', message, role, ioid_info ) @_process_message.register def _(self, message: CreateChannelRequest, role: Role, ioid_info: dict): ... @_process_message.register def _(self, message: ChannelFieldInfoResponse, role: Role, ioid_info: dict): field_info = getattr(message.field_if, '_pva_struct_', message.field_if) for line in field_info.summary().splitlines(): self.circuit.log.debug('[%s] %s', self.name, line) def _ioid_info_from_message(self, message: ChannelMessage) -> Optional[Dict]: ioid = getattr(message, 'ioid', None) if ioid is None: return try: return self.ioids[ioid] except KeyError: ... if isinstance(message, (ChannelMonitorRequest, ChannelMonitorResponse)): state_class = MonitorRequestState else: state_class = RequestState ioid_info = dict( channel=self, state=state_class(message.__class__.__name__), ioid=ioid, ) self.ioids[ioid] = ioid_info self.circuit.ioids[ioid] = ioid_info return ioid_info @_process_message.register(ChannelDestroyRequest) @_process_message.register(ChannelDestroyResponse) @_process_message.register(ChannelFieldInfoRequest) @_process_message.register(ChannelFieldInfoResponse) @_process_message.register(ChannelGetRequest) @_process_message.register(ChannelRpcRequest) @_process_message.register(ChannelRpcResponse) @_process_message.register(ChannelMonitorRequest) @_process_message.register(ChannelPutRequest) def _(self, message, role: Role, ioid_info: dict): ... # Nothing to do here @_process_message.register def _(self, message: CreateChannelResponse, role: Role, ioid_info: dict): self.sid = message.server_chid self.circuit.channels_sid[message.server_chid] = self @_process_message.register def _(self, message: ChannelPutResponse, role: Role, ioid_info: dict): if message.status.is_successful: subcommand = Subcommand(message.subcommand) if Subcommand.INIT in subcommand: interface = message.put_structure_if self.circuit.cache.ioid_interfaces[message.ioid] = interface @_process_message.register def _(self, message: ChannelGetResponse, role: Role, ioid_info: dict): if message.status.is_successful: subcommand = Subcommand(message.subcommand) if Subcommand.INIT in subcommand: interface = message.pv_structure_if self.circuit.cache.ioid_interfaces[message.ioid] = interface @_process_message.register def _(self, message: ChannelMonitorResponse, role: Role, ioid_info: dict): subcommand = MonitorSubcommand(message.subcommand) if MonitorSubcommand.INIT in subcommand: if message.status.is_successful: interface = message.pv_structure_if self.circuit.cache.ioid_interfaces[message.ioid] = interface @_process_message.register def _(self, message: ChannelRequestCancel, role: Role, ioid_info: dict): ... # TODO? @_process_message.register def _(self, message: ChannelRequestDestroy, role: Role, ioid_info: dict): self._destroy_ioid(ioid_info) def _destroy_ioid(self, ioid_info: dict): """Destroy an I/O request id given its info dictionary.""" ioid = ioid_info['ioid'] self.ioids.pop(ioid) # May not have been successful: self.circuit.cache.ioid_interfaces.pop(ioid, None) class ClientChannel(_BaseChannel): """An object encapsulating the state of the EPICS Channel on a Client. A ClientChannel may be created in one of two ways: (1) The user instantiates a ClientChannel with a name, server address, and optional cid. The server address is used to assign the ClientChannel to a VirtualCircuit. If no cid is given, a unique one is allocated by the VirtualCircuit. (2) A VirtualCircuit processes a CreateChannelRequest that refers to a cid that it has not yet seen. A ClientChannel will be automatically instantiated with the name and cid indicated that message and the address of that VirtualCircuit. It can be accessed in the circuit's ``channels`` attribute. Parameters ---------- name : string Channnel name (PV) circuit : VirtualCircuit cid : integer, optional """ def create(self) -> CreateChannelRequest: """ Generate a valid :class:`CreateChannelRequest`. Returns ------- CreateChannelRequest """ create_cls = self.circuit.messages[ApplicationCommand.CREATE_CHANNEL] return create_cls( count=1, channels=[{'id': self.cid, 'channel_name': self.name}] ) def disconnect(self) -> ChannelDestroyRequest: """ Generate a valid :class:`ChannelDestroyRequest`. Returns ------- ChannelDestroyRequest """ if self.sid is None: raise RuntimeError('Server ID unavailable') cls = self.circuit.messages[ApplicationCommand.DESTROY_CHANNEL] return cls(client_chid=self.cid, server_chid=self.sid) def read_interface(self, *, ioid=None, sub_field_name='') -> ChannelFieldInfoRequest: """ Generate a valid :class:`ChannelFieldInfoRequest`. """ if self.sid is None: raise RuntimeError('Server ID unavailable') if ioid is None: ioid = self.circuit.new_ioid() cls = self.circuit.messages[ApplicationCommand.GET_FIELD] return cls(server_chid=self.sid, ioid=ioid, sub_field_name=sub_field_name, ) def read(self, *, ioid=None, pvrequest: str = 'field()') -> ChannelGetRequest: """ Generate a valid :class:`ChannelGetRequest`. Parameters ---------- ioid pvrequest Returns ------- ChannelGetRequest """ if ioid is None: ioid = self.circuit.new_ioid() cls = typing.cast(Type[ChannelGetRequest], self.circuit.messages[ApplicationCommand.GET]) if not isinstance(pvrequest, PVRequest): pvrequest = PVRequest(data=pvrequest) instance = cls(server_chid=self.sid, ioid=ioid) instance.to_init(pv_request=pvrequest) return instance def rpc(self, *, ioid=None, pvrequest: str = 'field()') -> ChannelRpcRequest: """ Generate a valid :class:`ChannelRpcRequest`. Parameters ---------- ioid pvrequest Returns ------- ChannelRpcRequest """ if ioid is None: ioid = self.circuit.new_ioid() if not isinstance(pvrequest, PVRequest): pvrequest = PVRequest(data=pvrequest) cls: Type[ChannelRpcRequest] = self.circuit.messages[ApplicationCommand.RPC] instance = cls(server_chid=self.sid, ioid=ioid) instance.to_init(pv_request=pvrequest) return instance def subscribe(self, *, ioid=None, pvrequest: str = 'field(value)', queue_size=None) -> ChannelMonitorRequest: """ Generate a valid :class:`ChannelMonitorRequest`. Parameters ---------- ioid pvrequest Returns ------- ChannelMonitorRequest """ if ioid is None: ioid = self.circuit.new_ioid() if not isinstance(pvrequest, PVRequest): pvrequest = PVRequest(data=pvrequest) cls: Type[ChannelMonitorRequest] = self.circuit.messages[ApplicationCommand.MONITOR] return cls(server_chid=self.sid, ioid=ioid).to_init( pv_request=pvrequest, queue_size=queue_size or 0, ) def write(self, *, ioid: int = None, pvrequest: str = 'field(value)', queue_size=None) -> ChannelPutRequest: """ Generate a valid :class:`ChannelPutRequest` (INIT). Parameters ---------- ioid : int, optional The I/O identifier. Generated if needed. """ if ioid is None: ioid = self.circuit.new_ioid() if not isinstance(pvrequest, PVRequest): pvrequest = PVRequest(data=pvrequest) cls = self.circuit.messages[ApplicationCommand.PUT] instance: ChannelPutRequest = cls(server_chid=self.sid, ioid=ioid) return instance.to_init(pv_request=pvrequest) def cancel(self, ioid: int) -> ChannelRequestCancel: """ Generate a valid :class:`ChannelRequestCancel`. Parameters ---------- ioid : int, optional The I/O identifier. """ cls = self.circuit.messages[ApplicationCommand.CANCEL_REQUEST] instance: ChannelRequestCancel = cls(server_chid=self.sid, ioid=ioid) return instance class ServerChannel(_BaseChannel): """ A server-side Channel. """ def create(self, sid: int, *, status=_StatusOK, ) -> CreateChannelResponse: """ Generate a valid :class:`CreateChannelResponse`. Parameters ---------- sid : int The server channel ID. status : Status Status information. Returns ------- CreateChannelResponse """ create_cls = self.circuit.messages[ApplicationCommand.CREATE_CHANNEL] return create_cls( client_chid=self.cid, server_chid=sid, status=status, ) def read_interface(self, ioid, interface, *, status=_StatusOK ) -> ChannelFieldInfoResponse: """ Generate a valid :class:`ChannelFieldInfoResponse`. """ cls = self.circuit.messages[ApplicationCommand.GET_FIELD] return cls(ioid=ioid, status=status, interface=interface, ) def read(self, ioid, interface, *, status: Status = _StatusOK, ) -> ChannelGetResponse: """ Generate a valid :class:`ChannelGetResponse`. Parameters ---------- ioid : int The operation ID. interface : FieldDesc or dataclass The interface or dataclass containing data. status : Status Status information. Returns ------- ChannelGetResponse """ cls = self.circuit.messages[ApplicationCommand.GET] return cls(ioid=ioid, status=status).to_init( pv_structure_if=interface ) def write(self, ioid, put_structure_if, *, status: Status = _StatusOK) -> ChannelPutResponse: """ Generate a valid :class:`ChannelPutResponse`. Parameters ---------- ioid : int The operation ID. put_structure_if : FieldDesc or dataclass The interface or dataclass containing data. status : Status Status information. Returns ------- ChannelPutResponse """ cls = typing.cast(Type[ChannelPutResponse], self.circuit.messages[ApplicationCommand.PUT]) return cls(ioid=ioid, status=status).to_init( put_structure_if=put_structure_if ) def disconnect(self) -> ChannelDestroyResponse: """ Generate a valid :class:`ChannelDestroyResponse`. Returns ------- ChannelDestroyResponse """ cls = self.circuit.messages[ApplicationCommand.DESTROY_CHANNEL] return cls(client_chid=self.cid, server_chid=self.sid) def subscribe(self, ioid, interface, *, status=_StatusOK, ) -> ChannelMonitorResponse: """ Generate a valid :class:`ChannelMonitorResponse`. Parameters ---------- ioid : int The operation ID. status : Status Status information. interface : FieldDesc or dataclass The interface or dataclass containing data. Returns ------- ChannelMonitorResponse """ cls: Type[ChannelMonitorResponse] = self.circuit.messages[ApplicationCommand.MONITOR] return cls(ioid=ioid).to_init(status=status, pv_structure_if=interface) def rpc(self, ioid, *, status: Status = _StatusOK) -> ChannelRpcResponse: """ Generate a valid :class:`ChannelRpcResponse`. Parameters ---------- ioid : int The operation ID. status : Status Status information. Returns ------- ChannelRpcResponse """ cls: Type[ChannelRpcResponse] = self.circuit.messages[ApplicationCommand.RPC] return cls(ioid=ioid).to_init(status=status)