Source code for caproto.server.records.records

'''
Contains PVGroups representing all fields of EPICS base records.

This file contains implementation functionality on top of the record instances
found in :mod:`caproto.server.records.base` - a module auto-generated from
a reference EPICS DBD file.

Any customizations required for fields should be done in this file.
'''
import logging
from typing import ClassVar

from ..._data import ChannelData
from ..server import PvpropertyStringRO, pvproperty
from . import base
from .utils import link_enum_strings, link_parent_attribute, register_record

logger = logging.getLogger(__name__)


[docs]class RecordFieldGroup(base.RecordFieldGroup): _base = base.RecordFieldGroup _record_type: ClassVar[str] parent: ChannelData # Add some handling onto the autogenerated code above: record_type = pvproperty( name='RTYP', dtype=PvpropertyStringRO, read_only=True, doc='Record type' ) def __init__(self, prefix, **kw): super().__init__(prefix, **kw) parent = self.parent # set .NAME self.record_name._data['value'] = parent.pvname # set .RTYP self.record_type._data['value'] = self._record_type # automatic alarm handling self._alarm = parent.alarm self._alarm.connect(self) async def publish(self, flags): # if SubscriptionType.DBE_ALARM in flags: # TODO this needs tweaking - proof of concept at the moment await self.alarm_acknowledge_transient.write( self._alarm.must_acknowledge_transient) await self.alarm_acknowledge_severity.write( self._alarm.severity_to_acknowledge) await self.alarm_status.write(self._alarm.status) await self.current_alarm_severity.write(self._alarm.severity) @_base.scan_rate.putter async def scan_rate(self, instance, value): scan_string = (self.scan_rate.enum_strings[value] if isinstance( value, int) else value) if scan_string in ('I/O Intr', 'Passive', 'Event'): self._scan_rate_sec = 0 else: self._scan_rate_sec = float(scan_string.split(' ')[0]) if hasattr(self.parent, 'scan_rate'): self.parent.scan_rate = self._scan_rate_sec @property def scan_rate_sec(self): 'Record scan rate, in seconds (read-only)' return self._scan_rate_sec @_base.process_record.putter async def process_record(self, instance, value): await self.parent.write(self.parent.value) link_parent_attribute(_base.description, '__doc__', use_setattr=True) async def value_write_hook(self, instance, value): """An overridable hook for the parent value having been updated.""" ...
[docs]@register_record class AiFields(base.AiFields, RecordFieldGroup): _base = base.AiFields link_parent_attribute( _base.display_precision, 'precision', ) link_parent_attribute(_base.archive_deadband, 'log_atol', use_setattr=True) link_parent_attribute(_base.monitor_deadband, 'value_atol', use_setattr=True)
[docs]@register_record class AsubFields(base.AsubFields, RecordFieldGroup): _base = base.AsubFields link_parent_attribute( _base.display_precision, "precision", )
[docs]@register_record class AaiFields(base.AaiFields, RecordFieldGroup): _base = base.AaiFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", ) link_parent_attribute( _base.number_of_elements, "max_length", use_setattr=True, read_only=True )
[docs]@register_record class AaoFields(base.AaoFields, RecordFieldGroup): _base = base.AaoFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", ) link_parent_attribute( _base.number_of_elements, "max_length", use_setattr=True, read_only=True )
[docs]@register_record class AoFields(base.AoFields, RecordFieldGroup): _base = base.AoFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class AsynFields(base.AsynFields, RecordFieldGroup): _base = base.AsynFields
[docs]@register_record class BiFields(base.BiFields, RecordFieldGroup): _base = base.BiFields link_enum_strings(_base.zero_name, index=0) link_enum_strings(_base.one_name, index=1) @_base.raw_value.putter async def raw_value(self, instance, value): await self.parent.write( value=value ) async def value_write_hook(self, instance, value): raw_value = self.parent.get_raw_value(value) if raw_value is not None: await self.raw_value.write(raw_value, verify_value=False)
[docs]@register_record class BoFields(base.BoFields, RecordFieldGroup): _base = base.BoFields link_enum_strings(_base.zero_name, index=0) link_enum_strings(_base.one_name, index=1) @_base.raw_value.putter async def raw_value(self, instance, value): await self.parent.write( value=value ) async def value_write_hook(self, instance, value): raw_value = self.parent.get_raw_value(value) if raw_value is not None: await self.raw_value.write(raw_value, verify_value=False)
[docs]@register_record class CalcFields(base.CalcFields, RecordFieldGroup): _base = base.CalcFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class CalcoutFields(base.CalcoutFields, RecordFieldGroup): _base = base.CalcoutFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class CompressFields(base.CompressFields, RecordFieldGroup): _base = base.CompressFields link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", ) link_parent_attribute( _base.display_precision, "precision", )
[docs]@register_record class DfanoutFields(base.DfanoutFields, RecordFieldGroup): _base = base.DfanoutFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class EventFields(base.EventFields, RecordFieldGroup): _base = base.EventFields
[docs]@register_record class FanoutFields(base.FanoutFields, RecordFieldGroup): _base = base.FanoutFields
[docs]@register_record class HistogramFields(base.HistogramFields, RecordFieldGroup): _base = base.HistogramFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", )
[docs]@register_record class Int64inFields(base.Int64inFields, RecordFieldGroup): _base = base.Int64inFields link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class Int64outFields(base.Int64outFields, RecordFieldGroup): _base = base.Int64outFields link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class LonginFields(base.LonginFields, RecordFieldGroup): _base = base.LonginFields link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class LongoutFields(base.LongoutFields, RecordFieldGroup): _base = base.LongoutFields link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class LsiFields(base.LsiFields, RecordFieldGroup): _base = base.LsiFields
[docs]@register_record class LsoFields(base.LsoFields, RecordFieldGroup): _base = base.LsoFields
[docs]@register_record class MbbiFields(base.MbbiFields, RecordFieldGroup): _base = base.MbbiFields link_enum_strings(_base.zero_string, index=0) link_enum_strings(_base.one_string, index=1) link_enum_strings(_base.two_string, index=2) link_enum_strings(_base.three_string, index=3) link_enum_strings(_base.four_string, index=4) link_enum_strings(_base.five_string, index=5) link_enum_strings(_base.six_string, index=6) link_enum_strings(_base.seven_string, index=7) link_enum_strings(_base.eight_string, index=8) link_enum_strings(_base.nine_string, index=9) link_enum_strings(_base.ten_string, index=10) link_enum_strings(_base.eleven_string, index=11) link_enum_strings(_base.twelve_string, index=12) link_enum_strings(_base.thirteen_string, index=13) link_enum_strings(_base.fourteen_string, index=14) link_enum_strings(_base.fifteen_string, index=15) @_base.raw_value.putter async def raw_value(self, instance, value): await self.parent.write( value=value ) async def value_write_hook(self, instance, value): raw_value = self.parent.get_raw_value(value) if raw_value is not None: await self.raw_value.write(raw_value, verify_value=False)
[docs]@register_record class MbbidirectFields(base.MbbidirectFields, RecordFieldGroup): _base = base.MbbidirectFields
[docs]@register_record class MbboFields(base.MbboFields, RecordFieldGroup): _base = base.MbboFields link_enum_strings(_base.one_string, index=1) link_enum_strings(_base.two_string, index=2) link_enum_strings(_base.three_string, index=3) link_enum_strings(_base.four_string, index=4) link_enum_strings(_base.five_string, index=5) link_enum_strings(_base.six_string, index=6) link_enum_strings(_base.seven_string, index=7) link_enum_strings(_base.eight_string, index=8) link_enum_strings(_base.nine_string, index=9) link_enum_strings(_base.ten_string, index=10) link_enum_strings(_base.eleven_string, index=11) link_enum_strings(_base.twelve_string, index=12) link_enum_strings(_base.thirteen_string, index=13) link_enum_strings(_base.fourteen_string, index=14) link_enum_strings(_base.fifteen_string, index=15) @_base.raw_value.putter async def raw_value(self, instance, value): await self.parent.write( value=value ) async def value_write_hook(self, instance, value): raw_value = self.parent.get_raw_value(value) if raw_value is not None: await self.raw_value.write(raw_value, verify_value=False)
[docs]@register_record class MbbodirectFields(base.MbbodirectFields, RecordFieldGroup): _base = base.MbbodirectFields
[docs]@register_record class MotorFields(base.MotorFields, RecordFieldGroup): _base = base.MotorFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class PermissiveFields(base.PermissiveFields, RecordFieldGroup): _base = base.PermissiveFields
[docs]@register_record class PrintfFields(base.PrintfFields, RecordFieldGroup): _base = base.PrintfFields
[docs]@register_record class SelFields(base.SelFields, RecordFieldGroup): _base = base.SelFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class SeqFields(base.SeqFields, RecordFieldGroup): _base = base.SeqFields link_parent_attribute( _base.display_precision, "precision", )
[docs]@register_record class StateFields(base.StateFields, RecordFieldGroup): _base = base.StateFields
[docs]@register_record class StringinFields(base.StringinFields, RecordFieldGroup): _base = base.StringinFields
[docs]@register_record class StringoutFields(base.StringoutFields, RecordFieldGroup): _base = base.StringoutFields
[docs]@register_record class SubFields(base.SubFields, RecordFieldGroup): _base = base.SubFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute(_base.archive_deadband, "log_atol", use_setattr=True) link_parent_attribute(_base.monitor_deadband, "value_atol", use_setattr=True)
[docs]@register_record class SubarrayFields(base.SubarrayFields, RecordFieldGroup): _base = base.SubarrayFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", ) link_parent_attribute( _base.maximum_elements, "length", use_setattr=True, read_only=True ) link_parent_attribute( _base.number_of_elements, "max_length", use_setattr=True, read_only=True )
[docs]@register_record class WaveformFields(base.WaveformFields, RecordFieldGroup): _base = base.WaveformFields link_parent_attribute( _base.display_precision, "precision", ) link_parent_attribute( _base.high_operating_range, "upper_ctrl_limit", ) link_parent_attribute( _base.low_operating_range, "lower_ctrl_limit", ) link_parent_attribute( _base.number_of_elements, "max_length", use_setattr=True, read_only=True )