# These classes ChannelData classes hold the state associated with the data
# source underlying a Channel, including data values, alarm state, and
# metadata. They perform data type conversions in response to requests to read
# data as a certain type, and they push updates into queues registered by a
# higher-level server.
import copy
import time
import weakref
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from typing import Any
from ._backend import backend
from ._commands import parse_metadata
from ._constants import MAX_ENUM_STATES, MAX_ENUM_STRING_SIZE
from ._dbr import (DBR_STSACK_STRING, DBR_TYPES, AccessRights, AlarmSeverity,
AlarmStatus, ChannelType, GraphicControlBase,
SubscriptionType, TimeStamp, _channel_type_by_name,
_LongStringChannelType, native_type, native_types,
time_types)
from ._utils import (CaprotoError, CaprotoValueError, ConversionDirection,
is_array_read_only)
__all__ = ('Forbidden',
'ChannelAlarm',
'ChannelByte',
'ChannelChar',
'ChannelData',
'ChannelDouble',
'ChannelEnum',
'ChannelFloat',
'ChannelInteger',
'ChannelNumeric',
'ChannelShort',
'ChannelString',
'SkipWrite',
)
SubscriptionUpdate = namedtuple('SubscriptionUpdate',
('sub_specs', 'metadata', 'values',
'flags', 'sub'))
class Forbidden(CaprotoError):
...
class CannotExceedLimits(CaprotoValueError):
...
class SkipWrite(Exception):
"""Raise this exception to skip further processing of write()."""
def dbr_metadata_to_dict(dbr_metadata, string_encoding):
'''Return a dictionary of metadata keys to values'''
info = dbr_metadata.to_dict()
try:
info['units'] = info['units'].decode(string_encoding)
except KeyError:
...
return info
def _read_only_property(key, doc=None):
'''Create property that gives read-only access to instance._data[key]'''
if doc is None:
doc = f"Read-only access to {key} data"
return property(lambda self: self._data[key], doc=doc)
class ChannelAlarm:
def __init__(self, *, status=0, severity=0,
must_acknowledge_transient=True, severity_to_acknowledge=0,
alarm_string='', string_encoding='latin-1'):
"""
Alarm metadata that can be used by one or more ChannelData instances.
Parameters
----------
status : int or AlarmStatus, optional
Status information.
severity : int or AlarmSeverity, optional
Severity information.
must_acknowledge_transient : int, optional
Whether or not transient alarms must be acknowledged.
severity_to_acknowledge : int, optional
The highest alarm severity to acknowledge. If the current alarm
severity is less then or equal to this value the alarm is
acknowledged.
alarm_string : str, optional
Alarm information.
string_encoding : str, optional
String encoding of the alarm string.
"""
self._channels = weakref.WeakSet()
self.string_encoding = string_encoding
self._data = dict(
status=status, severity=severity,
must_acknowledge_transient=must_acknowledge_transient,
severity_to_acknowledge=severity_to_acknowledge,
alarm_string=alarm_string)
def __getnewargs_ex__(self):
kwargs = {
'status': self.status,
'severity': self.severity,
'must_acknowledge_transient': self.must_acknowledge_transient,
'severity_to_acknowledge': self.severity_to_acknowledge,
'alarm_string': self.alarm_string,
'string_encoding': self.string_encoding,
}
return ((), kwargs)
status = _read_only_property('status',
doc='Current alarm status')
severity = _read_only_property('severity',
doc='Current alarm severity')
must_acknowledge_transient = _read_only_property(
'must_acknowledge_transient',
doc='Toggle whether or not transient alarms must be acknowledged')
severity_to_acknowledge = _read_only_property(
'severity_to_acknowledge',
doc='The alarm severity that has been acknowledged')
alarm_string = _read_only_property('alarm_string',
doc='String associated with alarm')
def __repr__(self):
return f'<ChannelAlarm(status={self.status}, severity={self.severity})>'
def __eq__(self, other: "ChannelAlarm") -> bool:
if not isinstance(other, ChannelAlarm):
return False
return (
self.status == other.status and
self.severity == other.severity and
self.must_acknowledge_transient == other.must_acknowledge_transient and
self.severity_to_acknowledge == other.severity_to_acknowledge and
self.alarm_string == other.alarm_string
)
def connect(self, channel_data):
"""Add a ChannelData instance to the channel set using this alarm."""
self._channels.add(channel_data)
def disconnect(self, channel_data):
"""Remove ChannelData instance from channel set using this alarm."""
self._channels.remove(channel_data)
async def read(self, dbr=None):
"""Read alarm information into a DBR_STSACK_STRING instance."""
if dbr is None:
dbr = DBR_STSACK_STRING()
dbr.status = self.status
dbr.severity = self.severity
dbr.ackt = 1 if self.must_acknowledge_transient else 0
dbr.acks = self.severity_to_acknowledge
dbr.value = self.alarm_string.encode(self.string_encoding)
return dbr
async def write(self, *, status=None, severity=None,
must_acknowledge_transient=None,
severity_to_acknowledge=None,
alarm_string=None, flags=0, publish=True):
"""
Write data to the alarm and optionally publish it to clients.
Parameters
----------
status : int or AlarmStatus, optional
Status information.
severity : int or AlarmSeverity, optional
Severity information.
must_acknowledge_transient : int, optional
Whether or not transient alarms must be acknowledged.
severity_to_acknowledge : int, optional
The highest alarm severity to acknowledge. If the current alarm
severity is less then or equal to this value the alarm is
acknowledged.
alarm_string : str, optional
Alarm information.
flags : SubscriptionType or int, optional
Subscription flags.
publish : bool, optional
Optionally publish the alarm status after the write.
"""
data = self._data
if status is not None:
data['status'] = AlarmStatus(status)
flags |= SubscriptionType.DBE_VALUE
if severity is not None:
data['severity'] = AlarmSeverity(severity)
if (not self.must_acknowledge_transient or
self.severity_to_acknowledge < self.severity):
data['severity_to_acknowledge'] = self.severity
flags |= SubscriptionType.DBE_ALARM
if must_acknowledge_transient is not None:
data['must_acknowledge_transient'] = must_acknowledge_transient
if (not must_acknowledge_transient and
self.severity_to_acknowledge > self.severity):
# Reset the severity to acknowledge if disabling transient
# requirement
data['severity_to_acknowledge'] = self.severity
flags |= SubscriptionType.DBE_ALARM
if severity_to_acknowledge is not None:
# To clear, set greater than or equal to the
# severity_to_acknowledge
if severity_to_acknowledge >= self.severity:
data['severity_to_acknowledge'] = 0
flags |= SubscriptionType.DBE_ALARM
if alarm_string is not None:
data['alarm_string'] = alarm_string
flags |= SubscriptionType.DBE_ALARM
if publish:
await self.publish(flags)
async def publish(self, flags, *, except_for=None):
"""
Publish alarm information to all listening channels.
Parameters
----------
flags : SubscriptionType
The subscription type to publish.
except_for : sequence of ChannelData, optional
Skip publishing to these channels, mainly to avoid recursion.
"""
except_for = except_for or ()
for channel in self._channels:
if channel not in except_for:
await channel.publish(flags)
[docs]class ChannelData:
"""
Base class holding data and metadata which can be sent across a Channel.
Parameters
----------
value :
Data which has to match with this class's ``data_type``.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp associated with the value. Defaults to ``time.time()``.
Raw EPICS timestamps are also supported in the form of
``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
"""
data_type = ChannelType.LONG
default_value: Any = 0
_compatible_array_types = {}
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto'):
if timestamp is None:
timestamp = time.time()
if alarm is None:
alarm = ChannelAlarm()
self._alarm = None
self._status = None
self._severity = None
# now use the setter to attach the alarm correctly:
self.alarm = alarm
self._max_length = max_length
self.string_encoding = string_encoding
self.reported_record_type = reported_record_type
if self._max_length is None:
# Use the current length as maximum, if unspecified.
self._max_length = max(self.calculate_length(value), 1)
# It is possible to pass in a zero-length array to start with.
# However, it is not useful to have an empty value forever, so the
# minimum length here is required to be at least 1.
value = self.preprocess_value(value)
# The following _data isn't meant to be modified directly. It should be
# modified by way of the ``write()`` and ``write_metadata`` methods.
self._data = dict(
value=value,
timestamp=TimeStamp.from_flexible_value(timestamp),
)
# This is a dict keyed on queues that will receive subscription
# updates. (Each queue belongs to a Context.) Each value is itself a
# dict, mapping data_types to the set of SubscriptionSpecs that request
# that data_type.
self._queues = defaultdict(
lambda: defaultdict(
lambda: defaultdict(set)))
# Cache results of data_type conversions. This maps data_type to
# (metdata, value). This is cleared each time publish() is called.
self._content = {}
self._snapshots = defaultdict(dict)
self._fill_at_next_write = list()
def __getstate__(self):
state = dict(self.__dict__)
state.pop("_queues", None)
state.pop("_snapshots", None)
return state
def calculate_length(self, value):
'Calculate the number of elements given a value'
is_array = isinstance(value, (list, tuple) + backend.array_types)
if is_array:
return len(value)
if isinstance(value, (bytes, str, bytearray)):
if self.data_type == ChannelType.CHAR:
return len(value)
return 1
@property
def length(self):
'The number of elements (length) of the current value'
return self.calculate_length(self.value)
@property
def max_length(self):
'The maximum number of elements (length) this channel can hold'
return self._max_length
def preprocess_value(self, value):
'''Pre-process values destined for verify_value and write
A few basic things are done here, based on max_count:
1. If length >= 2, ensure the value is a list
2. If length == 1, ensure the value is an unpacked scalar
3. Ensure len(value) <= length
Raises
------
CaprotoValueError
'''
is_array = isinstance(value, (list, tuple) + backend.array_types)
if is_array:
if len(value) > self._max_length:
# TODO consider an exception for caproto-only environments that
# can handle dynamically resized arrays (i.e., sizes greater
# than the initial max_length)?
raise CaprotoValueError(
f'Value of length {len(value)} is too large for '
f'{self.__class__.__name__}(max_length={self._max_length})'
)
if self._max_length == 1:
if is_array:
if len(value):
# scalar value in a list -> scalar value
return value[0]
raise CaprotoValueError(
'Cannot set a scalar to an empty array')
elif not is_array:
# scalar value that should be in a list -> list
return [value]
return value
def __getnewargs_ex__(self):
# ref: https://docs.python.org/3/library/pickle.html
kwargs = {
'timestamp': self.timestamp,
'alarm': self.alarm,
'string_encoding': self.string_encoding,
'reported_record_type': self.reported_record_type,
'max_length': self._max_length,
**copy.deepcopy(self._data)
}
return ((), kwargs)
value = _read_only_property('value')
# "before" — only the last value received before the state changes from
# false to true is forwarded to the client.
# "first" — only the first value received after the state changes from true
# to false is forwarded to the client.
# "while" — values are forwarded to the client as long as the state is true.
# "last" — only the last value received before the state changes from true
# to false is forwarded to the client.
# "after" — only the first value received after the state changes from true
# to false is forwarded to the client.
# "unless" — values are forwarded to the client as long as the state is
# false.
def pre_state_change(self, state, new_value):
"This is called by the server when it enters its StateUpdateContext."
snapshots = self._snapshots[state]
snapshots.clear()
if new_value:
# We are changing from false to true.
snapshots['before'] = copy.deepcopy(self)
else:
# We are changing from true to false.
snapshots['last'] = copy.deepcopy(self)
def post_state_change(self, state, new_value):
"This is called by the server when it exits its StateUpdateContext."
snapshots = self._snapshots[state]
if new_value:
# We have changed from false to true.
snapshots['while'] = self
self._fill_at_next_write.append((state, 'after'))
else:
# We have changed from true to false.
snapshots['unless'] = self
self._fill_at_next_write.append((state, 'first'))
@property
def alarm(self):
'The ChannelAlarm associated with this data'
return self._alarm
@alarm.setter
def alarm(self, alarm):
old_alarm = self._alarm
if old_alarm is alarm:
return
if old_alarm is not None:
old_alarm.disconnect(self)
self._alarm = alarm
if alarm is not None:
alarm.connect(self)
async def subscribe(self, queue, sub_spec, sub):
"""
Subscribe a queue for the given subscription specification.
Parameters
----------
queue : asyncio.Queue or compatible
The queue to send data to.
sub_spec : SubscriptionSpec
The matching subscription specification.
sub : Subscription
The subscription instance.
"""
by_sync = self._queues[queue][sub_spec.channel_filter.sync]
by_sync[sub_spec.data_type_name].add(sub_spec)
# Always send current reading immediately upon subscription.
try:
metadata, values = self._content[sub_spec.data_type_name]
except KeyError:
# Do the expensive data type conversion and cache it in case
# a future subscription wants the same data type.
data_type = _channel_type_by_name[sub_spec.data_type_name]
metadata, values = await self._read(data_type)
self._content[sub_spec.data_type_name] = metadata, values
await queue.put(SubscriptionUpdate((sub_spec,), metadata, values, 0, sub))
async def unsubscribe(self, queue, sub_spec):
"""
Unsubscribe a queue for the given subscription specification.
Parameters
----------
queue : asyncio.Queue or compatible
The queue to send data to.
sub_spec : SubscriptionSpec
The subscription specification.
"""
by_sync = self._queues[queue][sub_spec.channel_filter.sync]
by_sync[sub_spec.data_type_name].discard(sub_spec)
async def auth_read(self, hostname, username, data_type, *,
user_address=None):
'''Get DBR data and native data, converted to a specific type'''
access = self.check_access(hostname, username)
if AccessRights.READ not in access:
raise Forbidden("Client with hostname {} and username {} cannot "
"read.".format(hostname, username))
return (await self.read(data_type))
async def read(self, data_type):
"""
Read out the ChannelData as ``data_type``.
Parameters
----------
data_type : ChannelType
The data type to read out.
"""
# Subclass might trigger a write here to update self._data before
# reading it out.
return (await self._read(data_type))
async def _read(self, data_type):
"""
Inner method to read out the ChannelData as ``data_type``.
Parameters
----------
data_type : ChannelType
The data type to read out.
"""
# special cases for alarm strings and class name
if data_type == ChannelType.STSACK_STRING:
ret = await self.alarm.read()
return (ret, b'')
elif data_type == ChannelType.CLASS_NAME:
class_name = DBR_TYPES[data_type]()
rtyp = self.reported_record_type.encode(self.string_encoding)
class_name.value = rtyp
return class_name, b''
if data_type in _LongStringChannelType:
native_to = _LongStringChannelType.LONG_STRING
data_type = ChannelType(data_type)
else:
native_to = native_type(data_type)
values = backend.convert_values(
values=self._data['value'],
from_dtype=self.data_type,
to_dtype=native_to,
string_encoding=self.string_encoding,
enum_strings=self._data.get('enum_strings'),
direction=ConversionDirection.TO_WIRE,
)
# for native types, there is no dbr metadata - just data
if data_type in native_types:
return b'', values
dbr_metadata = DBR_TYPES[data_type]()
self._read_metadata(dbr_metadata)
# Copy alarm fields also.
alarm_dbr = await self.alarm.read()
for field, _ in alarm_dbr._fields_:
if hasattr(dbr_metadata, field):
setattr(dbr_metadata, field, getattr(alarm_dbr, field))
return dbr_metadata, values
async def auth_write(self, hostname, username, data, data_type, metadata,
*, flags=0, user_address=None):
"""
Data write hook for clients.
First verifies that the specified client may write to the instance
prior to proceeding.
Parameters
----------
hostname : str
The hostname of the client.
username : str
The username of the client.
data :
The data to write.
data_type : ChannelType
The data type associated with the written data.
metadata :
Metadata instance.
flags : int, optional
Flags for publishing.
user_address : tuple, optional
(host, port) tuple of the user.
Raises
------
Forbidden:
If client is not allowed to write to this instance.
"""
access = self.check_access(hostname, username)
if AccessRights.WRITE not in access:
raise Forbidden("Client with hostname {} and username {} cannot "
"write.".format(hostname, username))
return (await self.write_from_dbr(data, data_type, metadata,
flags=flags))
async def verify_value(self, data):
"""
Verify a value prior to it being written by CA or Python
To reject a value, raise an exception. Otherwise, return the
original value or a modified version of it.
"""
return data
async def write_from_dbr(self, data, data_type, metadata, *, flags=0):
"""
Write data from a provided DBR data type.
Does not verify the client has authorization to write.
Parameters
----------
data :
The data to write.
data_type : ChannelType
The data type associated with the written data.
metadata :
Metadata instance.
flags : int, optional
Flags for publishing.
Raises
------
CaprotoValueError:
If the request is not valid.
"""
if data_type == ChannelType.PUT_ACKS:
await self.alarm.write(severity_to_acknowledge=metadata.value)
return
elif data_type == ChannelType.PUT_ACKT:
await self.alarm.write(must_acknowledge_transient=metadata.value)
return
elif data_type in (ChannelType.STSACK_STRING, ChannelType.CLASS_NAME):
raise CaprotoValueError('Bad request')
timestamp = time.time()
native_from = native_type(data_type)
value = backend.convert_values(
values=data, from_dtype=native_from,
to_dtype=self.data_type,
string_encoding=self.string_encoding,
enum_strings=getattr(self, 'enum_strings', None),
direction=ConversionDirection.FROM_WIRE)
if metadata is None:
metadata_dict = {}
else:
# Convert `metadata` to bytes-like (or pass it through).
md_payload = parse_metadata(metadata, data_type)
# Depending on the type of `metadata` above,
# `md_payload` could be a DBR struct or plain bytes.
# Load it into a struct (zero-copy) to be sure.
dbr_metadata = DBR_TYPES[data_type].from_buffer(md_payload)
metadata_dict = dbr_metadata_to_dict(dbr_metadata,
self.string_encoding)
metadata_dict.setdefault('timestamp', timestamp)
return (await self.write(value, flags=flags, **metadata_dict))
async def write(self, value, *, flags=0, verify_value=True,
update_fields=True, **metadata):
"""
Write data from native Python types.
Metadata may be updated at the same time by way of keyword arguments.
Refer to the parameters section below for the keywords or the method
``write_metadata``.
Parameters
----------
value :
The native Python data.
flags : SubscriptionType or int, optional
The flags for subscribers.
verify_value : bool, optional
Run the ``verify_value`` hook prior to updating internal state.
update_fields : bool, optional
Run the ``update_fields`` hook prior to updating internal state.
units : str, optional
[Metadata] Updated units.
precision : int, optional
[Metadata] Updated precision value.
timestamp : float, TimeStamp, or 2-tuple, optional
[Metadata] Updated timestamp. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the
form of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
upper_disp_limit : int or float, optional
[Metadata] Updated upper display limit.
lower_disp_limit : int or float, optional
[Metadata] Updated lower display limit.
upper_alarm_limit : int or float, optional
[Metadata] Updated upper alarm limit.
upper_warning_limit : int or float, optional
[Metadata] Updated upper warning limit.
lower_warning_limit : int or float, optional
[Metadata] Updated lower warning limit.
lower_alarm_limit : int or float, optional
[Metadata] Updated lower alarm limit.
upper_ctrl_limit : int or float, optional
[Metadata] Updated upper control limit.
lower_ctrl_limit : int or float, optional
[Metadata] Updated lower control limit.
status : AlarmStatus, optional
[Metadata] Updated alarm status.
severity : AlarmSeverity, optional
[Metadata] Updated alarm severity.
Raises
------
Exception:
Any exception raised in the handlers (except SkipWrite) will be
propagated to the caller.
"""
try:
value = self.preprocess_value(value)
if verify_value:
modified_value = await self.verify_value(value)
else:
modified_value = None
if update_fields:
await self.update_fields(
modified_value if verify_value else value
)
except SkipWrite:
# Handler raised SkipWrite to avoid the rest of this method.
return
except GeneratorExit:
raise
except Exception:
# TODO: should allow exception to optionally pass alarm
# status/severity through exception instance
await self.alarm.write(status=AlarmStatus.WRITE,
severity=AlarmSeverity.MAJOR_ALARM,
)
raise
finally:
alarm_md = self._collect_alarm()
if modified_value is SkipWrite:
# An alternative to raising SkipWrite: avoid the rest of this
# method.
return
# issues of over-riding user passed in data here!
metadata.update(alarm_md)
metadata.setdefault('timestamp', time.time())
if self._fill_at_next_write:
snapshot = copy.deepcopy(self)
for state, mode in self._fill_at_next_write:
self._snapshots[state][mode] = snapshot
self._fill_at_next_write.clear()
new = modified_value if modified_value is not None else value
# TODO the next 5 lines should be done in one move
self._data['value'] = new
await self.write_metadata(publish=False, **metadata)
# Send a new event to subscribers.
await self.publish(flags)
# and publish any linked alarms
if 'status' in metadata or 'severity' in metadata:
await self.alarm.publish(flags, except_for=(self,))
def _is_eligible(self, ss):
sync = ss.channel_filter.sync
return sync is None or sync.m in self._snapshots[sync.s]
async def update_fields(self, value):
"""This is a hook for subclasses to update field instance data."""
async def publish(self, flags):
"""
Publish data to appropriate queues matching the SubscriptionSpec.
Each SubscriptionSpec specifies a certain data type it is interested in
and a mask. Send one update per queue per data_type if and only if any
subscriptions specs on a queue have a compatible mask.
Parameters
----------
flags : SubscriptionSpec
The subscription specification to match.
"""
# Copying the data into structs with various data types is expensive,
# so we only want to do it if it's going to be used, and we only want
# to do each conversion once. Clear the cache to start. This cache is
# instance state so that self.subscribe can also use it.
self._content.clear()
for queue, syncs in self._queues.items():
# queue belongs to a Context that is expecting to receive
# updates of the form (sub_specs, metadata, values).
# data_types is a dict grouping the sub_specs for this queue by
# their data_type.
for sync, data_types in syncs.items():
for data_type_name, sub_specs in data_types.items():
eligible = tuple(ss for ss in sub_specs
if self._is_eligible(ss))
if not eligible:
continue
if sync is None:
channel_data = self
else:
try:
channel_data = self._snapshots[sync.s][sync.m]
except KeyError:
continue
try:
metadata, values = self._content[data_type_name]
except KeyError:
# Do the expensive data type conversion and cache it in
# case another queue or a future subscription wants the
# same data type.
data_type = _channel_type_by_name[data_type_name]
metadata, values = await channel_data._read(data_type)
channel_data._content[data_type] = metadata, values
# We will apply the array filter and deadband on the other side
# of the queue, since each eligible SubscriptionSpec may
# want a different slice. Sending the whole array through
# the queue isn't any more expensive that sending a slice;
# this is just a reference.
await queue.put(SubscriptionUpdate(eligible, metadata, values, flags, None))
def _read_metadata(self, dbr_metadata):
"""Fill the provided metadata instance with current metadata."""
to_type = ChannelType(dbr_metadata.DBR_ID)
data = self._data
if hasattr(dbr_metadata, 'units'):
units = data.get('units', '')
if isinstance(units, str):
units = units.encode(self.string_encoding
if self.string_encoding
else 'latin-1')
dbr_metadata.units = units
if hasattr(dbr_metadata, 'precision'):
dbr_metadata.precision = data.get('precision', 0)
if to_type in time_types:
dbr_metadata.stamp = self.epics_timestamp
convert_attrs = (GraphicControlBase.control_fields +
GraphicControlBase.graphic_fields)
if any(hasattr(dbr_metadata, attr) for attr in convert_attrs):
# convert all metadata types to the target type
dt = (self.data_type
if self.data_type != ChannelType.ENUM
else ChannelType.INT)
values = backend.convert_values(
values=[data.get(key, 0) for key in convert_attrs],
from_dtype=dt,
to_dtype=native_type(to_type),
string_encoding=self.string_encoding,
direction=ConversionDirection.TO_WIRE,
auto_byteswap=False)
if isinstance(values, backend.array_types):
values = values.tolist()
for attr, value in zip(convert_attrs, values):
if hasattr(dbr_metadata, attr):
setattr(dbr_metadata, attr, value)
async def write_metadata(self, publish=True, units=None, precision=None,
timestamp=None, upper_disp_limit=None,
lower_disp_limit=None, upper_alarm_limit=None,
upper_warning_limit=None,
lower_warning_limit=None, lower_alarm_limit=None,
upper_ctrl_limit=None, lower_ctrl_limit=None,
status=None, severity=None):
"""
Write metadata, optionally publishing information to clients.
Parameters
----------
publish : bool, optional
Publish the metadata update to clients.
units : str, optional
Updated units.
precision : int, optional
Updated precision value.
timestamp : float, TimeStamp, or 2-tuple, optional
Updated timestamp. Supported options include ``time.time()``-style
UNIX timestamps, raw EPICS timestamps in the form of ``TimeStamp``
or ``(seconds_since_epoch, nanoseconds)``.
upper_disp_limit : int or float, optional
Updated upper display limit.
lower_disp_limit : int or float, optional
Updated lower display limit.
upper_alarm_limit : int or float, optional
Updated upper alarm limit.
upper_warning_limit : int or float, optional
Updated upper warning limit.
lower_warning_limit : int or float, optional
Updated lower warning limit.
lower_alarm_limit : int or float, optional
Updated lower alarm limit.
upper_ctrl_limit : int or float, optional
Updated upper control limit.
lower_ctrl_limit : int or float, optional
Updated lower control limit.
status : AlarmStatus, optional
Updated alarm status.
severity : AlarmSeverity, optional
Updated alarm severity.
"""
data = self._data
for kw in ('units', 'precision', 'upper_disp_limit',
'lower_disp_limit', 'upper_alarm_limit',
'upper_warning_limit', 'lower_warning_limit',
'lower_alarm_limit', 'upper_ctrl_limit',
'lower_ctrl_limit'):
value = locals()[kw]
if value is not None and kw in data:
# Unpack scalars. This could be skipped for numpy.ndarray which
# does the right thing, but is essential for array.array to
# work.
try:
value, = value
except (TypeError, ValueError):
pass
data[kw] = value
if timestamp is not None:
self._data["timestamp"] = TimeStamp.from_flexible_value(timestamp)
if status is not None or severity is not None:
await self.alarm.write(status=status, severity=severity,
publish=publish)
if publish:
await self.publish(SubscriptionType.DBE_PROPERTY)
@property
def timestamp(self) -> float:
"""UNIX timestamp in seconds."""
return self._data["timestamp"].timestamp
@property
def epics_timestamp(self) -> TimeStamp:
"""EPICS timestamp as (seconds, nanoseconds) since EPICS epoch."""
return copy.copy(self._data["timestamp"])
@property
def status(self):
'''Alarm status'''
return (self.alarm.status
if self._status is None
else self._status)
@status.setter
def status(self, value):
self._status = AlarmStatus(value)
@property
def severity(self):
'''Alarm severity'''
return (self.alarm.severity
if self._severity is None
else self._severity)
@severity.setter
def severity(self, value):
self._severity = AlarmSeverity(value)
def _collect_alarm(self):
out = {}
if self._status is not None and self._status != self.alarm.status:
out['status'] = self._status
if self._severity is not None and self._status != self.alarm.status:
out['severity'] = self._severity
self._clear_cached_alarms()
return out
def _clear_cached_alarms(self):
self._status = self._severity = None
def __len__(self):
try:
return len(self.value)
except TypeError:
return 1
def check_access(self, hostname, username):
"""
This always returns ``AccessRights.READ|AccessRights.WRITE``.
Subclasses can override to implement access logic using hostname,
username and returning one of:
(``AccessRights.NO_ACCESS``,
``AccessRights.READ``,
``AccessRights.WRITE``,
``AccessRights.READ|AccessRights.WRITE``).
Parameters
----------
hostname : string
username : string
Returns
-------
access : :data:`AccessRights.READ|AccessRights.WRITE`
"""
return AccessRights.READ | AccessRights.WRITE
def is_compatible_array(self, value) -> bool:
"""
Check if the provided value is a compatible array.
This requires that ``value`` follow the "array interface", as defined by
`numpy <https://numpy.org/doc/stable/reference/arrays.interface.html>`_.
Parameters
----------
value : any
The value to check.
Returns
-------
bool
True if ``value`` is compatible, False otherwise.
"""
interface = getattr(value, "__array_interface__", None)
if interface is None:
return False
dimensions = len(interface['shape'])
return (
# Ensure it's 1 dimensional:
dimensions == 1 and
# Not strided - which 1D data shouldn't be anyway...
interface['strides'] is None and
# And a compatible array type, defined in the class body:
interface['typestr'] in self._compatible_array_types
)
[docs]class ChannelEnum(ChannelData):
"""
ENUM data which can be sent over a channel.
Arrays of ENUM data are not supported.
Parameters
----------
value : int or str
The string value in the enum, or an integer index of that list.
enum_strings : list, tuple, optional
Enum strings to be used for the data.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
"""
data_type = ChannelType.ENUM
@staticmethod
def _validate_enum_strings(enum_strings):
if any(len(es) >= MAX_ENUM_STRING_SIZE for es in enum_strings):
over_length = tuple(f'{es}: {len(es)}' for es in enum_strings if
len(es) >= MAX_ENUM_STRING_SIZE)
msg = (f"The maximum enum string length is {MAX_ENUM_STRING_SIZE} " +
f"but the strings {over_length} are too long")
raise ValueError(msg)
if len(enum_strings) > MAX_ENUM_STATES:
raise ValueError(f"The maximum number of enum states is {MAX_ENUM_STATES} " +
f"but you passed in {len(enum_strings)}")
return tuple(enum_strings)
def __init__(self, *, enum_strings=None, **kwargs):
super().__init__(**kwargs)
if enum_strings is None:
enum_strings = tuple()
self._data['enum_strings'] = self._validate_enum_strings(enum_strings)
enum_strings = _read_only_property('enum_strings')
def get_raw_value(self, value) -> int:
"""The raw integer value index of the provided enum string."""
try:
return self.enum_strings.index(value)
except ValueError:
return None
@property
def raw_value(self) -> int:
"""The raw integer value index of the enum string."""
return self.get_raw_value(self.value)
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['enum_strings'] = self.enum_strings
return (args, kwargs)
async def verify_value(self, data):
try:
return self.enum_strings[data]
except (IndexError, TypeError):
...
return data
def _read_metadata(self, dbr_metadata):
if isinstance(dbr_metadata, (DBR_TYPES[ChannelType.GR_ENUM],
DBR_TYPES[ChannelType.CTRL_ENUM])):
dbr_metadata.enum_strings = [s.encode(self.string_encoding)
for s in self.enum_strings]
return super()._read_metadata(dbr_metadata)
async def write(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(*args, flags=flags, **kwargs)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)
async def write_metadata(self, enum_strings=None, **kwargs):
if enum_strings is not None:
self._data['enum_strings'] = self._validate_enum_strings(enum_strings)
return await super().write_metadata(**kwargs)
class ChannelNumeric(ChannelData):
"""
Base class for numeric data types with limits.
May be a single value or an array of values.
Parameters
----------
value :
Data which has to match with this class's ``data_type``.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : numeric, optional
Upper display limit.
lower_disp_limit : numeric, optional
Lower display limit.
upper_alarm_limit : numeric, optional
Upper alarm limit.
upper_warning_limit : numeric, optional
Upper warning limit.
lower_warning_limit : numeric, optional
Lower warning limit.
lower_alarm_limit : numeric, optional
Lower alarm limit.
upper_ctrl_limit : numeric, optional
Upper control limit.
lower_ctrl_limit : numeric, optional
Lower control limit.
value_atol : numeric, optional
Archive tolerance value.
log_atol : numeric, optional
Log tolerance value.
"""
def __init__(self, *, value, units='',
upper_disp_limit=0, lower_disp_limit=0,
upper_alarm_limit=0, upper_warning_limit=0,
lower_warning_limit=0, lower_alarm_limit=0,
upper_ctrl_limit=0, lower_ctrl_limit=0,
value_atol=0.0, log_atol=0.0,
**kwargs):
super().__init__(value=value, **kwargs)
self._data['units'] = units
self._data['upper_disp_limit'] = upper_disp_limit
self._data['lower_disp_limit'] = lower_disp_limit
self._data['upper_alarm_limit'] = upper_alarm_limit
self._data['upper_warning_limit'] = upper_warning_limit
self._data['lower_warning_limit'] = lower_warning_limit
self._data['lower_alarm_limit'] = lower_alarm_limit
self._data['upper_ctrl_limit'] = upper_ctrl_limit
self._data['lower_ctrl_limit'] = lower_ctrl_limit
self.value_atol = value_atol
self.log_atol = log_atol
units = _read_only_property('units')
upper_disp_limit = _read_only_property('upper_disp_limit')
lower_disp_limit = _read_only_property('lower_disp_limit')
upper_alarm_limit = _read_only_property('upper_alarm_limit')
lower_alarm_limit = _read_only_property('lower_alarm_limit')
upper_warning_limit = _read_only_property('upper_warning_limit')
lower_warning_limit = _read_only_property('lower_warning_limit')
upper_ctrl_limit = _read_only_property('upper_ctrl_limit')
lower_ctrl_limit = _read_only_property('lower_ctrl_limit')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs.update(
units=self.units,
upper_disp_limit=self.upper_disp_limit,
lower_disp_limit=self.lower_disp_limit,
upper_alarm_limit=self.upper_alarm_limit,
lower_alarm_limit=self.lower_alarm_limit,
upper_warning_limit=self.upper_warning_limit,
lower_warning_limit=self.lower_warning_limit,
upper_ctrl_limit=self.upper_ctrl_limit,
lower_ctrl_limit=self.lower_ctrl_limit,
)
return (args, kwargs)
async def verify_value(self, data):
if not isinstance(data, Iterable):
val = data
elif len(data) == 1:
val, = data
else:
# data is an array -- limits do not apply.
return data
if self.lower_ctrl_limit != self.upper_ctrl_limit:
if not self.lower_ctrl_limit <= val <= self.upper_ctrl_limit:
raise CannotExceedLimits(
f"Cannot write data {val}. Limits are set to "
f"{self.lower_ctrl_limit} and {self.upper_ctrl_limit}."
)
def limit_checker(
value,
lo_attr, hi_attr,
lo_status, hi_status,
lo_severity_attr,
hi_severity_attr,
dflt_lo_severity,
dflt_hi_severity):
def limit_getter(limit_attr, severity_attr, dflt_severity):
sev = dflt_severity
limit = getattr(self, limit_attr)
sev_prop = getattr(
getattr(self, 'field_inst', None),
severity_attr, None)
if sev_prop is not None:
# TODO sort out where ints are getting through...
if isinstance(sev_prop.value, str):
sev = sev_prop.enum_strings.index(sev_prop.value)
return limit, AlarmSeverity(sev)
lo_limit, lo_severity = limit_getter(
lo_attr, lo_severity_attr, dflt_lo_severity)
hi_limit, hi_severity = limit_getter(
hi_attr, hi_severity_attr, dflt_hi_severity)
if lo_limit != hi_limit:
if value <= lo_limit:
return lo_status, lo_severity
elif hi_limit <= value:
return hi_status, hi_severity
return AlarmStatus.NO_ALARM, AlarmSeverity.NO_ALARM
# this is HIHI and LOLO limits
asts, asver = limit_checker(val,
'lower_alarm_limit',
'upper_alarm_limit',
AlarmStatus.LOLO,
AlarmStatus.HIHI,
'lolo_severity',
'hihi_severity',
AlarmSeverity.MAJOR_ALARM,
AlarmSeverity.MAJOR_ALARM)
# if HIHI and LOLO did not trigger as alarm, see if HIGH and LOW do
if asts is AlarmStatus.NO_ALARM:
# this is HIGH and LOW limits
asts, asver = limit_checker(val,
'lower_warning_limit',
'upper_warning_limit',
AlarmStatus.LOW,
AlarmStatus.HIGH,
'low_severity',
'high_severity',
AlarmSeverity.MINOR_ALARM,
AlarmSeverity.MINOR_ALARM)
self.status = asts
self.severity = asver
return data
[docs]class ChannelShort(ChannelNumeric):
"""
16-bit SHORT integer data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : int or list of int
Default starting value.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
"""
data_type = ChannelType.INT
[docs]class ChannelInteger(ChannelNumeric):
"""
32-bit LONG integer data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : int or list of int
Default starting value.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
"""
data_type = ChannelType.LONG
[docs]class ChannelFloat(ChannelNumeric):
"""
32-bit floating point data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : float or list of float
Initial value for the data.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual record or
be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : float, optional
Upper display limit.
lower_disp_limit : float, optional
Lower display limit.
upper_alarm_limit : float, optional
Upper alarm limit.
upper_warning_limit : float, optional
Upper warning limit.
lower_warning_limit : float, optional
Lower warning limit.
lower_alarm_limit : float, optional
Lower alarm limit.
upper_ctrl_limit : float, optional
Upper control limit.
lower_ctrl_limit : float, optional
Lower control limit.
value_atol : float, optional
Archive tolerance value.
log_atol : float, optional
Log tolerance value.
"""
data_type = ChannelType.FLOAT
def __init__(self, *, precision=0, **kwargs):
super().__init__(**kwargs)
self._data['precision'] = precision
precision = _read_only_property('precision')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['precision'] = self.precision
return (args, kwargs)
[docs]class ChannelDouble(ChannelNumeric):
"""
64-bit floating point data and metadata which can be sent over a channel.
May be a single value or an array of values.
Parameters
----------
value : float or list of float
Initial value for the data.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : float, optional
Upper display limit.
lower_disp_limit : float, optional
Lower display limit.
upper_alarm_limit : float, optional
Upper alarm limit.
upper_warning_limit : float, optional
Upper warning limit.
lower_warning_limit : float, optional
Lower warning limit.
lower_alarm_limit : float, optional
Lower alarm limit.
upper_ctrl_limit : float, optional
Upper control limit.
lower_ctrl_limit : float, optional
Lower control limit.
value_atol : float, optional
Archive tolerance value.
log_atol : float, optional
Log tolerance value.
"""
data_type = ChannelType.DOUBLE
def __init__(self, *, precision=0, **kwargs):
super().__init__(**kwargs)
self._data['precision'] = precision
precision = _read_only_property('precision')
def __getnewargs_ex__(self):
args, kwargs = super().__getnewargs_ex__()
kwargs['precision'] = self.precision
return (args, kwargs)
[docs]class ChannelByte(ChannelNumeric):
"""
8-bit unencoded CHAR data plus metadata which can be sent over a channel.
May be a single CHAR or an array of CHAR.
Parameters
----------
value : int or bytes
Initial starting data.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
upper_disp_limit : int, optional
Upper display limit.
lower_disp_limit : int, optional
Lower display limit.
upper_alarm_limit : int, optional
Upper alarm limit.
upper_warning_limit : int, optional
Upper warning limit.
lower_warning_limit : int, optional
Lower warning limit.
lower_alarm_limit : int, optional
Lower alarm limit.
upper_ctrl_limit : int, optional
Upper control limit.
lower_ctrl_limit : int, optional
Lower control limit.
value_atol : int, optional
Archive tolerance value.
log_atol : int, optional
Log tolerance value.
"""
# 'Limits' on chars do not make much sense and are rarely used.
data_type = ChannelType.CHAR
_compatible_array_types = {'|u1', '|i1', '|b1'}
def __init__(self, *, string_encoding=None, strip_null_terminator=True,
**kwargs):
if string_encoding is not None:
raise CaprotoValueError('ChannelByte cannot have a string encoding')
self.strip_null_terminator = strip_null_terminator
super().__init__(string_encoding=None, **kwargs)
def preprocess_value(self, value):
value = super().preprocess_value(value)
if self.is_compatible_array(value):
if not is_array_read_only(value):
value = copy.copy(value)
if self.max_length == 1:
return value[0]
return value
if isinstance(value, (list, tuple) + backend.array_types):
if not len(value):
return b''
elif len(value) == 1:
value = value[0]
else:
value = b''.join(map(bytes, ([v] for v in value)))
if self.max_length == 1:
try:
len(value)
except TypeError:
# Allow a scalar byte value to be passed in
value = bytes([value])
if isinstance(value, str):
raise CaprotoValueError('ChannelByte does not accept decoded strings')
if self.strip_null_terminator:
if not isinstance(value, bytes):
value = value.tobytes()
value = value.rstrip(b'\x00')
return value
[docs]class ChannelChar(ChannelData):
"""
8-bit encoded CHAR data plus metadata which can be sent over a channel.
Allows for simple access over CA to the first 40 characters as a DBR_STRING
when ``report_as_string`` is set.
Parameters
----------
value : str
Initial starting data.
string_encoding : str, optional
The string encoding to use, defaults to 'latin-1'.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
units : str, optional
Engineering units indicator, which can be retrieved over channel
access.
"""
data_type = ChannelType.CHAR
_compatible_array_types = {'|u1', '|i1', '|b1'}
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto', report_as_string=False):
super().__init__(alarm=alarm, value=value, timestamp=timestamp,
max_length=max_length,
string_encoding=string_encoding,
reported_record_type=reported_record_type)
if report_as_string:
self.data_type = ChannelType.STRING
@property
def long_string_max_length(self):
'The maximum number of elements (length) of the current value'
return super().max_length
@property
def max_length(self):
'The number of elements (length) of the current value'
if self.data_type == ChannelType.STRING:
return 1
return super().max_length
def preprocess_value(self, value):
value = super().preprocess_value(value)
if self.is_compatible_array(value):
value = value.tobytes()
elif isinstance(value, (list, tuple) + backend.array_types):
if not len(value):
value = b''
elif len(value) == 1:
value = value[0]
else:
value = b''.join(map(bytes, ([v] for v in value)))
if self.max_length == 1:
try:
len(value)
except TypeError:
# Allow a scalar byte value to be passed in
value = str(bytes([value]), self.string_encoding)
if isinstance(value, bytes):
value = value.decode(self.string_encoding)
if not isinstance(value, str):
raise CaprotoValueError('Invalid string')
return value
async def write(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(*args, flags=flags, **kwargs)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)
[docs]class ChannelString(ChannelData):
"""
8-bit encoded string data plus metadata which can be sent over a channel.
Allows for simple access over CA to the first 40 characters as a DBR_STRING
when ``report_as_string`` is set.
Parameters
----------
value : str
Initial starting data.
string_encoding : str, optional
The string encoding to use, defaults to 'latin-1'.
long_string_max_length : str, optional
When requested as a long string (DBR_CHAR over Channel Access), this
is the reported maximum length.
timestamp : float, TimeStamp, or 2-tuple, optional
Timestamp to report for the current value. Supported options include
``time.time()``-style UNIX timestamps, raw EPICS timestamps in the form
of ``TimeStamp`` or ``(seconds_since_epoch, nanoseconds)``.
max_length : int, optional
Maximum array length of the data.
string_encoding : str, optional
Encoding to use for strings, used when serializing and deserializing
data.
reported_record_type : str, optional
Though this is not a record, the channel access protocol supports
querying the record type. This can be set to mimic an actual
record or be set to something arbitrary. Defaults to 'caproto'.
"""
data_type = ChannelType.STRING
def __init__(self, *, alarm=None, value=None, timestamp=None,
max_length=None, string_encoding='latin-1',
reported_record_type='caproto', long_string_max_length=81):
super().__init__(alarm=alarm, value=value, timestamp=timestamp,
max_length=max_length,
string_encoding=string_encoding,
reported_record_type=reported_record_type)
self._long_string_max_length = long_string_max_length
@property
def long_string_max_length(self):
'The maximum number of elements (length) of the current value'
return self._long_string_max_length
async def write(self, value, *, flags=0, **metadata):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write(value, flags=flags, **metadata)
async def write_from_dbr(self, *args, flags=0, **kwargs):
flags |= (SubscriptionType.DBE_LOG | SubscriptionType.DBE_VALUE)
await super().write_from_dbr(*args, flags=flags, **kwargs)