# This module contains high-level "command" objects, one for each CA command.
# A command wraps together a ``MessageHeader`` from _headers.py with a payload
# of raw bytes (if applicable). It provides a more user-friendly __init__ that
# accepts standard Python types and handles details like type conversion and
# bit-padding. For every argument to the __init__ there is a corresponding
# property to allow high-level introspection of a command. There are also
# ``header`` and ``payload`` attributes for lower-level introspection. Finally,
# the command objects support ``__bytes__``, encoding them for sending over the
# wire.
# A command class may be instantiated in one of two ways:
# 1. For sending: by passing user-friendly inputs to ``__init__``.
# 2. For receiving: by passing a datagram or bytestream to the functions
# ``read_datagram`` and ``read_from_bytestream`` respectively. These
# identify the command type and instantiate the appropriate class from bytes
# using ``__new__``.
#
# (1) is typically done by the user. (2) is typically done by calling the
# ``next_command`` method of a :class:`Broadcaster` or a
# :class:`VirtualCircuit`.
import array
import ctypes
import inspect
import os
import socket
import struct
import warnings
from collections.abc import Iterable
import _ctypes
from . import _dbr as dbr
from ._backend import backend
from ._constants import DO_REPLY, MAX_RECORD_LENGTH, NO_REPLY
from ._dbr import (DBR_INT, DBR_TYPES, MAX_STRING_SIZE, AccessRights,
ChannelType, float_t, native_type, short_t, special_types,
ushort_t)
from ._headers import (AccessRightsResponseHeader, BeaconHeader,
ClearChannelRequestHeader, ClearChannelResponseHeader,
ClientNameRequestHeader, CreateChanRequestHeader,
CreateChanResponseHeader, CreateChFailResponseHeader,
EchoRequestHeader, EchoResponseHeader,
ErrorResponseHeader, EventAddRequestHeader,
EventAddResponseHeader, EventCancelRequestHeader,
EventCancelResponseHeader, EventsOffRequestHeader,
EventsOnRequestHeader, ExtendedMessageHeader,
HostNameRequestHeader, MessageHeader,
NotFoundResponseHeader, ReadNotifyRequestHeader,
ReadNotifyResponseHeader, ReadRequestHeader,
ReadResponseHeader, ReadSyncRequestHeader,
RepeaterConfirmResponseHeader,
RepeaterRegisterRequestHeader, SearchRequestHeader,
SearchResponseHeader, ServerDisconnResponseHeader,
VersionRequestHeader, VersionResponseHeader,
WriteNotifyRequestHeader, WriteNotifyResponseHeader,
WriteRequestHeader)
from ._status import eca_value_to_status, ensure_eca_value
from ._utils import (CLIENT, NEED_DATA, REQUEST, RESPONSE, SERVER,
CaprotoNotImplementedError, CaprotoTypeError,
CaprotoValueError, RemoteProtocolError, ValidationError,
ensure_bytes)
# Public names exported by ``from <module> import *``.
__all__ = (
    'AccessRightsResponse',
    'ClearChannelRequest',
    'ClearChannelResponse',
    'ClientNameRequest',
    'CreateChFailResponse',
    'CreateChanRequest',
    'CreateChanResponse',
    'EchoRequest',
    'EchoResponse',
    'ErrorResponse',
    'EventAddRequest',
    'EventAddRequestPayload',
    'EventAddResponse',
    'EventCancelRequest',
    'EventCancelResponse',
    'EventsOffRequest',
    'EventsOnRequest',
    'get_command_class',
    'HostNameRequest',
    'ipv4_from_int32',
    'ipv4_to_int32',
    'NotFoundResponse',
    'ReadNotifyRequest',
    'ReadNotifyResponse',
    'ReadRequest',
    'ReadResponse',
    'ReadSyncRequest',
    'RepeaterConfirmResponse',
    'RepeaterRegisterRequest',
    'Beacon',
    'SearchRequest',
    'SearchResponse',
    'ServerDisconnResponse',
    'VersionRequest',
    'VersionResponse',
    'WriteNotifyRequest',
    'WriteNotifyResponse',
    'WriteRequest',
    'Message',
)
# Byte sizes of the standard and the extended header structures, measured
# once at import time.
_MessageHeaderSize = ctypes.sizeof(MessageHeader)
_ExtendedMessageHeaderSize = ctypes.sizeof(ExtendedMessageHeader)

# Maps ``size % 8`` to the run of null bytes that rounds ``size`` up to the
# next multiple of 8 (nothing at all when it is already aligned).
_pad_buffer = {remainder: b'\0' * ((8 - remainder) % 8)
               for remainder in range(8)}

# Text encoding used for strings on the wire; overridable through the
# CAPROTO_STRING_ENCODING environment variable.
STR_ENC = os.environ.get('CAPROTO_STRING_ENCODING', 'latin-1')
def ipv4_to_int32(ip: str) -> int:
    """Pack a dotted-quad IPv4 string into a 32-bit integer.

    The four address bytes are interpreted in network (big-endian) byte
    order.
    """
    packed = socket.inet_pton(socket.AF_INET, ip)
    (as_int,) = struct.unpack('!I', packed)
    return as_int
def ipv4_from_int32(int_packed_ip: int) -> str:
    """Turn a 32-bit integer (network byte order) into a dotted-quad string.

    This is the inverse of :func:`ipv4_to_int32`.
    """
    return socket.inet_ntop(socket.AF_INET,
                            struct.pack('!I', int_packed_ip))
def has_metadata(data_type):
    """Report whether ``data_type`` carries metadata alongside its values.

    True when the type is one of ``special_types`` or when it differs from
    its own native type.
    """
    if data_type in special_types:
        return True
    return data_type != native_type(data_type)
def from_buffer(data_type, data_count, buffer):
    """Split a raw payload buffer into ``(metadata, data)`` parts.

    Parameters
    ----------
    data_type : integer
        DBR type code of the payload.
    data_count : integer
        Number of native elements in the data portion.
    buffer : writable buffer
        Payload bytes as received.

    Returns
    -------
    md_payload, data_payload
        ``md_payload`` is a DBR struct mapped onto the start of ``buffer``
        (or ``b''`` if the type has no metadata); ``data_payload`` is a
        ``memoryview`` over the data bytes only.
    """
    native_struct = DBR_TYPES[native_type(data_type)]
    payload_size = data_count * ctypes.sizeof(native_struct)

    if not has_metadata(data_type):
        md_payload, md_size = b'', 0
    else:
        dbr_struct = DBR_TYPES[data_type]
        md_payload = dbr_struct.from_buffer(buffer)
        md_size = ctypes.sizeof(dbr_struct)

    # Slicing by the computed size drops any right-padding that was added to
    # make the payload a multiple of 8 bytes long.
    data_end = md_size + payload_size
    return md_payload, memoryview(buffer)[md_size:data_end]
def padded_len(s):
    """Return ``len(s)`` rounded up to the nearest multiple of 8."""
    blocks_of_eight = -(-len(s) // 8)  # ceiling division
    return blocks_of_eight * 8
def pad_buffers(*buffers):
    """Work out the padding needed after a concatenated set of buffers.

    Parameters
    ----------
    *buffers : supported buffer type
        Anything whose size :func:`bytelen` can measure.

    Returns
    -------
    full_padded_length, pad_buffer
        The total size once padded to a multiple of 8, and the null bytes
        that achieve it (empty when no padding is needed).
    """
    total = 0
    for buf in buffers:
        total += bytelen(buf)
    padding = _pad_buffer[total % 8]
    return total + len(padding), padding
def padded_string_payload(payload):
    """Encode ``payload`` and null-pad it to a multiple of 8 bytes.

    Returns
    -------
    padded_size, padded_payload
    """
    raw = ensure_bytes(payload)
    target_size = padded_len(raw)
    padded = raw.ljust(target_size, b'\x00')
    return target_size, padded
def bytelen(item):
    """
    Measure the byte length of an item.

    Supports:

    - ``array.array`` (from the builtin Python lib)
    - ``ctypes`` structures and simple ctypes values
    - an object that has an ``nbytes`` attribute (notably, numpy arrays and
      ``memoryview``)
    - ``bytes``
    - ``bytearray``

    Raises
    ------
    CaprotoNotImplementedError
        For any other type.
    """
    if isinstance(item, array.array):
        return item.itemsize * len(item)
    if isinstance(item, (ctypes.Structure, _ctypes._SimpleCData)):
        return ctypes.sizeof(item)
    if hasattr(item, 'nbytes'):
        # Duck-typed: numpy arrays and memoryviews report their own size.
        return item.nbytes
    if isinstance(item, (bytes, bytearray)):
        return len(item)
    # Deliberately no generic len() fallback: a type whose __len__ is not its
    # byte length would silently give a wrong answer, so fail loudly instead.
    raise CaprotoNotImplementedError("Not sure how to measure byte length "
                                     "of object of type {}"
                                     "".format(type(item)))
def parse_metadata(metadata, data_type):
    """
    Parse metadata tuple into bytes or DBR.

    If input is:

    * tuple (or any other iterable) -> DBR struct
    * DBR struct or bytes -> no-op
    * None -> empty bytes object

    Parameters
    ----------
    metadata : a DBR struct, any iterable, or bytes
    data_type : integer

    Returns
    -------
    md_payload : a DBR struct or bytes

    Raises
    ------
    CaprotoValueError
        If a string value is longer than ``MAX_STRING_SIZE`` once encoded.
    CaprotoTypeError
        If ``metadata`` is none of the supported kinds.
    """
    if hasattr(metadata, 'DBR_ID') or isinstance(metadata, bytes):
        # Already a DBR struct, or already serialized.
        return metadata
    if metadata is None:
        return b''
    if not isinstance(metadata, Iterable):
        raise CaprotoTypeError("metadata given as type we cannot handle - {}"
                               "".format(type(metadata)))

    # A tuple of values to be encoded into a DBR.
    justified_md = []
    for val in metadata:
        if isinstance(val, str):
            # Encode *before* padding: str.ljust() rejects a bytes fill
            # character, so padding the str directly raised TypeError.
            encoded = val.encode(STR_ENC)
            if len(encoded) > MAX_STRING_SIZE:  # 39?
                raise CaprotoValueError("The protocol limits strings to "
                                        "40 characters.")
            # NOTE(review): every string is padded to MAX_STRING_SIZE, as the
            # original intended -- confirm that suits all string fields of
            # the target DBR struct.
            val = encoded.ljust(MAX_STRING_SIZE, b'\x00')
        justified_md.append(val)
    return DBR_TYPES[data_type](*justified_md)
def data_payload(data, metadata, data_type, data_count):
    """
    Assemble the buffers that together make up one command payload.

    Parameters
    ----------
    data : ``array.array``, ``numpy.ndarray``, any iterable, or bytes
        Bytes are passed through untouched and assumed to be big-endian
        already (there is no way to check). Backend array types and other
        iterables are handed to ``backend.python_to_epics`` with
        ``byteswap=True``. ``None`` means "no data".
    metadata : a DBR struct, any iterable, or bytes
    data_type : ChannelType or integer
    data_count : integer
        Accepted for interface symmetry; not used by this function.

    Returns
    -------
    size, md_payload, data_payload[, pad_payload]
        pad_payload will only be returned if needed

    Raises
    ------
    CaprotoTypeError
        If ``data`` is of a type that cannot be serialized.
    """
    data_type = ChannelType(data_type)

    if isinstance(data, bytes):
        data_buf = data
    elif (isinstance(data, backend.array_types) or
            isinstance(data, Iterable)):
        data_buf = backend.python_to_epics(
            native_type(data_type), data, byteswap=True)
    elif data is None:
        data_buf = b''
    else:
        raise CaprotoTypeError("data given as type we cannot handle - {}"
                               "".format(type(data)))

    md_buf = parse_metadata(metadata, data_type)
    size, padding = pad_buffers(md_buf, data_buf)

    result = (size, md_buf, data_buf)
    if padding:
        result += (padding,)
    return result
def extract_data(buffer, data_type, data_count):
    """Decode ``buffer`` into values via the active backend.

    The buffer is interpreted as ``data_count`` elements of the native type
    of ``data_type``. If the backend hands back more than ``data_count``
    elements, the surplus is sliced off.
    """
    values = backend.epics_to_python(buffer, native_type(data_type),
                                     data_count, auto_byteswap=True)
    if len(values) > data_count:
        values = values[:data_count]  # (no copy)
    return values
def extract_metadata(payload, data_type):
    """Map ``payload`` onto the DBR struct for ``data_type``.

    Type codes below 7 yield ``None``. For other codes the payload is copied
    into a fresh ``bytearray`` and the matching ``dbr.DBR_TYPES`` struct is
    created over that copy.
    """
    if data_type >= 7:
        # Copy first: from_buffer() needs a writable buffer of its own.
        writable_copy = bytearray(payload)
        return dbr.DBR_TYPES[data_type].from_buffer(writable_copy)
    return None
def get_command_class(role, header):
    """Look up the command class registered for ``header.command``.

    ``role`` selects which of the ``Commands`` registries is consulted. A
    ``KeyError`` propagates if either lookup fails.
    """
    registry = Commands[role]
    return registry[header.command]
def read_datagram(data, address, role):
    """Parse the bytes of one datagram into a list of commands.

    Parameters
    ----------
    data : bytes-like
        The full datagram.
    address
        Sender address, attached to every command as ``sender_address``.
    role
        Key into ``Commands`` selecting the registry to look commands up in.

    Returns
    -------
    list of command instances (empty for an empty datagram)

    Raises
    ------
    RemoteProtocolError
        If a header carries a command ID that is not registered.
    """
    remaining = bytearray(data)
    parsed = []
    while remaining:
        header = MessageHeader.from_buffer(remaining)
        remaining = remaining[_MessageHeaderSize:]
        try:
            command_class = Commands[role][header.command]
        except KeyError:
            raise RemoteProtocolError(
                f"Packet with bad command ID {hex(header.command)} was "
                f"received. Header: {header}"
            ) from None

        payload_bytes = None
        if command_class.HAS_PAYLOAD:
            size = header.payload_size
            payload_bytes, remaining = remaining[:size], remaining[size:]

        parsed.append(command_class.from_wire(header, payload_bytes,
                                              sender_address=address))
    return parsed
def bytes_needed_for_command(data, role):
    '''
    Work out how many more bytes are needed to hold one complete command.

    Parameters
    ----------
    data
        Bytes received so far.
    role
        Not used by this function.

    Returns
    -------
    (header, num_bytes_needed)
        ``header`` is ``None`` while there are too few bytes to read even the
        header. ``num_bytes_needed`` is 0 once the whole command is present.
    '''
    available = len(data)

    # Nothing can be interpreted without at least one standard header.
    if available < _MessageHeaderSize:
        return None, _MessageHeaderSize - available

    header_size = _MessageHeaderSize
    header = MessageHeader.from_buffer(data)

    # These two sentinel values together mark an "extended header".
    if header.payload_size == 0xFFFF and header.data_count == 0:
        header_size = _ExtendedMessageHeaderSize
        if available < header_size:
            return None, header_size - available
        header = ExtendedMessageHeader.from_buffer(data)

    # Header is complete; see how much of the payload is still missing.
    shortfall = header_size + header.payload_size - available
    return header, max(shortfall, 0)
def read_from_bytestream(data, role):
    '''
    Try to pull one command off the front of a byte stream.

    Parameters
    ----------
    data
        Bytes received so far.
    role
        Key into ``Commands`` selecting the registry to look commands up in.

    Returns
    -------
    (remaining_data, command, num_bytes_needed)
        if more data is required, NEED_DATA will be returned in place of
        `command`

    Raises
    ------
    RemoteProtocolError
        If the header carries a command ID that is not registered.
    '''
    header, shortfall = bytes_needed_for_command(data, role)
    if shortfall > 0:
        return data, NEED_DATA, shortfall

    try:
        command_class = Commands[role][header.command]
    except KeyError:
        raise RemoteProtocolError(
            f"Packet with bad command ID {hex(header.command)} was received."
            f"\nThis may be a non-Channel Access client such as a security "
            f"scanner attempting to probe a server, a malfunctioning client, "
            f"or an unsupported client. If the above does not apply, please "
            f"open an issue: https://github.com/caproto/caproto/issues"
            f"\nHeader details: {header}."
        ) from None

    payload_start = ctypes.sizeof(header)
    payload_end = payload_start + header.payload_size
    # A memoryview slice hands over the payload without copying it.
    payload_view = memoryview(data)[payload_start:payload_end]
    command = command_class.from_wire(header, payload_view)
    # Everything after this command is left for the next call.
    return data[payload_end:], command, 0
# Registries mapping command ID -> command class, one per role. They are
# filled in by ``Message.__init_subclass__`` as command classes are defined.
Commands = {CLIENT: {}, SERVER: {}}
# Every registered command class, regardless of role.
_commands = set()
class Message:
    """
    Base class for all commands: a header plus zero or more payload buffers.

    Subclasses set ``ID`` (the command code) and ``HAS_PAYLOAD``. Unless
    created with ``register=False`` they are added to the ``Commands``
    registries automatically when the class is defined.

    Attributes
    ----------
    header
        The header struct for this command.
    buffers : tuple
        Payload buffers, in wire order.
    sender_address
        Address the command was received from, if known.
    """
    __slots__ = ('header', 'buffers', 'sender_address')
    ID = None  # integer, to be overridden by subclass

    def __init_subclass__(cls, register=True):
        if not register:
            return
        # Add this class to the global registries of commands. Classes whose
        # name ends in 'Request' are what a CLIENT sends; everything else is
        # treated as a response sent by a SERVER.
        name = cls.__name__
        if name.endswith('Request'):
            direction = REQUEST
            command_dict = Commands[CLIENT]
        else:
            direction = RESPONSE
            command_dict = Commands[SERVER]
        cls.DIRECTION = direction
        if cls.ID is not None:
            command_dict[cls.ID] = cls
            # Beacon is registered for both roles. This used to be the
            # substring test ``name in ('Beacon, ')`` (a str, not a tuple),
            # which also matched names such as 'B' or 'con'.
            if name == 'Beacon':
                Commands[CLIENT][cls.ID] = cls
                Commands[SERVER][cls.ID] = cls
        _commands.add(cls)

    def __init__(self, header, *buffers, validate=True, sender_address=None):
        self.header = header
        self.buffers = buffers
        self.sender_address = sender_address
        if validate:
            self.validate()

    def validate(self):
        """Check that the header agrees with the payload and command ID.

        Raises
        ------
        ValidationError
            If ``header.payload_size`` does not match the total size of the
            buffers, or ``header.command`` is not this class's ``ID``.
        """
        size = sum(bytelen(buf) for buf in self.buffers)
        if self.buffers == () and self.header.payload_size != 0:
            raise ValidationError(
                "{}.header.payload_size {} > 0 but payload is None."
                "".format(type(self).__name__, self.header.payload_size))
        elif self.header.payload_size != size:
            raise ValidationError(
                "{}.header.payload_size {} != payload size of {}"
                "".format(type(self).__name__, self.header.payload_size, size))
        if self.header.command != self.ID:
            raise ValidationError(
                "A {} must have a header with header.command == {}, not {}."
                "".format(type(self).__name__, self.ID, self.header.command))

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        """
        Build a command from a received header and its payload bytes.

        The header's ``data_type`` and ``data_count`` determine how the
        payload is split into metadata and data buffers. Some command types
        give the header's ``data_type`` field a different meaning, and these
        override this method in their subclass.

        We do *not* validate by default, both for performance and for
        forward-compatibility. But validation may be useful to turn on in the
        context of consuming network traffic and trying to identify CA
        packets (e.g. caproto.sync.shark).
        """
        if not cls.HAS_PAYLOAD:
            # Pass sender_address and validate through here as well; they
            # used to be dropped for commands without a payload.
            return cls.from_components(header,
                                       sender_address=sender_address,
                                       validate=validate)
        payload = from_buffer(header.data_type, header.data_count,
                              payload_bytes)
        return cls.from_components(header, *payload,
                                   sender_address=sender_address,
                                   validate=validate)

    @classmethod
    def from_components(cls, header, *buffers, sender_address=None,
                        validate=False):
        """Create an instance directly from parts, bypassing ``__init__``."""
        instance = cls.__new__(cls)
        instance.header = header
        instance.buffers = buffers
        instance.sender_address = sender_address
        if validate:
            instance.validate()
        return instance

    def __eq__(self, other):
        return bytes(self) == bytes(other)

    def __hash__(self):
        return hash(bytes(self))

    def __ne__(self, other):
        return bytes(self) != bytes(other)

    def __bytes__(self):
        # In general it's better to use self.buffers over bytes(self) because
        # the former does not copy large continuous memory arrays.
        raw_bytes = bytearray()
        # Concatenate buffers -- this copies data!
        for buf in self.buffers:
            raw_bytes += bytes(buf)
        # Trim 40-char string struct to payload_size.
        trimmed_bytes = raw_bytes[:self.header.payload_size]
        # Pad to multiple of 8.
        payload_bytes = trimmed_bytes.ljust(padded_len(trimmed_bytes), b'\x00')
        return bytes(self.header) + bytes(payload_bytes)

    def __repr__(self):
        # Show one ``name=value`` pair per __init__ parameter, read back from
        # the attribute/property of the same name.
        signature = inspect.signature(type(self))
        parameters = (signature.parameters if type(self) is not Message
                      else ['header'])

        def safe_repr(arg):
            # A failing property must not break repr() as a whole.
            try:
                return repr(getattr(self, arg))
            except Exception as ex:
                return f'(repr: {ex})'

        d = [(arg, safe_repr(arg)) for arg in parameters]
        formatted_args = ", ".join(["{!s}={}".format(k, v)
                                    for k, v in d])
        return "{}({})".format(type(self).__name__, formatted_args)

    def __len__(self):
        # Header size plus the unpadded size of every buffer.
        return (ctypes.sizeof(self.header) +
                sum(bytelen(buf) for buf in self.buffers))

    @property
    def nbytes(self):
        return len(self)
class VersionRequest(Message):
    """
    Initiate a new connection or broadcast between the client and the server.

    Fields:

    .. attribute:: priority

        Between 0 (low) and 99 (high) designating this connection's priority
        in the event of congestion.

    .. attribute:: version

        The version of the Channel Access protocol.
    """
    __slots__ = ()
    ID = 0
    HAS_PAYLOAD = False

    def __init__(self, priority, version):
        header = VersionRequestHeader(priority, version)
        super().__init__(header)

    @property
    def priority(self):
        # Stored in the header's data_type field.
        return self.header.data_type

    @property
    def version(self):
        # Stored in the header's data_count field.
        return self.header.data_count

    def validate(self):
        """Raise ValidationError unless ``0 <= priority < 100``."""
        if not (0 <= self.priority < 100):
            # The message now states the range that is actually enforced
            # (0 itself is allowed).
            raise ValidationError("Expecting 0 <= priority < 100")
class VersionResponse(Message):
    """
    Respond to a client's initiation of a new connection or broadcast.

    Fields:

    .. attribute:: version

        The version of the Channel Access protocol.
    """
    __slots__ = ()
    ID = 0
    HAS_PAYLOAD = False

    def __init__(self, version):
        super().__init__(VersionResponseHeader(version))

    @property
    def version(self):
        # Stored in the header's data_count field.
        return self.header.data_count
class SearchRequest(Message):
    """
    Query for the address of the server that provides a given Channel.

    Fields:

    .. attribute:: name

        String name of the channel (i.e. 'PV')

    .. attribute:: cid

        Integer that uniquely identifies this search query on the client side.

    .. attribute:: version

        The version of the Channel Access protocol.

    .. attribute:: payload_size

        Padded length of name string

    .. attribute:: reply

        Defaults to :data:`NO_REPLY`, meaning that only the server(s) with
        an affirmative response should reply.
    """
    __slots__ = ()
    ID = 6
    HAS_PAYLOAD = True

    def __init__(self, name, cid, version, reply=NO_REPLY):
        size, payload = padded_string_payload(name)
        # Only the record part (everything before the first '.') is subject
        # to the length limit.
        record = name.partition('.')[0]
        record_length = len(record)
        if record_length > MAX_RECORD_LENGTH:
            raise CaprotoValueError('EPICS 3.14 imposes a {}-character limit '
                                    'on record names. The record {!r} is {} '
                                    'characters.'
                                    ''.format(MAX_RECORD_LENGTH, name,
                                              record_length))
        header = SearchRequestHeader(size, reply, version, cid)
        # The empty first buffer is the (absent) metadata slot.
        super().__init__(header, b'', payload)

    @classmethod
    def from_wire(cls, header, *buffers, sender_address=None, validate=False):
        # Special case: for this command data_type holds the reply flag, so
        # the generic from_buffer() could misread part of the payload as
        # metadata. Keep the whole payload as one buffer instead.
        whole_payload = b''.join(buffers)
        return cls.from_components(header, b'', whole_payload,
                                   sender_address=sender_address,
                                   validate=validate)

    @property
    def payload_size(self):
        return self.header.payload_size

    @property
    def reply(self):
        return self.header.data_type

    @property
    def version(self):
        return self.header.data_count

    @property
    def cid(self):
        return self.header.parameter1

    @property
    def name(self):
        # Strip the null padding before decoding.
        raw = bytes(self.buffers[1])
        return raw.rstrip(b'\x00').decode(STR_ENC)
class SearchResponse(Message):
    """
    Answer a :class:`SearchRequest` giving the address of a Channel.

    Fields:

    .. attribute:: port

        Port number that will accept TCP connections with clients.

    .. attribute:: ip

        IP address (as a string) that will accept TCP connections with clients.

    .. attribute:: cid

        Echoing :data:`cid` of :class:`SearchRequest` to let the client match
        this response with the original request.

    .. attribute:: version

        The version of the Channel Access protocol.
    """
    __slots__ = ()
    ID = 6
    HAS_PAYLOAD = True

    def __init__(self, port, ip, cid, version):
        # No explicit address: fall back to the all-ones address.
        address = '255.255.255.255' if ip is None else ip
        header = SearchResponseHeader(data_type=port,
                                      sid=ipv4_to_int32(address),
                                      cid=cid)
        # Pad a uint16 to fill 8 bytes.
        version_bytes = bytes(DBR_INT(version))
        super().__init__(header, version_bytes.ljust(8, b'\x00'))

    @classmethod
    def from_wire(cls, header, *buffers, sender_address=None, validate=False):
        # Special case: here the data_type field is the server port, not a
        # data type, so the payload is kept exactly as received.
        return cls.from_components(header, *buffers,
                                   sender_address=sender_address,
                                   validate=validate)

    @property
    def ip(self):
        # for CA version >= 4.11
        return ipv4_from_int32(self.header.parameter1)

    @property
    def version(self):
        # The version is the first two bytes of the payload.
        leading_two = bytearray(self.buffers[0])[:2]
        return DBR_INT.from_buffer(leading_two).value

    @property
    def port(self):
        return self.header.data_type

    @property
    def cid(self):
        return self.header.parameter2
class NotFoundResponse(Message):
    """
    Answer a :class:`SearchRequest` in the negative.

    Fields:

    .. attribute:: cid

        Echoing :data:`cid` of :class:`SearchRequest` to let the client match
        this response with the original request.

    .. attribute:: version

        The version of the Channel Access protocol.
    """
    __slots__ = ()
    ID = 14
    HAS_PAYLOAD = False

    def __init__(self, version, cid):
        super().__init__(NotFoundResponseHeader(DO_REPLY, version, cid))

    @property
    def reply_flag(self):
        return self.header.data_type

    @property
    def version(self):
        return self.header.data_count

    @property
    def cid(self):
        return self.header.parameter1
class EchoRequest(Message):
    """
    Request an :class:`EchoResponse`.

    This command has no fields.
    """
    __slots__ = ()
    ID = 23
    HAS_PAYLOAD = False

    def __init__(self):
        header = EchoRequestHeader()
        super().__init__(header)
class EchoResponse(Message):
    """
    Respond to an :class:`EchoRequest`.

    This command has no fields.
    """
    __slots__ = ()
    ID = 23
    HAS_PAYLOAD = False

    def __init__(self):
        header = EchoResponseHeader()
        super().__init__(header)
class Beacon(Message):
    """
    Heartbeat beacon sent by the server.

    Fields:

    .. attribute:: version

        The version of the Channel Access protocol.

    .. attribute:: server_port

        Port number.

    .. attribute:: beacon_id

        Sequentially incremented integer.

    .. attribute:: address

        IP address as a string (stored in the header as an integer).
    """
    __slots__ = ()
    ID = 13
    HAS_PAYLOAD = False

    def __init__(self, version, server_port, beacon_id, address):
        # TODO if address is 0, it should be replaced with the remote ip from
        # the udp packet
        packed_address = ipv4_to_int32(str(address))
        header = BeaconHeader(version, server_port, beacon_id,
                              packed_address)
        super().__init__(header)

    @property
    def version(self):
        return self.header.data_type

    @property
    def server_port(self):
        return self.header.data_count

    @property
    def beacon_id(self):
        return self.header.parameter1

    @property
    def address(self):
        return ipv4_from_int32(self.header.parameter2)
class RepeaterConfirmResponse(Message):
    """
    Confirm successful client registration with the Repeater.

    Fields:

    .. attribute:: repeater_address

        IP address of repeater (as a string).
    """
    __slots__ = ()
    ID = 17
    HAS_PAYLOAD = False

    def __init__(self, repeater_address):
        packed_address = ipv4_to_int32(str(repeater_address))
        super().__init__(RepeaterConfirmResponseHeader(packed_address))

    @property
    def repeater_address(self):
        packed_address = self.header.parameter2
        return ipv4_from_int32(packed_address)
class RepeaterRegisterRequest(Message):
    """
    Register a client with the Repeater.

    Fields:

    .. attribute:: client_address

        IP address of the client (as a string).
    """
    __slots__ = ()
    ID = 24
    HAS_PAYLOAD = False

    def __init__(self, client_address='0.0.0.0'):
        packed_address = ipv4_to_int32(str(client_address))
        super().__init__(RepeaterRegisterRequestHeader(packed_address))

    @property
    def client_address(self):
        packed_address = self.header.parameter2
        return ipv4_from_int32(packed_address)
class EventAddRequestPayload(ctypes.BigEndianStructure):
    '''
    Fixed-layout, big-endian payload of an :class:`EventAddRequest`.

    Attributes
    ----------
    low : float
        Low delta value (deprecated)
    high : float
        High delta value (deprecated)
    to : float
        Period between samples (deprecated)
    mask : int
        Event selection mask
    '''
    _fields_ = [('low', float_t),
                ('high', float_t),
                ('to', float_t),
                ('mask', ushort_t),
                ('__padding', short_t),
                ]

    def __init__(self, low=0.0, high=0.0, to=0.0, mask=0):
        self.low = low
        self.high = high
        self.to = to
        self.mask = mask
        # Use setattr with the literal field name: inside a class body
        # ``self.__padding`` is name-mangled to
        # ``self._EventAddRequestPayload__padding``, which is not the
        # '__padding' field declared in _fields_.
        setattr(self, '__padding', 0)

    def __len__(self):
        return ctypes.sizeof(self)

    @property
    def nbytes(self):
        return len(self)
class EventAddRequest(Message):
    """
    Subscribe; i.e. request to be notified of changes in a Channel's value.

    Fields:

    .. attribute:: data_type

        Integer code of desired DBR type of readings.

    .. attribute:: data_count

        Desired number of elements per reading.

    .. attribute:: sid

        Integer ID of this Channel designated by the server.

    .. attribute:: subscriptionid

        New integer ID designated by the client uniquely identifying this
        subscription on this Virtual Circuit.

    .. attribute:: low

        Deprecated. (Use :data:`mask`.)

    .. attribute:: high

        Deprecated. (Use :data:`mask`.)

    .. attribute:: to

        Deprecated. (Use :data:`mask`.)

    .. attribute:: mask

        Mask indicating which changes to report.
    """
    __slots__ = ()
    ID = 1
    HAS_PAYLOAD = True

    def __init__(self, data_type, data_count, sid, subscriptionid, low,
                 high, to, mask):
        header = EventAddRequestHeader(
            ChannelType(data_type), data_count, sid, subscriptionid
        )
        payload = EventAddRequestPayload(low=low, high=high, to=to, mask=mask)
        super().__init__(header, payload)

    @classmethod
    def from_wire(cls, header, *buffers, sender_address=None,
                  validate=False):
        # The payload is a fixed struct rather than DBR data, so map it
        # directly onto the first buffer.
        payload_struct = EventAddRequestPayload.from_buffer(buffers[0])
        return cls.from_components(header, payload_struct,
                                   sender_address=sender_address,
                                   validate=validate)

    @property
    def payload_struct(self):
        return EventAddRequestPayload.from_buffer(self.buffers[0])

    data_type = property(lambda self: ChannelType(self.header.data_type))
    data_count = property(lambda self: self.header.data_count)
    sid = property(lambda self: self.header.parameter1)
    subscriptionid = property(lambda self: self.header.parameter2)
    low = property(lambda self: self.payload_struct.low)
    high = property(lambda self: self.payload_struct.high)
    to = property(lambda self: self.payload_struct.to)
    mask = property(lambda self: self.payload_struct.mask)
    # The attribute name keeps its original (class-mangled) binding, but the
    # struct field is fetched with getattr and its literal name. Written as
    # ``self.payload_struct.__padding`` the lookup was mangled to
    # ``_EventAddRequest__padding`` and always raised AttributeError.
    __padding = property(
        lambda self: getattr(self.payload_struct, '__padding'))
class EventAddResponse(Message):
    """
    Notify the client of a change in a Channel's value.

    Fields:

    .. attribute:: data

        data as built-in Python or numpy types

    .. attribute:: data_type

        Integer code of DBR type of reading.

    .. attribute:: data_count

        Number of elements in this reading.

    .. attribute:: status

        As per Channel Access spec, 1 is success; 0 or >1 are various failures.

    .. attribute:: subscriptionid

        Echoing the :data:`subscriptionid` in the :class:`EventAddRequest`
    """
    __slots__ = ('__weakref__',)
    ID = 1
    HAS_PAYLOAD = True

    def __init__(self, data, data_type, data_count,
                 status, subscriptionid, *, metadata=None):
        size, *buffers = data_payload(
            data, metadata, ChannelType(data_type), data_count
        )
        eca_value = ensure_eca_value(status)
        header = EventAddResponseHeader(size, data_type, data_count,
                                        eca_value, subscriptionid)
        super().__init__(header, *buffers)

    @property
    def payload_size(self):
        return self.header.payload_size

    @property
    def data_type(self):
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        return self.header.data_count

    @property
    def subscriptionid(self):
        return self.header.parameter2

    @property
    def data(self):
        # buffers[1] is the data part; buffers[0] is the metadata.
        return extract_data(self.buffers[1], self.data_type, self.data_count)

    @property
    def metadata(self):
        return extract_metadata(self.buffers[0], self.data_type)

    @property
    def status(self):
        return eca_value_to_status[self.header.parameter1]

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        # libca responds to EventCancelRequest with an
        # EventAddResponse with an empty payload, so there may be nothing
        # to split into metadata and data.
        buffers = ()
        if payload_bytes:
            buffers = from_buffer(header.data_type, header.data_count,
                                  payload_bytes)
        return cls.from_components(header, *buffers,
                                   sender_address=sender_address,
                                   validate=validate)
class EventCancelRequest(Message):
    """
    End notifications about changes in a Channel's value.

    Fields:

    .. attribute:: data_type

        Integer code of DBR type of reading.

    .. attribute:: sid

        Integer ID of this Channel designated by the server.

    .. attribute:: subscriptionid

        Integer ID for this subscription.
    """
    __slots__ = ()
    ID = 2
    HAS_PAYLOAD = False

    def __init__(self, data_type, sid, subscriptionid):
        # data_count is always sent as 0 for a cancel request.
        header = EventCancelRequestHeader(
            ChannelType(data_type), 0, sid, subscriptionid
        )
        super().__init__(header)

    @property
    def data_type(self):
        return ChannelType(self.header.data_type)

    @property
    def sid(self):
        return self.header.parameter1

    @property
    def subscriptionid(self):
        return self.header.parameter2
class EventCancelResponse(Message):
    """
    Confirm receipt of :class:`EventCancelRequest`.

    Fields:

    .. attribute:: data_type

        Integer code of DBR type of reading.

    .. attribute:: sid

        Integer ID of this Channel designated by the server.

    .. attribute:: subscriptionid

        Integer ID for this subscription.
    """
    # Actually this is coded with the ID = 1 like EventAdd*.
    __slots__ = ()
    ID = 2
    HAS_PAYLOAD = False

    def __init__(self, data_type, sid, subscriptionid, data_count):
        header = EventCancelResponseHeader(
            ChannelType(data_type), data_count, sid, subscriptionid
        )
        super().__init__(header)

    @property
    def data_type(self):
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        return self.header.data_count

    @property
    def sid(self):
        return self.header.parameter1

    @property
    def subscriptionid(self):
        return self.header.parameter2

    def validate(self):
        # Special case because of the unusual ID: on the wire this command
        # uses command code 1. The base-class validate() is deliberately not
        # called, since it would compare against ``ID``.
        command_code = self.header.command
        if command_code != 1:
            raise ValidationError("A {} must have a header with "
                                  "header.command == 1, not {}."
                                  "".format(type(self), command_code))
        for buf in self.buffers:
            if len(buf):
                raise ValidationError("A {} must have no payload."
                                      "".format(type(self)))
class ReadRequest(Message):
    "Deprecated by Channel Access since 3.13. See :class:`ReadNotifyRequest`."
    __slots__ = ()
    ID = 3
    HAS_PAYLOAD = False

    def __init__(self, data_type, data_count, sid, ioid):
        header = ReadRequestHeader(ChannelType(data_type), data_count,
                                   sid, ioid)
        # Validation is explicitly skipped for this command.
        super().__init__(header, validate=False)

    @property
    def data_type(self):
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        return self.header.data_count

    @property
    def sid(self):
        return self.header.parameter1

    @property
    def ioid(self):
        return self.header.parameter2
class ReadResponse(Message):
    "Deprecated by Channel Access since 3.13. See :class:`ReadNotifyResponse`."
    __slots__ = ()
    ID = 3
    HAS_PAYLOAD = True

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        # Warn on every de-serialization, then defer to the generic parser.
        warnings.warn("ReadResponse was deprecated by ChannelAccess in 3.13, "
                      "and is not well-supported by caproto. De-serialization "
                      "may not be correct.")
        return super().from_wire(header, payload_bytes,
                                 sender_address=sender_address,
                                 validate=validate)

    def __init__(self, data, data_type, data_count, sid, ioid, *,
                 metadata=None):
        warnings.warn("ReadResponse was deprecated by ChannelAccess in 3.13, "
                      "and is not well-supported by caproto. Serialization "
                      "may not be correct.")
        size, *buffers = data_payload(data, metadata, data_type, data_count)
        super().__init__(
            ReadResponseHeader(size, data_type, data_count, sid, ioid),
            *buffers)

    @property
    def payload_size(self):
        return self.header.payload_size

    @property
    def data_type(self):
        # Returned as the raw header value (not wrapped in ChannelType).
        return self.header.data_type

    @property
    def data_count(self):
        return self.header.data_count

    @property
    def sid(self):
        return self.header.parameter1

    @property
    def ioid(self):
        return self.header.parameter2

    @property
    def data(self):
        return extract_data(self.buffers[1], self.data_type, self.data_count)

    @property
    def metadata(self):
        return extract_metadata(self.buffers[0], self.data_type)
class WriteRequest(Message):
    "Deprecated: See :class:`WriteNotifyRequest`."
    __slots__ = ()
    ID = 4
    HAS_PAYLOAD = True

    def __init__(self, data, data_type, data_count, sid, ioid, *,
                 metadata=None):
        size, *buffers = data_payload(data, metadata, data_type, data_count)
        super().__init__(
            WriteRequestHeader(size, data_type, data_count, sid, ioid),
            *buffers)

    @property
    def payload_size(self):
        return self.header.payload_size

    @property
    def data_type(self):
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        return self.header.data_count

    @property
    def sid(self):
        return self.header.parameter1

    @property
    def ioid(self):
        return self.header.parameter2

    @property
    def data(self):
        # buffers[1] is the data part; buffers[0] is the metadata.
        return extract_data(self.buffers[1], self.data_type, self.data_count)

    @property
    def metadata(self):
        return extract_metadata(self.buffers[0], self.data_type)
# There is no 'WriteResponse'. See WriteNotifyRequest/WriteNotifyResponse.
class EventsOffRequest(Message):
    """
    Temporarily turn off :class:`EventAddResponse` notifications.

    This command has no fields.
    """
    __slots__ = ()
    ID = 8
    HAS_PAYLOAD = False

    def __init__(self):
        header = EventsOffRequestHeader()
        super().__init__(header)
class EventsOnRequest(Message):
    """
    Restore :class:`EventAddResponse` notifications.

    This command has no fields.
    """
    __slots__ = ()
    ID = 9
    HAS_PAYLOAD = False

    def __init__(self):
        # Nothing to configure: the header takes no arguments.
        header = EventsOnRequestHeader()
        super().__init__(header)
class ReadSyncRequest(Message):
    "Deprecated by Channel Access: See :class:`ReadNotifyRequest`"
    __slots__ = ()
    ID = 10
    HAS_PAYLOAD = False

    def __init__(self):
        # Nothing to configure: the header takes no arguments.
        header = ReadSyncRequestHeader()
        super().__init__(header)
class ErrorResponse(Message):
    """
    Notify client of a server-side error, including some details about error.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.

    .. attribute:: status

        As per Channel Access spec, 1 is success; 0 or >1 are various failures.

    .. attribute:: original_request

        Header of the request being reported on, rebuilt from the leading
        bytes of the payload.

    .. attribute:: error_message

        The payload bytes that follow the embedded request header.
    """
    __slots__ = ()
    ID = 11
    HAS_PAYLOAD = True

    def __init__(self, original_request, cid, status, error_message):
        text_size, text_bytes = padded_string_payload(error_message)
        # The payload is the original request's header followed by the
        # padded message text.
        request_header = bytes(original_request.header)
        total_size = len(request_header) + text_size
        combined = request_header + text_bytes
        eca_status = ensure_eca_value(status)
        super().__init__(ErrorResponseHeader(total_size, cid, eca_status),
                         b'', combined)

    @property
    def payload_size(self):
        "Payload size, as recorded in the header."
        return self.header.payload_size

    @property
    def cid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def error_message(self):
        "Everything in the payload after the embedded request header."
        # NOTE(review): this hands back the raw bytearray, including any
        # padding added by padded_string_payload, rather than a decoded
        # str -- confirm whether callers expect to decode it themselves.
        return bytearray(self.buffers[1][_MessageHeaderSize:])

    @property
    def original_request(self):
        "Header rebuilt from the first ``_MessageHeaderSize`` payload bytes."
        header_bytes = bytearray(self.buffers[1][:_MessageHeaderSize])
        return MessageHeader.from_buffer(header_bytes)

    @property
    def status(self):
        "Status looked up from the code in the header's ``parameter2``."
        return eca_value_to_status[self.header.parameter2]

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        "Build from a header and raw payload, with an empty first buffer."
        options = dict(sender_address=sender_address, validate=validate)
        return cls.from_components(header, b'', payload_bytes, **options)
class ClearChannelRequest(Message):
    """
    Close a Channel.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.

    .. attribute:: sid

        Integer ID for this Channel designated by the server.
    """
    __slots__ = ()
    ID = 12
    HAS_PAYLOAD = False

    def __init__(self, sid, cid):
        header = ClearChannelRequestHeader(sid, cid)
        super().__init__(header)

    @property
    def sid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def cid(self):
        "Stored in the header's ``parameter2`` field."
        return self.header.parameter2
class ClearChannelResponse(Message):
    """
    Confirm that a Channel has been closed.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.

    .. attribute:: sid

        Integer ID for this Channel designated by the server.
    """
    __slots__ = ()
    ID = 12
    HAS_PAYLOAD = False

    def __init__(self, sid, cid):
        header = ClearChannelResponseHeader(sid, cid)
        super().__init__(header)

    @property
    def sid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def cid(self):
        "Stored in the header's ``parameter2`` field."
        return self.header.parameter2
class ReadNotifyRequest(Message):
    """
    Request a fresh reading of a Channel.

    Fields:

    .. attribute:: data_type

        Integer code of desired DBR type of readings.

    .. attribute:: data_count

        Desired number of elements per reading.

    .. attribute:: sid

        Integer ID for this Channel designated by the server.

    .. attribute:: ioid

        New integer ID uniquely identifying this I/O transaction on this
        Virtual Circuit.
    """
    __slots__ = ()
    ID = 15
    HAS_PAYLOAD = False

    def __init__(self, data_type, data_count, sid, ioid):
        # Coerce to ChannelType before it is handed to the header.
        channel_type = ChannelType(data_type)
        super().__init__(
            ReadNotifyRequestHeader(channel_type, data_count, sid, ioid))

    @property
    def data_type(self):
        "The header's data type code, as a :class:`ChannelType`."
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        "Element count, as recorded in the header."
        return self.header.data_count

    @property
    def sid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def ioid(self):
        "Stored in the header's ``parameter2`` field."
        return self.header.parameter2
class ReadNotifyResponse(Message):
    """
    Respond to a :class:`ReadNotifyRequest` with a reading of a Channel.

    Fields:

    .. attribute:: data

        data as built-in Python or numpy types

    .. attribute:: metadata

        metadata in a ctypes.Structure

    .. attribute:: data_type

        Integer code of the DBR type of this reading.

    .. attribute:: data_count

        Number of elements in this reading.

    .. attribute:: status

        As per Channel Access spec, 1 is success; 0 or >1 are various failures.

    .. attribute:: ioid

        Integer ID for I/O transaction, echoing :class:`ReadNotifyRequest`.
    """
    __slots__ = ()
    ID = 15
    HAS_PAYLOAD = True

    def __init__(self, data, data_type, data_count, status, ioid, *,
                 metadata=None):
        # data_payload yields the payload size first, then the buffers.
        size, *buffers = data_payload(data, metadata, data_type, data_count)
        status = ensure_eca_value(status)
        header = ReadNotifyResponseHeader(size, data_type, data_count, status,
                                          ioid)
        super().__init__(header, *buffers)

    payload_size = property(lambda self: self.header.payload_size)

    @property
    def data(self):
        "Values extracted from the second buffer per the type and count."
        return extract_data(self.buffers[1], self.data_type, self.data_count)

    @property
    def metadata(self):
        "Metadata extracted from the first buffer per the data type."
        return extract_metadata(self.buffers[0], self.data_type)

    @property
    def status(self):
        "Status looked up from the code in the header's ``parameter1``."
        return eca_value_to_status[self.header.parameter1]

    data_type = property(lambda self: ChannelType(self.header.data_type))
    data_count = property(lambda self: self.header.data_count)
    ioid = property(lambda self: self.header.parameter2)
class CreateChanRequest(Message):
    """
    Request a new Channel.

    Fields:

    .. attribute:: name

        String name of the channel (i.e. 'PV')

    .. attribute:: cid

        New integer ID designated by the client, uniquely identifying this
        Channel on its VirtualCircuit.

    .. attribute:: version

        The version of the Channel Access protocol.
    """
    __slots__ = ()
    ID = 18
    HAS_PAYLOAD = True

    def __init__(self, name, cid, version):
        size, payload = padded_string_payload(name)
        header = CreateChanRequestHeader(size, cid, version)
        # The name travels in the second buffer; the first is left empty.
        super().__init__(header, b'', payload)

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        """
        Build the command from a header and its raw payload bytes.

        The payload (the padded channel name) is passed through untouched
        alongside an empty first buffer, mirroring ``__init__``.
        """
        return cls.from_components(header, b'', payload_bytes,
                                   sender_address=sender_address,
                                   validate=validate)

    payload_size = property(lambda self: self.header.payload_size)
    cid = property(lambda self: self.header.parameter1)
    version = property(lambda self: self.header.parameter2)

    @property
    def name(self):
        "Channel name: the second buffer, NUL-stripped and decoded."
        return bytes(self.buffers[1]).rstrip(b'\x00').decode(STR_ENC)
class CreateChanResponse(Message):
    """
    Confirm the initialization of a new Channel.

    Fields:

    .. attribute:: data_type

        Integer code of native DBR type of readings.

    .. attribute:: data_count

        Native number of elements per reading.

    .. attribute:: cid

        Integer ID for this Channel designated by the client, echoing the value
        in :class:`CreateChanRequest`.

    .. attribute:: sid

        New integer ID for this Channel designated by the server uniquely
        identifying this Channel on its VirtualCircuit.
    """
    __slots__ = ()
    ID = 18
    HAS_PAYLOAD = False

    def __init__(self, data_type, data_count, cid, sid):
        # Coerce to ChannelType before it is handed to the header.
        channel_type = ChannelType(data_type)
        super().__init__(
            CreateChanResponseHeader(channel_type, data_count, cid, sid))

    @property
    def data_type(self):
        "The header's data type code, as a :class:`ChannelType`."
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        "Element count, as recorded in the header."
        return self.header.data_count

    @property
    def cid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def sid(self):
        "Stored in the header's ``parameter2`` field."
        return self.header.parameter2
class WriteNotifyRequest(Message):
    """
    Write a value to a Channel.

    Fields:

    .. attribute:: data

        data as built-in Python or numpy types

    .. attribute:: metadata

        metadata in a ctypes.Structure

    .. attribute:: data_type

        Integer code of DBR type.

    .. attribute:: data_count

        Number of elements.

    .. attribute:: sid

        Integer ID for this Channel designated by the server.

    .. attribute:: ioid

        New integer ID uniquely identifying this I/O transaction on this
        Virtual Circuit.
    """
    __slots__ = ()
    ID = 19
    HAS_PAYLOAD = True

    def __init__(self, data, data_type, data_count, sid, ioid, *,
                 metadata=None):
        # data_payload yields the payload size first, then the buffers.
        nbytes, *parts = data_payload(data, metadata, data_type, data_count)
        super().__init__(
            WriteNotifyRequestHeader(nbytes, data_type, data_count, sid,
                                     ioid),
            *parts)

    @property
    def payload_size(self):
        "Payload size, as recorded in the header."
        return self.header.payload_size

    @property
    def data_type(self):
        "The header's data type code, as a :class:`ChannelType`."
        return ChannelType(self.header.data_type)

    @property
    def data_count(self):
        "Element count, as recorded in the header."
        return self.header.data_count

    @property
    def sid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1

    @property
    def ioid(self):
        "Stored in the header's ``parameter2`` field."
        return self.header.parameter2

    @property
    def data(self):
        "Values extracted from the second buffer per the type and count."
        value_buffer = self.buffers[1]
        return extract_data(value_buffer, self.data_type, self.data_count)

    @property
    def metadata(self):
        "Metadata extracted from the first buffer per the data type."
        metadata_buffer = self.buffers[0]
        return extract_metadata(metadata_buffer, self.data_type)
class WriteNotifyResponse(Message):
    """
    Confirm the receipt of a :class:`WriteNotifyRequest`.

    Fields:

    .. attribute:: data_type

        Integer code of DBR type.

    .. attribute:: data_count

        Number of elements.

    .. attribute:: status

        As per Channel Access spec, 1 is success; 0 or >1 are various failures.

    .. attribute:: ioid

        Integer ID for this I/O transaction, echoing
        :class:`WriteNotifyRequest`.
    """
    __slots__ = ()
    ID = 19
    HAS_PAYLOAD = False

    def __init__(self, data_type, data_count, status, ioid):
        status = ensure_eca_value(status)
        # The header carries the status in parameter1 and the ioid in
        # parameter2; this command has no sid.
        header = WriteNotifyResponseHeader(
            ChannelType(data_type), data_count, status, ioid
        )
        super().__init__(header)

    data_type = property(lambda self: ChannelType(self.header.data_type))
    data_count = property(lambda self: self.header.data_count)
    ioid = property(lambda self: self.header.parameter2)

    @property
    def status(self):
        "Status looked up from the code in the header's ``parameter1``."
        return eca_value_to_status[self.header.parameter1]
class ClientNameRequest(Message):
    """
    Tell the server the client name (i.e., user name) of the client.

    Fields:

    .. attribute:: name

        Client name.
    """
    __slots__ = ()
    ID = 20
    HAS_PAYLOAD = True

    def __init__(self, name):
        size, payload = padded_string_payload(name)
        header = ClientNameRequestHeader(size)
        # The name travels in the second buffer; the first is left empty.
        super().__init__(header, b'', payload)

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        """
        Build the command from a header and its raw payload bytes.

        The payload (the padded client name) is passed through untouched
        alongside an empty first buffer, mirroring ``__init__``.
        """
        return cls.from_components(header, b'', payload_bytes,
                                   sender_address=sender_address,
                                   validate=validate)

    payload_size = property(lambda self: self.header.payload_size)

    @property
    def name(self):
        "Client name: the second buffer, NUL-stripped and decoded."
        return bytes(self.buffers[1]).rstrip(b'\x00').decode(STR_ENC)
class HostNameRequest(Message):
    """
    Tell the server the host name of the client.

    Fields:

    .. attribute:: name

        Host name.
    """
    __slots__ = ()
    ID = 21
    HAS_PAYLOAD = True

    def __init__(self, name):
        nbytes, padded_name = padded_string_payload(name)
        # The name travels in the second buffer; the first is left empty.
        super().__init__(HostNameRequestHeader(nbytes), b'', padded_name)

    @property
    def payload_size(self):
        "Payload size, as recorded in the header."
        return self.header.payload_size

    @property
    def name(self):
        "Host name: the second buffer, NUL-stripped and decoded."
        raw_name = bytes(self.buffers[1])
        return raw_name.rstrip(b'\x00').decode(STR_ENC)

    @classmethod
    def from_wire(cls, header, payload_bytes, *, sender_address=None,
                  validate=False):
        "Build from a header and raw payload, with an empty first buffer."
        options = dict(sender_address=sender_address, validate=validate)
        return cls.from_components(header, b'', payload_bytes, **options)
class AccessRightsResponse(Message):
    """
    Notify the client of its access rights for a Channel.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.

    .. attribute:: access_rights

        Designated level of read or write access, as an
        :class:`AccessRights` value. (See Channel Access spec for details
        about meanings.)
    """
    __slots__ = ()
    ID = 22
    HAS_PAYLOAD = False

    def __init__(self, cid, access_rights):
        header = AccessRightsResponseHeader(cid, access_rights)
        super().__init__(header)

    cid = property(lambda self: self.header.parameter1)
    # The raw header value is wrapped in AccessRights on every read.
    access_rights = property(lambda self: AccessRights(self.header.parameter2))
class CreateChFailResponse(Message):
    """
    Notify the client that channel creation failed.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.
    """
    __slots__ = ()
    ID = 26
    HAS_PAYLOAD = False

    def __init__(self, cid):
        header = CreateChFailResponseHeader(cid)
        super().__init__(header)

    @property
    def cid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1
class ServerDisconnResponse(Message):
    """
    Notify the client that server will disconnect from this Channel.

    Fields:

    .. attribute:: cid

        Integer ID for this Channel designated by the client.
    """
    __slots__ = ()
    ID = 27
    HAS_PAYLOAD = False

    def __init__(self, cid):
        header = ServerDisconnResponseHeader(cid)
        super().__init__(header)

    @property
    def cid(self):
        "Stored in the header's ``parameter1`` field."
        return self.header.parameter1