# These ChannelData classes hold the state associated with the data
# source underlying a Channel, including data values, alarm state, and
# metadata. They perform data type conversions in response to requests to read
# data as a certain type, and they push updates into queues registered by a
# higher-level server.
import copy
import time
import weakref
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from typing import Optional
from ._backend import backend
from ._commands import parse_metadata
from ._constants import MAX_ENUM_STATES, MAX_ENUM_STRING_SIZE
from ._dbr import (DBR_STSACK_STRING, DBR_TYPES, AccessRights, AlarmSeverity,
AlarmStatus, ChannelType, GraphicControlBase,
SubscriptionType, _channel_type_by_name,
_LongStringChannelType, native_type, native_types,
time_types, timestamp_to_epics)
from ._utils import (CaprotoError, CaprotoValueError, ConversionDirection,
is_array_read_only)
__all__ = ('Forbidden',
'ChannelAlarm',
'ChannelByte',
'ChannelChar',
'ChannelData',
'ChannelDouble',
'ChannelEnum',
'ChannelFloat',
'ChannelInteger',
'ChannelNumeric',
'ChannelShort',
'ChannelString',
'SkipWrite',
)
SubscriptionUpdate = namedtuple('SubscriptionUpdate',
('sub_specs', 'metadata', 'values',
'flags', 'sub'))
class Forbidden(CaprotoError):
...
class CannotExceedLimits(CaprotoValueError):
...
class SkipWrite(Exception):
"""Raise this exception to skip further processing of write()."""
def dbr_metadata_to_dict(dbr_metadata, string_encoding):
'''Return a dictionary of metadata keys to values'''
info = dbr_metadata.to_dict()
try:
info['units'] = info['units'].decode(string_encoding)
except KeyError:
...
return info
def _read_only_property(key, doc=None):
'''Create property that gives read-only access to instance._data[key]'''
if doc is None:
doc = f"Read-only access to {key} data"
return property(lambda self: self._data[key], doc=doc)
class ChannelAlarm:
def __init__(self, *, status=0, severity=0,
must_acknowledge_transient=True, severity_to_acknowledge=0,
alarm_string='', string_encoding='latin-1'):
"""
Alarm metadata that can be used by one or more ChannelData instances.
Parameters
----------
status : int or AlarmStatus, optional
Status information.
severity : int or AlarmSeverity, optional
Severity information.
must_acknowledge_transient : int, optional
Whether or not transient alarms must be acknowledged.
severity_to_acknowledge : int, optional
The highest alarm severity to acknowledge. If the current alarm
            severity is less than or equal to this value, the alarm is
acknowledged.
alarm_string : str, optional
Alarm information.
string_encoding : str, optional
String encoding of the alarm string.
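        Examples
        --------
        A minimal sketch of sharing one alarm between two channels (the
        variable names are illustrative only)::

            alarm = ChannelAlarm(status=AlarmStatus.NO_ALARM,
                                 severity=AlarmSeverity.NO_ALARM)
            readback = ChannelDouble(value=0.0, alarm=alarm)
            setpoint = ChannelDouble(value=0.0, alarm=alarm)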
"""
self._channels = weakref.WeakSet()
self.string_encoding = string_encoding
self._data = dict(
status=status, severity=severity,
must_acknowledge_transient=must_acknowledge_transient,
severity_to_acknowledge=severity_to_acknowledge,
alarm_string=alarm_string)
def __getnewargs_ex__(self):
kwargs = {
'status': self.status,
'severity': self.severity,
'must_acknowledge_transient': self.must_acknowledge_transient,
'severity_to_acknowledge': self.severity_to_acknowledge,
'alarm_string': self.alarm_string,
'string_encoding': self.string_encoding,
}
return ((), kwargs)
status = _read_only_property('status',
doc='Current alarm status')
severity = _read_only_property('severity',
doc='Current alarm severity')
must_acknowledge_transient = _read_only_property(
'must_acknowledge_transient',
doc='Toggle whether or not transient alarms must be acknowledged')
severity_to_acknowledge = _read_only_property(
'severity_to_acknowledge',
doc='The alarm severity that has been acknowledged')
alarm_string = _read_only_property('alarm_string',
doc='String associated with alarm')
def __repr__(self):
return f'<ChannelAlarm(status={self.status}, severity={self.severity})>'
def connect(self, channel_data):
"""Add a ChannelData instance to the channel set using this alarm."""
self._channels.add(channel_data)
def disconnect(self, channel_data):
"""Remove ChannelData instance from channel set using this alarm."""
self._channels.remove(channel_data)
async def read(self, dbr=None):
"""Read alarm information into a DBR_STSACK_STRING instance."""
if dbr is None:
dbr = DBR_STSACK_STRING()
dbr.status = self.status
dbr.severity = self.severity
dbr.ackt = 1 if self.must_acknowledge_transient else 0
dbr.acks = self.severity_to_acknowledge
dbr.value = self.alarm_string.encode(self.string_encoding)
return dbr
async def write(self, *, status=None, severity=None,
must_acknowledge_transient=None,
severity_to_acknowledge=None,
alarm_string=None, flags=0, publish=True):
"""
Write data to the alarm and optionally publish it to clients.
Parameters
----------
status : int or AlarmStatus, optional
Status information.
severity : int or AlarmSeverity, optional
Severity information.
must_acknowledge_transient : int, optional
Whether or not transient alarms must be acknowledged.
severity_to_acknowledge : int, optional
The highest alarm severity to acknowledge. If the current alarm
            severity is less than or equal to this value, the alarm is
acknowledged.
alarm_string : str, optional
Alarm information.
flags : SubscriptionType or int, optional
Subscription flags.
publish : bool, optional
Optionally publish the alarm status after the write.
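        Examples
        --------
        A minimal sketch, assuming an async context (e.g., inside a server
        handler) and an existing ``alarm`` instance::

            await alarm.write(status=AlarmStatus.HIGH,
                              severity=AlarmSeverity.MINOR_ALARM)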
"""
data = self._data
if status is not None:
data['status'] = AlarmStatus(status)
flags |= SubscriptionType.DBE_VALUE
if severity is not None:
data['severity'] = AlarmSeverity(severity)
if (not self.must_acknowledge_transient or
self.severity_to_acknowledge < self.severity):
data['severity_to_acknowledge'] = self.severity
flags |= SubscriptionType.DBE_ALARM
if must_acknowledge_transient is not None:
data['must_acknowledge_transient'] = must_acknowledge_transient
if (not must_acknowledge_transient and
self.severity_to_acknowledge > self.severity):
# Reset the severity to acknowledge if disabling transient
# requirement
data['severity_to_acknowledge'] = self.severity
flags |= SubscriptionType.DBE_ALARM
if severity_to_acknowledge is not None:
# To clear, set greater than or equal to the
# severity_to_acknowledge
if severity_to_acknowledge >= self.severity:
data['severity_to_acknowledge'] = 0
flags |= SubscriptionType.DBE_ALARM
if alarm_string is not None:
data['alarm_string'] = alarm_string
flags |= SubscriptionType.DBE_ALARM
if publish:
await self.publish(flags)
async def publish(self, flags, *, except_for=None):
"""
Publish alarm information to all listening channels.
Parameters
----------
flags : SubscriptionType
The subscription type to publish.
except_for : sequence of ChannelData, optional
Skip publishing to these channels, mainly to avoid recursion.
"""
except_for = except_for or ()
for channel in self._channels:
if channel not in except_for:
await channel.publish(flags)
class ChannelData:
"""
Base class holding data and metadata which can be sent across a Channel.
Parameters
----------
    value :
        Data which must be compatible with this class's ``data_type``.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
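    Examples
    --------
    A minimal sketch using the LONG-typed base class directly (subclasses
    such as ``ChannelDouble`` are more typical)::

        data = ChannelData(value=[1, 2, 3])
        data.length      # -> 3
        data.max_length  # -> 3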
"""
data_type = ChannelType.LONG
_compatible_array_types = {}
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto'):
if timestamp is None:
timestamp = time.time()
if alarm is None:
alarm = ChannelAlarm()
self._alarm = None
self._status = None
self._severity = None
# now use the setter to attach the alarm correctly:
self.alarm = alarm
self._max_length = max_length
self.string_encoding = string_encoding
self.reported_record_type = reported_record_type
if self._max_length is None:
# Use the current length as maximum, if unspecified.
self._max_length = max(self.calculate_length(value), 1)
# It is possible to pass in a zero-length array to start with.
# However, it is not useful to have an empty value forever, so the
# minimum length here is required to be at least 1.
value = self.preprocess_value(value)
self._data = dict(value=value,
timestamp=timestamp)
# This is a dict keyed on queues that will receive subscription
# updates. (Each queue belongs to a Context.) Each value is itself a
# dict, mapping data_types to the set of SubscriptionSpecs that request
# that data_type.
self._queues = defaultdict(
lambda: defaultdict(
lambda: defaultdict(set)))
# Cache results of data_type conversions. This maps data_type to
        # (metadata, value). This is cleared each time publish() is called.
self._content = {}
self._snapshots = defaultdict(dict)
self._fill_at_next_write = list()
def calculate_length(self, value):
'Calculate the number of elements given a value'
is_array = isinstance(value, (list, tuple) + backend.array_types)
if is_array:
return len(value)
if isinstance(value, (bytes, str, bytearray)):
if self.data_type == ChannelType.CHAR:
return len(value)
return 1
@property
def length(self):
'The number of elements (length) of the current value'
return self.calculate_length(self.value)
@property
def max_length(self):
'The maximum number of elements (length) this channel can hold'
return self._max_length
def preprocess_value(self, value):
'''Pre-process values destined for verify_value and write
        A few basic things are done here, based on max_length:
1. If length >= 2, ensure the value is a list
2. If length == 1, ensure the value is an unpacked scalar
3. Ensure len(value) <= length
Raises
------
CaprotoValueError
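        Examples
        --------
        Illustrative behavior (``scalar`` and ``waveform`` are hypothetical
        channels)::

            # max_length == 1: single-element lists are unpacked to scalars
            scalar.preprocess_value([5])    # -> 5
            # max_length >= 2: scalars are wrapped in a list
            waveform.preprocess_value(5)    # -> [5]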
'''
is_array = isinstance(value, (list, tuple) + backend.array_types)
if is_array:
if len(value) > self._max_length:
# TODO consider an exception for caproto-only environments that
# can handle dynamically resized arrays (i.e., sizes greater
# than the initial max_length)?
raise CaprotoValueError(
f'Value of length {len(value)} is too large for '
f'{self.__class__.__name__}(max_length={self._max_length})'
)
if self._max_length == 1:
if is_array:
if len(value):
# scalar value in a list -> scalar value
return value[0]
raise CaprotoValueError(
'Cannot set a scalar to an empty array')
elif not is_array:
# scalar value that should be in a list -> list
return [value]
return value
def __getnewargs_ex__(self):
# ref: https://docs.python.org/3/library/pickle.html
kwargs = {'timestamp': self.timestamp,
'alarm': self.alarm,
'string_encoding': self.string_encoding,
'reported_record_type': self.reported_record_type,
'data': self._data,
'max_length': self._max_length}
return ((), kwargs)
value = _read_only_property('value')
timestamp = _read_only_property('timestamp')
# "before" — only the last value received before the state changes from
# false to true is forwarded to the client.
# "first" — only the first value received after the state changes from true
# to false is forwarded to the client.
# "while" — values are forwarded to the client as long as the state is true.
# "last" — only the last value received before the state changes from true
# to false is forwarded to the client.
# "after" — only the first value received after the state changes from true
# to false is forwarded to the client.
# "unless" — values are forwarded to the client as long as the state is
# false.
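    # A client selects one of these modes with the standard EPICS "sync"
    # channel filter, for example (the state name here is illustrative):
    #     caget 'pvname.VAL{"sync":{"m":"while","s":"scanning"}}'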
def pre_state_change(self, state, new_value):
"This is called by the server when it enters its StateUpdateContext."
snapshots = self._snapshots[state]
snapshots.clear()
snapshot = copy.deepcopy(self)
if new_value:
# We are changing from false to true.
snapshots['before'] = snapshot
else:
# We are changing from true to false.
snapshots['last'] = snapshot
def post_state_change(self, state, new_value):
"This is called by the server when it exits its StateUpdateContext."
snapshots = self._snapshots[state]
if new_value:
# We have changed from false to true.
snapshots['while'] = self
            self._fill_at_next_write.append((state, 'first'))
else:
# We have changed from true to false.
snapshots['unless'] = self
            self._fill_at_next_write.append((state, 'after'))
@property
def alarm(self):
'The ChannelAlarm associated with this data'
return self._alarm
@alarm.setter
def alarm(self, alarm):
old_alarm = self._alarm
if old_alarm is alarm:
return
if old_alarm is not None:
old_alarm.disconnect(self)
self._alarm = alarm
if alarm is not None:
alarm.connect(self)
async def subscribe(self, queue, sub_spec, sub):
"""
Subscribe a queue for the given subscription specification.
Parameters
----------
queue : asyncio.Queue or compatible
The queue to send data to.
sub_spec : SubscriptionSpec
The matching subscription specification.
sub : Subscription
The subscription instance.
"""
by_sync = self._queues[queue][sub_spec.channel_filter.sync]
by_sync[sub_spec.data_type_name].add(sub_spec)
# Always send current reading immediately upon subscription.
try:
metadata, values = self._content[sub_spec.data_type_name]
except KeyError:
# Do the expensive data type conversion and cache it in case
# a future subscription wants the same data type.
data_type = _channel_type_by_name[sub_spec.data_type_name]
metadata, values = await self._read(data_type)
self._content[sub_spec.data_type_name] = metadata, values
await queue.put(SubscriptionUpdate((sub_spec,), metadata, values, 0, sub))
async def unsubscribe(self, queue, sub_spec):
"""
Unsubscribe a queue for the given subscription specification.
Parameters
----------
queue : asyncio.Queue or compatible
The queue to send data to.
sub_spec : SubscriptionSpec
The subscription specification.
"""
by_sync = self._queues[queue][sub_spec.channel_filter.sync]
by_sync[sub_spec.data_type_name].discard(sub_spec)
async def auth_read(self, hostname, username, data_type, *,
user_address=None):
'''Get DBR data and native data, converted to a specific type'''
access = self.check_access(hostname, username)
if AccessRights.READ not in access:
raise Forbidden("Client with hostname {} and username {} cannot "
"read.".format(hostname, username))
return (await self.read(data_type))
async def read(self, data_type):
"""
Read out the ChannelData as ``data_type``.
Parameters
----------
data_type : ChannelType
The data type to read out.
"""
# Subclass might trigger a write here to update self._data before
# reading it out.
return (await self._read(data_type))
async def _read(self, data_type):
"""
Inner method to read out the ChannelData as ``data_type``.
Parameters
----------
data_type : ChannelType
The data type to read out.
"""
# special cases for alarm strings and class name
if data_type == ChannelType.STSACK_STRING:
ret = await self.alarm.read()
return (ret, b'')
elif data_type == ChannelType.CLASS_NAME:
class_name = DBR_TYPES[data_type]()
rtyp = self.reported_record_type.encode(self.string_encoding)
class_name.value = rtyp
return class_name, b''
if data_type in _LongStringChannelType:
native_to = _LongStringChannelType.LONG_STRING
data_type = ChannelType(data_type)
else:
native_to = native_type(data_type)
values = backend.convert_values(
values=self._data['value'],
from_dtype=self.data_type,
to_dtype=native_to,
string_encoding=self.string_encoding,
enum_strings=self._data.get('enum_strings'),
direction=ConversionDirection.TO_WIRE,
)
# for native types, there is no dbr metadata - just data
if data_type in native_types:
return b'', values
dbr_metadata = DBR_TYPES[data_type]()
self._read_metadata(dbr_metadata)
# Copy alarm fields also.
alarm_dbr = await self.alarm.read()
for field, _ in alarm_dbr._fields_:
if hasattr(dbr_metadata, field):
setattr(dbr_metadata, field, getattr(alarm_dbr, field))
return dbr_metadata, values
async def auth_write(self, hostname, username, data, data_type, metadata,
*, flags=0, user_address=None):
"""
Data write hook for clients.
First verifies that the specified client may write to the instance
prior to proceeding.
Parameters
----------
hostname : str
The hostname of the client.
username : str
The username of the client.
data :
The data to write.
data_type : ChannelType
The data type associated with the written data.
metadata :
Metadata instance.
flags : int, optional
Flags for publishing.
user_address : tuple, optional
(host, port) tuple of the user.
Raises
------
Forbidden:
If client is not allowed to write to this instance.
"""
access = self.check_access(hostname, username)
if AccessRights.WRITE not in access:
raise Forbidden("Client with hostname {} and username {} cannot "
"write.".format(hostname, username))
return (await self.write_from_dbr(data, data_type, metadata,
flags=flags))
async def verify_value(self, data):
"""
        Verify a value prior to it being written by CA or Python.
To reject a value, raise an exception. Otherwise, return the
original value or a modified version of it.
"""
return data
async def write_from_dbr(self, data, data_type, metadata, *, flags=0):
"""
Write data from a provided DBR data type.
Does not verify the client has authorization to write.
Parameters
----------
data :
The data to write.
data_type : ChannelType
The data type associated with the written data.
metadata :
Metadata instance.
flags : int, optional
Flags for publishing.
Raises
------
CaprotoValueError:
If the request is not valid.
"""
if data_type == ChannelType.PUT_ACKS:
await self.alarm.write(severity_to_acknowledge=metadata.value)
return
elif data_type == ChannelType.PUT_ACKT:
await self.alarm.write(must_acknowledge_transient=metadata.value)
return
elif data_type in (ChannelType.STSACK_STRING, ChannelType.CLASS_NAME):
raise CaprotoValueError('Bad request')
timestamp = time.time()
native_from = native_type(data_type)
value = backend.convert_values(
values=data, from_dtype=native_from,
to_dtype=self.data_type,
string_encoding=self.string_encoding,
enum_strings=getattr(self, 'enum_strings', None),
direction=ConversionDirection.FROM_WIRE)
if metadata is None:
metadata_dict = {}
else:
# Convert `metadata` to bytes-like (or pass it through).
md_payload = parse_metadata(metadata, data_type)
# Depending on the type of `metadata` above,
# `md_payload` could be a DBR struct or plain bytes.
# Load it into a struct (zero-copy) to be sure.
dbr_metadata = DBR_TYPES[data_type].from_buffer(md_payload)
metadata_dict = dbr_metadata_to_dict(dbr_metadata,
self.string_encoding)
metadata_dict.setdefault('timestamp', timestamp)
return (await self.write(value, flags=flags, **metadata_dict))
async def write(self, value, *, flags=0, verify_value=True,
update_fields=True, **metadata):
"""
Write data from native Python types.
Parameters
----------
value :
The native Python data.
flags : SubscriptionType or int, optional
The flags for subscribers.
verify_value : bool, optional
Run the ``verify_value`` hook prior to updating internal state.
update_fields : bool, optional
Run the ``update_fields`` hook prior to updating internal state.
Raises
------
Exception:
Any exception raised in the handlers (except SkipWrite) will be
propagated to the caller.
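        Examples
        --------
        A minimal sketch, assuming an async context and a scalar
        ``ChannelDouble`` named ``channel``::

            await channel.write(3.14)
            await channel.write(2.71, timestamp=time.time())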
"""
try:
value = self.preprocess_value(value)
if verify_value:
modified_value = await self.verify_value(value)
else:
modified_value = None
if update_fields:
await self.update_fields(
modified_value if verify_value else value
)
except SkipWrite:
# Handler raised SkipWrite to avoid the rest of this method.
return
except GeneratorExit:
raise
except Exception:
# TODO: should allow exception to optionally pass alarm
# status/severity through exception instance
await self.alarm.write(status=AlarmStatus.WRITE,
severity=AlarmSeverity.MAJOR_ALARM,
)
raise
finally:
alarm_md = self._collect_alarm()
if modified_value is SkipWrite:
# An alternative to raising SkipWrite: avoid the rest of this
# method.
return
        # Note: alarm metadata collected here may override user-passed data.
metadata.update(alarm_md)
metadata.setdefault('timestamp', time.time())
if self._fill_at_next_write:
snapshot = copy.deepcopy(self)
for state, mode in self._fill_at_next_write:
self._snapshots[state][mode] = snapshot
self._fill_at_next_write.clear()
new = modified_value if modified_value is not None else value
# TODO the next 5 lines should be done in one move
self._data['value'] = new
await self.write_metadata(publish=False, **metadata)
# Send a new event to subscribers.
await self.publish(flags)
# and publish any linked alarms
if 'status' in metadata or 'severity' in metadata:
await self.alarm.publish(flags, except_for=(self,))
def _is_eligible(self, ss):
sync = ss.channel_filter.sync
return sync is None or sync.m in self._snapshots[sync.s]
async def update_fields(self, value):
"""This is a hook for subclasses to update field instance data."""
async def publish(self, flags):
"""
Publish data to appropriate queues matching the SubscriptionSpec.
Each SubscriptionSpec specifies a certain data type it is interested in
and a mask. Send one update per queue per data_type if and only if any
subscriptions specs on a queue have a compatible mask.
Parameters
----------
        flags : SubscriptionType or int
            The subscription flags associated with this update.
"""
# Copying the data into structs with various data types is expensive,
# so we only want to do it if it's going to be used, and we only want
# to do each conversion once. Clear the cache to start. This cache is
# instance state so that self.subscribe can also use it.
self._content.clear()
for queue, syncs in self._queues.items():
# queue belongs to a Context that is expecting to receive
# updates of the form (sub_specs, metadata, values).
# data_types is a dict grouping the sub_specs for this queue by
# their data_type.
for sync, data_types in syncs.items():
for data_type_name, sub_specs in data_types.items():
eligible = tuple(ss for ss in sub_specs
if self._is_eligible(ss))
if not eligible:
continue
if sync is None:
channel_data = self
else:
try:
channel_data = self._snapshots[sync.s][sync.m]
except KeyError:
continue
try:
                        metadata, values = self._content[data_type_name]
except KeyError:
# Do the expensive data type conversion and cache it in
# case another queue or a future subscription wants the
# same data type.
data_type = _channel_type_by_name[data_type_name]
metadata, values = await channel_data._read(data_type)
                        channel_data._content[data_type_name] = metadata, values
# We will apply the array filter and deadband on the other side
# of the queue, since each eligible SubscriptionSpec may
# want a different slice. Sending the whole array through
                    # the queue isn't any more expensive than sending a slice;
# this is just a reference.
await queue.put(SubscriptionUpdate(eligible, metadata, values, flags, None))
def _read_metadata(self, dbr_metadata):
"""Fill the provided metadata instance with current metadata."""
to_type = ChannelType(dbr_metadata.DBR_ID)
data = self._data
if hasattr(dbr_metadata, 'units'):
units = data.get('units', '')
if isinstance(units, str):
units = units.encode(self.string_encoding
if self.string_encoding
else 'latin-1')
dbr_metadata.units = units
if hasattr(dbr_metadata, 'precision'):
dbr_metadata.precision = data.get('precision', 0)
if to_type in time_types:
epics_ts = timestamp_to_epics(data['timestamp'])
stamp = dbr_metadata.stamp
stamp.secondsSinceEpoch, stamp.nanoSeconds = epics_ts
convert_attrs = (GraphicControlBase.control_fields +
GraphicControlBase.graphic_fields)
if any(hasattr(dbr_metadata, attr) for attr in convert_attrs):
# convert all metadata types to the target type
dt = (self.data_type
if self.data_type != ChannelType.ENUM
else ChannelType.INT)
values = backend.convert_values(
values=[data.get(key, 0) for key in convert_attrs],
from_dtype=dt,
to_dtype=native_type(to_type),
string_encoding=self.string_encoding,
direction=ConversionDirection.TO_WIRE,
auto_byteswap=False)
if isinstance(values, backend.array_types):
values = values.tolist()
for attr, value in zip(convert_attrs, values):
if hasattr(dbr_metadata, attr):
setattr(dbr_metadata, attr, value)
async def write_metadata(self, publish=True, units=None, precision=None,
timestamp=None, upper_disp_limit=None,
lower_disp_limit=None, upper_alarm_limit=None,
upper_warning_limit=None,
lower_warning_limit=None, lower_alarm_limit=None,
upper_ctrl_limit=None, lower_ctrl_limit=None,
status=None, severity=None):
"""
Write metadata, optionally publishing information to clients.
Parameters
----------
publish : bool, optional
Publish the metadata update to clients.
units : str, optional
Updated units.
precision : int, optional
Updated precision value.
timestamp : float, optional
Updated timestamp.
upper_disp_limit : int or float, optional
Updated upper display limit.
lower_disp_limit : int or float, optional
Updated lower display limit.
upper_alarm_limit : int or float, optional
Updated upper alarm limit.
upper_warning_limit : int or float, optional
Updated upper warning limit.
lower_warning_limit : int or float, optional
Updated lower warning limit.
lower_alarm_limit : int or float, optional
Updated lower alarm limit.
upper_ctrl_limit : int or float, optional
Updated upper control limit.
lower_ctrl_limit : int or float, optional
Updated lower control limit.
status : AlarmStatus, optional
Updated alarm status.
severity : AlarmSeverity, optional
Updated alarm severity.
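        Examples
        --------
        A minimal sketch, assuming an async context and a ``ChannelDouble``
        (which supports ``precision``)::

            await channel.write_metadata(units='mA', precision=3,
                                         upper_ctrl_limit=10.0)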
"""
data = self._data
for kw in ('units', 'precision', 'timestamp', 'upper_disp_limit',
'lower_disp_limit', 'upper_alarm_limit',
'upper_warning_limit', 'lower_warning_limit',
'lower_alarm_limit', 'upper_ctrl_limit',
'lower_ctrl_limit'):
value = locals()[kw]
if value is not None and kw in data:
# Unpack scalars. This could be skipped for numpy.ndarray which
# does the right thing, but is essential for array.array to
# work.
try:
value, = value
except (TypeError, ValueError):
pass
data[kw] = value
if any(alarm_val is not None
for alarm_val in (status, severity)):
await self.alarm.write(status=status, severity=severity,
publish=publish)
if publish:
await self.publish(SubscriptionType.DBE_PROPERTY)
@property
def epics_timestamp(self):
'EPICS timestamp as (seconds, nanoseconds) since EPICS epoch'
return timestamp_to_epics(self._data['timestamp'])
@property
def status(self):
'''Alarm status'''
return (self.alarm.status
if self._status is None
else self._status)
@status.setter
def status(self, value):
self._status = AlarmStatus(value)
@property
def severity(self):
'''Alarm severity'''
return (self.alarm.severity
if self._severity is None
else self._severity)
@severity.setter
def severity(self, value):
self._severity = AlarmSeverity(value)
def _collect_alarm(self):
out = {}
if self._status is not None and self._status != self.alarm.status:
out['status'] = self._status
        if self._severity is not None and self._severity != self.alarm.severity:
out['severity'] = self._severity
self._clear_cached_alarms()
return out
def _clear_cached_alarms(self):
self._status = self._severity = None
def __len__(self):
try:
return len(self.value)
except TypeError:
return 1
def check_access(self, hostname, username):
"""
This always returns ``AccessRights.READ|AccessRights.WRITE``.
Subclasses can override to implement access logic using hostname,
username and returning one of:
(``AccessRights.NO_ACCESS``,
``AccessRights.READ``,
``AccessRights.WRITE``,
``AccessRights.READ|AccessRights.WRITE``).
Parameters
----------
hostname : string
username : string
Returns
-------
access : :data:`AccessRights.READ|AccessRights.WRITE`
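        Examples
        --------
        A sketch of a subclass that restricts write access to one
        hypothetical host::

            class GuardedChannel(ChannelData):
                def check_access(self, hostname, username):
                    if hostname == 'control-room-host':
                        return AccessRights.READ | AccessRights.WRITE
                    return AccessRights.READ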
"""
return AccessRights.READ | AccessRights.WRITE
def is_compatible_array(self, value) -> bool:
"""
Check if the provided value is a compatible array.
This requires that ``value`` follow the "array interface", as defined by
`numpy <https://numpy.org/doc/stable/reference/arrays.interface.html>`_.
Parameters
----------
value : any
The value to check.
Returns
-------
bool
True if ``value`` is compatible, False otherwise.
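        Examples
        --------
        A sketch using numpy, which exposes ``__array_interface__``::

            import numpy as np
            channel = ChannelByte(value=b'abc')
            channel.is_compatible_array(np.zeros(3, dtype=np.uint8))       # True
            channel.is_compatible_array(np.zeros((2, 2), dtype=np.uint8))  # False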
"""
interface = getattr(value, "__array_interface__", None)
if interface is None:
return False
dimensions = len(interface['shape'])
return (
# Ensure it's 1 dimensional:
dimensions == 1 and
# Not strided - which 1D data shouldn't be anyway...
interface['strides'] is None and
# And a compatible array type, defined in the class body:
interface['typestr'] in self._compatible_array_types
)
class ChannelEnum(ChannelData):
"""
ENUM data which can be sent over a channel.
Arrays of ENUM data are not supported.
Parameters
----------
    value : int or str
        The enum string, or an integer index into ``enum_strings``.
enum_strings : list, tuple, optional
Enum strings to be used for the data.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
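    Examples
    --------
    A minimal sketch::

        enum = ChannelEnum(value='Off', enum_strings=['Off', 'On'])
        enum.raw_value  # -> 0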
"""
data_type = ChannelType.ENUM
@staticmethod
def _validate_enum_strings(enum_strings):
if any(len(es) >= MAX_ENUM_STRING_SIZE for es in enum_strings):
over_length = tuple(f'{es}: {len(es)}' for es in enum_strings if
len(es) >= MAX_ENUM_STRING_SIZE)
msg = (f"The maximum enum string length is {MAX_ENUM_STRING_SIZE} " +
f"but the strings {over_length} are too long")
raise ValueError(msg)
if len(enum_strings) > MAX_ENUM_STATES:
raise ValueError(f"The maximum number of enum states is {MAX_ENUM_STATES} " +
f"but you passed in {len(enum_strings)}")
return tuple(enum_strings)
def __init__(self, *, enum_strings=None, **kwargs):
super().__init__(**kwargs)
if enum_strings is None:
enum_strings = tuple()
self._data['enum_strings'] = self._validate_enum_strings(enum_strings)
enum_strings = _read_only_property('enum_strings')
    def get_raw_value(self, value) -> Optional[int]:
        """The integer index of the provided enum string, or None if unknown."""
try:
return self.enum_strings.index(value)
except ValueError:
return None
@property
    def raw_value(self) -> Optional[int]:
        """The integer index of the current enum string, or None if unknown."""
return self.get_raw_value(self.value)
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['enum_strings'] = self.enum_strings
return (args, kwargs)
async def verify_value(self, data):
try:
return self.enum_strings[data]
except (IndexError, TypeError):
...
return data
def _read_metadata(self, dbr_metadata):
if isinstance(dbr_metadata, (DBR_TYPES[ChannelType.GR_ENUM],
DBR_TYPES[ChannelType.CTRL_ENUM])):
dbr_metadata.enum_strings = [s.encode(self.string_encoding)
for s in self.enum_strings]
return super()._read_metadata(dbr_metadata)
async def write(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(*args, flags=flags, **kwargs)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)
async def write_metadata(self, enum_strings=None, **kwargs):
if enum_strings is not None:
self._data['enum_strings'] = self._validate_enum_strings(enum_strings)
return await super().write_metadata(**kwargs)
class ChannelNumeric(ChannelData):
"""
Base class for numeric data types with limits.
May be a single value or an array of values.
Parameters
----------
    value :
        Data which must be compatible with this class's ``data_type``.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : numeric, optional
Upper display limit.
lower_disp_limit : numeric, optional
Lower display limit.
upper_alarm_limit : numeric, optional
Upper alarm limit.
upper_warning_limit : numeric, optional
Upper warning limit.
lower_warning_limit : numeric, optional
Lower warning limit.
lower_alarm_limit : numeric, optional
Lower alarm limit.
upper_ctrl_limit : numeric, optional
Upper control limit.
lower_ctrl_limit : numeric, optional
Lower control limit.
value_atol : numeric, optional
Archive tolerance value.
log_atol : numeric, optional
Log tolerance value.
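    Examples
    --------
    A minimal sketch with control limits; when the lower and upper control
    limits differ, scalar writes outside that range raise
    ``CannotExceedLimits``::

        current = ChannelDouble(value=0.0, units='mA',
                                lower_ctrl_limit=0.0,
                                upper_ctrl_limit=10.0)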
"""
def __init__(self, *, value, units='',
upper_disp_limit=0, lower_disp_limit=0,
upper_alarm_limit=0, upper_warning_limit=0,
lower_warning_limit=0, lower_alarm_limit=0,
upper_ctrl_limit=0, lower_ctrl_limit=0,
value_atol=0.0, log_atol=0.0,
**kwargs):
super().__init__(value=value, **kwargs)
self._data['units'] = units
self._data['upper_disp_limit'] = upper_disp_limit
self._data['lower_disp_limit'] = lower_disp_limit
self._data['upper_alarm_limit'] = upper_alarm_limit
self._data['upper_warning_limit'] = upper_warning_limit
self._data['lower_warning_limit'] = lower_warning_limit
self._data['lower_alarm_limit'] = lower_alarm_limit
self._data['upper_ctrl_limit'] = upper_ctrl_limit
self._data['lower_ctrl_limit'] = lower_ctrl_limit
self.value_atol = value_atol
self.log_atol = log_atol
units = _read_only_property('units')
upper_disp_limit = _read_only_property('upper_disp_limit')
lower_disp_limit = _read_only_property('lower_disp_limit')
upper_alarm_limit = _read_only_property('upper_alarm_limit')
lower_alarm_limit = _read_only_property('lower_alarm_limit')
upper_warning_limit = _read_only_property('upper_warning_limit')
lower_warning_limit = _read_only_property('lower_warning_limit')
upper_ctrl_limit = _read_only_property('upper_ctrl_limit')
lower_ctrl_limit = _read_only_property('lower_ctrl_limit')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs.update(
units=self.units,
upper_disp_limit=self.upper_disp_limit,
lower_disp_limit=self.lower_disp_limit,
upper_alarm_limit=self.upper_alarm_limit,
lower_alarm_limit=self.lower_alarm_limit,
upper_warning_limit=self.upper_warning_limit,
lower_warning_limit=self.lower_warning_limit,
upper_ctrl_limit=self.upper_ctrl_limit,
lower_ctrl_limit=self.lower_ctrl_limit,
)
return (args, kwargs)
async def verify_value(self, data):
if not isinstance(data, Iterable):
val = data
elif len(data) == 1:
val, = data
else:
# data is an array -- limits do not apply.
return data
if self.lower_ctrl_limit != self.upper_ctrl_limit:
if not self.lower_ctrl_limit <= val <= self.upper_ctrl_limit:
raise CannotExceedLimits(
f"Cannot write data {val}. Limits are set to "
f"{self.lower_ctrl_limit} and {self.upper_ctrl_limit}."
)
def limit_checker(
value,
lo_attr, hi_attr,
lo_status, hi_status,
lo_severity_attr,
hi_severity_attr,
dflt_lo_severity,
dflt_hi_severity):
def limit_getter(limit_attr, severity_attr, dflt_severity):
sev = dflt_severity
limit = getattr(self, limit_attr)
sev_prop = getattr(
getattr(self, 'field_inst', None),
severity_attr, None)
if sev_prop is not None:
# TODO sort out where ints are getting through...
if isinstance(sev_prop.value, str):
sev = sev_prop.enum_strings.index(sev_prop.value)
return limit, AlarmSeverity(sev)
lo_limit, lo_severity = limit_getter(
lo_attr, lo_severity_attr, dflt_lo_severity)
hi_limit, hi_severity = limit_getter(
hi_attr, hi_severity_attr, dflt_hi_severity)
if lo_limit != hi_limit:
if value <= lo_limit:
return lo_status, lo_severity
elif hi_limit <= value:
return hi_status, hi_severity
return AlarmStatus.NO_ALARM, AlarmSeverity.NO_ALARM
# this is HIHI and LOLO limits
asts, asver = limit_checker(val,
'lower_alarm_limit',
'upper_alarm_limit',
AlarmStatus.LOLO,
AlarmStatus.HIHI,
'lolo_severity',
'hihi_severity',
AlarmSeverity.MAJOR_ALARM,
AlarmSeverity.MAJOR_ALARM)
# if HIHI and LOLO did not trigger as alarm, see if HIGH and LOW do
if asts is AlarmStatus.NO_ALARM:
# this is HIGH and LOW limits
asts, asver = limit_checker(val,
'lower_warning_limit',
'upper_warning_limit',
AlarmStatus.LOW,
AlarmStatus.HIGH,
'low_severity',
'high_severity',
AlarmSeverity.MINOR_ALARM,
AlarmSeverity.MINOR_ALARM)
self.status = asts
self.severity = asver
return data
class ChannelShort(ChannelNumeric):
"""
16-bit SHORT integer data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : int or list of int
Default starting value.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
"""
data_type = ChannelType.INT
class ChannelInteger(ChannelNumeric):
"""
32-bit LONG integer data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : int or list of int
Default starting value.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
"""
data_type = ChannelType.LONG
class ChannelFloat(ChannelNumeric):
"""
32-bit floating point data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : float or list of float
Initial value for the data.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual record or
be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : float, optional
Upper display limit.
lower_disp_limit : float, optional
Lower display limit.
upper_alarm_limit : float, optional
Upper alarm limit.
upper_warning_limit : float, optional
Upper warning limit.
lower_warning_limit : float, optional
Lower warning limit.
lower_alarm_limit : float, optional
Lower alarm limit.
upper_ctrl_limit : float, optional
Upper control limit.
lower_ctrl_limit : float, optional
Lower control limit.
value_atol : float, optional
Archive tolerance value.
log_atol : float, optional
Log tolerance value.
"""
data_type = ChannelType.FLOAT
def __init__(self, *, precision=0, **kwargs):
super().__init__(**kwargs)
self._data['precision'] = precision
precision = _read_only_property('precision')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['precision'] = self.precision
return (args, kwargs)
class ChannelDouble(ChannelNumeric):
"""
64-bit floating point data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : float or list of float
Initial value for the data.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : float, optional
Upper display limit.
lower_disp_limit : float, optional
Lower display limit.
upper_alarm_limit : float, optional
Upper alarm limit.
upper_warning_limit : float, optional
Upper warning limit.
lower_warning_limit : float, optional
Lower warning limit.
lower_alarm_limit : float, optional
Lower alarm limit.
upper_ctrl_limit : float, optional
Upper control limit.
lower_ctrl_limit : float, optional
Lower control limit.
value_atol : float, optional
Archive tolerance value.
log_atol : float, optional
Log tolerance value.
"""
data_type = ChannelType.DOUBLE
def __init__(self, *, precision=0, **kwargs):
super().__init__(**kwargs)
self._data['precision'] = precision
precision = _read_only_property('precision')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['precision'] = self.precision
return (args, kwargs)
class ChannelByte(ChannelNumeric):
"""
8-bit unencoded CHAR data plus metadata which can be sent over a channel.
May be a single CHAR or an array of CHAR.
Parameters
----------
value : int or bytes
Initial starting data.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
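    Examples
    --------
    A minimal sketch; ``ChannelByte`` holds raw bytes, rejects decoded
    strings, and strips trailing null bytes by default::

        raw = ChannelByte(value=b'abc', max_length=16)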
"""
# 'Limits' on chars do not make much sense and are rarely used.
data_type = ChannelType.CHAR
_compatible_array_types = {'|u1', '|i1', '|b1'}
def __init__(self, *, string_encoding=None, strip_null_terminator=True,
**kwargs):
if string_encoding is not None:
raise CaprotoValueError('ChannelByte cannot have a string encoding')
self.strip_null_terminator = strip_null_terminator
super().__init__(string_encoding=None, **kwargs)
def preprocess_value(self, value):
value = super().preprocess_value(value)
if self.is_compatible_array(value):
if not is_array_read_only(value):
value = copy.copy(value)
if self.max_length == 1:
return value[0]
return value
if isinstance(value, (list, tuple) + backend.array_types):
if not len(value):
return b''
elif len(value) == 1:
value = value[0]
else:
value = b''.join(map(bytes, ([v] for v in value)))
if self.max_length == 1:
try:
len(value)
except TypeError:
# Allow a scalar byte value to be passed in
value = bytes([value])
if isinstance(value, str):
raise CaprotoValueError('ChannelByte does not accept decoded strings')
if self.strip_null_terminator:
if not isinstance(value, bytes):
value = value.tobytes()
value = value.rstrip(b'\x00')
return value
class ChannelChar(ChannelData):
"""
8-bit encoded CHAR data plus metadata which can be sent over a channel.
Allows for simple access over CA to the first 40 characters as a DBR_STRING
when ``report_as_string`` is set.
Parameters
----------
value : str
Initial starting data.
string_encoding : str, optional
The string encoding to use, defaults to 'latin-1'.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
    report_as_string : bool, optional
        When True, report the value over CA as a 40-character DBR_STRING.
        Defaults to False.
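    Examples
    --------
    A minimal sketch::

        message = ChannelChar(value='status OK', max_length=100)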
"""
data_type = ChannelType.CHAR
_compatible_array_types = {'|u1', '|i1', '|b1'}
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto', report_as_string=False):
super().__init__(alarm=alarm, value=value, timestamp=timestamp,
max_length=max_length,
string_encoding=string_encoding,
reported_record_type=reported_record_type)
if report_as_string:
self.data_type = ChannelType.STRING
@property
def long_string_max_length(self):
        'The maximum number of elements (length) when read as a long string'
return super().max_length
@property
def max_length(self):
        'The maximum number of elements (length) this channel can hold'
if self.data_type == ChannelType.STRING:
return 1
return super().max_length
def preprocess_value(self, value):
value = super().preprocess_value(value)
if self.is_compatible_array(value):
value = value.tobytes()
elif isinstance(value, (list, tuple) + backend.array_types):
if not len(value):
value = b''
elif len(value) == 1:
value = value[0]
else:
value = b''.join(map(bytes, ([v] for v in value)))
if self.max_length == 1:
try:
len(value)
except TypeError:
# Allow a scalar byte value to be passed in
value = str(bytes([value]), self.string_encoding)
if isinstance(value, bytes):
value = value.decode(self.string_encoding)
if not isinstance(value, str):
raise CaprotoValueError('Invalid string')
return value
async def write(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(*args, flags=flags, **kwargs)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)
class ChannelString(ChannelData):
"""
8-bit encoded string data plus metadata which can be sent over a channel.
    Can also be read over CA as a long string (a DBR_CHAR array) of up to
    ``long_string_max_length`` characters.
Parameters
----------
value : str
Initial starting data.
string_encoding : str, optional
The string encoding to use, defaults to 'latin-1'.
    long_string_max_length : int, optional
When requested as a long string (DBR_CHAR over Channel Access), this
is the reported maximum length.
timestamp : float, optional
Posix timestamp associated with the value. Defaults to ``time.time()``.
max_length : int, optional
Maximum array length of the data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
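    Examples
    --------
    A minimal sketch; clients can request the full value as a long string
    (DBR_CHAR) when it exceeds the 40-character DBR_STRING limit::

        name = ChannelString(value='some-device-name',
                             long_string_max_length=256)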
"""
data_type = ChannelType.STRING
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto', long_string_max_length=81):
super().__init__(alarm=alarm, value=value, timestamp=timestamp,
max_length=max_length,
string_encoding=string_encoding,
reported_record_type=reported_record_type)
self._long_string_max_length = long_string_max_length
@property
def long_string_max_length(self):
        'The maximum reported length when requested as a long string'
return self._long_string_max_length
async def write(self, value, *, flags=0, **metadata):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(value, flags=flags, **metadata)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)