Source code for caproto._dbr

# Manually written with reference to:
# http://www.aps.anl.gov/epics/base/R3-16/0-docs/CAproto/index.html#payload-data-types
# https://github.com/epics-base/epics-base/blob/813166128eae1240cdd643869808abe1c4621321/src/ca/client/db_access.h

# The organizational code, making use of Enum, comes from pypvasync by Kenneth
# Lauer.

import collections
import ctypes
import datetime
import logging
import numbers
import time
from enum import IntEnum, IntFlag
from typing import ClassVar, Tuple

from ._constants import (EPICS2UNIX_EPOCH, EPICS_EPOCH, MAX_ENUM_STATES,
                         MAX_ENUM_STRING_SIZE, MAX_STRING_SIZE, MAX_UNITS_SIZE)

__all__ = ('AccessRights', 'AlarmSeverity', 'AlarmStatus', 'ConnStatus',
           'TimeStamp', 'ChannelType', 'SubscriptionType', 'DbrStringArray',
           'epics_timestamp_to_unix', 'timestamp_to_epics',
           'field_types', 'DBR_TYPES', 'native_type', 'native_types',
           'status_types', 'time_types', 'graphical_types', 'control_types',
           'char_types', 'string_types', 'int_types', 'float_types',
           'enum_types', 'char_types', 'native_float_types',
           'native_int_types')


logger = logging.getLogger('caproto')


class AccessRights(IntFlag):
    NO_ACCESS = 0
    READ = 1
    WRITE = 2


class AlarmSeverity(IntEnum):
    NO_ALARM = 0
    MINOR_ALARM = 1
    MAJOR_ALARM = 2
    INVALID_ALARM = 3


class AlarmStatus(IntEnum):
    NO_ALARM = 0
    READ = 1
    WRITE = 2
    HIHI = 3
    HIGH = 4
    LOLO = 5
    LOW = 6
    STATE = 7
    COS = 8
    COMM = 9
    TIMEOUT = 10
    HWLIMIT = 11
    CALC = 12
    SCAN = 13
    LINK = 14
    SOFT = 15
    BAD_SUB = 16
    UDF = 17
    DISABLE = 18
    SIMM = 19
    READ_ACCESS = 20
    WRITE_ACCESS = 21


# EPICS Constants
class ECA(IntEnum):
    NORMAL = 1
    TIMEOUT = 80
    IODONE = 339
    ISATTACHED = 424
    BADCHID = 410


class ConnStatus(IntEnum):
    CS_CONN = 2
    OP_CONN_UP = 6
    OP_CONN_DOWN = 7
    CS_NEVER_SEARCH = 4


[docs]class ChannelType(IntEnum): ''' All channel types supported by Channel Access The ones that should be used in servers to specify the type of pvproperty or ChannelData are only "native" types (STRING, INT, FLOAT, ENUM, CHAR, LONG, DOUBLE) The remaining channel data types are used for clients requesting additional metadata from the server. ''' STRING = 0 INT = 1 FLOAT = 2 ENUM = 3 CHAR = 4 LONG = 5 DOUBLE = 6 STS_STRING = 7 STS_INT = 8 STS_FLOAT = 9 STS_ENUM = 10 STS_CHAR = 11 STS_LONG = 12 STS_DOUBLE = 13 TIME_STRING = 14 TIME_INT = 15 TIME_FLOAT = 16 TIME_ENUM = 17 TIME_CHAR = 18 TIME_LONG = 19 TIME_DOUBLE = 20 GR_STRING = 21 # not implemented by EPICS GR_INT = 22 GR_FLOAT = 23 GR_ENUM = 24 GR_CHAR = 25 GR_LONG = 26 GR_DOUBLE = 27 CTRL_STRING = 28 # not implemented by EPICS CTRL_INT = 29 CTRL_FLOAT = 30 CTRL_ENUM = 31 CTRL_CHAR = 32 CTRL_LONG = 33 CTRL_DOUBLE = 34 PUT_ACKT = 35 PUT_ACKS = 36 STSACK_STRING = 37 CLASS_NAME = 38
class _LongStringChannelType(IntEnum): ''' An internal data type enum which mirrors CHAR values from ChannelType. The key difference for this enum is that the server performs a different conversion for STRING -> CHAR and STRING -> LONG_STRING. ''' LONG_STRING = 4 STS_LONG_STRING = 11 TIME_LONG_STRING = 18 GR_LONG_STRING = 25 CTRL_LONG_STRING = 32 _channel_type_by_name = { type_.name: type_ for type_ in list(ChannelType) + list(_LongStringChannelType) } class SubscriptionType(IntFlag): '''Subscription masks DBE_VALUE Trigger an event when a significant change in the channel's value occurs. (In epics-base, relies on the monitor deadband field under DCT.) DBE_ARCHIVE (DBE_LOG) Trigger an event when an archive significant change in the channel's value occurs. (In epics-base, relies on the archiver monitor deadband field under DCT.) DBE_ALARM Trigger an event when the alarm state changes DBE_PROPERTY Trigger an event when a property change (control limit, graphical limit, status string, enum string ...) occurs. ''' DBE_VALUE = 1 DBE_LOG = 2 DBE_ALARM = 4 DBE_PROPERTY = 8 string_t = MAX_STRING_SIZE * ctypes.c_char # epicsOldString char_t = ctypes.c_char # epicsUint8 short_t = ctypes.c_int16 # epicsInt16 ushort_t = ctypes.c_uint16 # epicsUInt16 int_t = ctypes.c_int16 # epicsInt16 long_t = ctypes.c_int32 # epicsInt32 ulong_t = ctypes.c_uint32 # epicsUInt32 float_t = ctypes.c_float # epicsFloat32 double_t = ctypes.c_double # epicsFloat64 class DbrStringArray(collections.UserList): '''A mockup of numpy.array, intended to hold byte strings String arrays in numpy are special and inconvenient to work with. ''' def __getitem__(self, i): res = self.data[i] return type(self)(res) if isinstance(i, slice) else res @classmethod def frombuffer(cls, buf, data_count=None): 'Create a DbrStringArray from a buffer' if data_count is None: data_count = max((1, len(buf) // MAX_STRING_SIZE)) def safely_find_eos(): 'Find null terminator, else MAX_STRING_SIZE/length of the string' try: return min((MAX_STRING_SIZE, buf.index(b'\00'))) except ValueError: return min((MAX_STRING_SIZE, len(buf))) buf = bytes(buf) strings = cls() for _ in range(data_count): strings.append(buf[:safely_find_eos()]) buf = buf[MAX_STRING_SIZE:] return strings def tobytes(self): # numpy compat return b''.join(item[:MAX_STRING_SIZE].ljust(MAX_STRING_SIZE, b'\x00') for item in self) class DbrTypeBase(ctypes.BigEndianStructure): '''Base class for all DBR types''' _pack_ = 1 info_fields: ClassVar[Tuple[str, ...]] = () def to_dict(self): d = {field: getattr(self, field) for field in self.info_fields} if 'status' in d: try: d['status'] = AlarmStatus(d['status']) except ValueError: logger.exception('Invalid alarm status: %s', d['status']) d.pop('status') if 'severity' in d: try: d['severity'] = AlarmSeverity(d['severity']) except ValueError: logger.exception('Invalid alarm severity: %s', d['severity']) d.pop('severity') return d def __repr__(self): formatted_args = ", ".join(["{!s}={!r}".format(k, v) for k, v in self.to_dict().items()]) return "{}({})".format(type(self).__name__, formatted_args) class TimeStamp(DbrTypeBase): '''An EPICS timestamp with 32-bit seconds and nanoseconds''' _fields_ = [ ('secondsSinceEpoch', ctypes.c_uint32), ('nanoSeconds', ctypes.c_uint32) ] info_fields = ('timestamp', ) @property def timestamp(self): """Timestamp as a UNIX timestamp (seconds).""" return epics_timestamp_to_unix(self.secondsSinceEpoch, self.nanoSeconds) @classmethod def from_unix_timestamp(cls, timestamp): """Create a ``TimeStamp`` from a UNIX timestamp.""" sec, nano = timestamp_to_epics(timestamp) return cls(secondsSinceEpoch=sec, nanoSeconds=nano) @classmethod def now(cls): """Get a new ``TimeStamp`` representing 'now' (``time.time()``)""" return cls.from_unix_timestamp(time.time()) @classmethod def from_flexible_value(cls, timestamp): """ Flexible TimeStamp conversion. Parameters ---------- timestamp : float, int, TimeStamp, or 2-tuple Accepts either a number (UNIX timestamp), a TimeStamp instance, or a tuple of (seconds, nanoseconds). Returns ------- TimeStamp The EPICS-compatible timestamp. """ if isinstance(timestamp, numbers.Real): sec, nano = timestamp_to_epics(timestamp) else: sec, nano = tuple(timestamp) return cls(secondsSinceEpoch=sec, nanoSeconds=nano) def as_datetime(self): 'Timestamp as a datetime' return datetime.datetime.fromtimestamp(self.timestamp) def __iter__(self): return iter((self.secondsSinceEpoch, self.nanoSeconds)) def __eq__(self, other): return tuple(self) == tuple(other) class TimeTypeBase(DbrTypeBase): '''DBR_TIME_* base''' # access to secondsSinceEpoch and nanoSeconds: _anonymous_ = ('stamp', ) info_fields = ('status', 'severity', 'timestamp') @property def timestamp(self): '''Unix timestamp''' return self.stamp.timestamp class StatusTypeBase(DbrTypeBase): '''DBR_STS_* base''' info_fields = ('status', 'severity', ) class GraphicControlBase(DbrTypeBase): '''DBR_CTRL_* and DBR_GR_* base''' graphic_fields: ClassVar[Tuple[str, ...]] = ( "upper_disp_limit", "lower_disp_limit", "upper_alarm_limit", "upper_warning_limit", "lower_warning_limit", "lower_alarm_limit", ) control_fields: ClassVar[Tuple[str, ...]] = ( "upper_ctrl_limit", "lower_ctrl_limit", ) info_fields: ClassVar[Tuple[str, ...]] = ( "status", "severity", ) + graphic_fields class GraphicControlUnits(GraphicControlBase): '''DBR_CTRL/DBR_GR with units''' class ControlTypeUnits(GraphicControlUnits): '''DBR_CTRL with units''' info_fields = (GraphicControlBase.info_fields + GraphicControlBase.control_fields + ('units', )) class GraphicTypeUnits(GraphicControlUnits): '''DBR_GR with units''' info_fields = GraphicControlBase.info_fields + ('units', ) class GraphicControlPrecision(GraphicControlBase): '''DBR_CTRL/DBR_GR with precision and units''' class ControlTypePrecision(GraphicControlPrecision): '''DBR_CTRL with precision and units''' info_fields = (GraphicControlBase.info_fields + GraphicControlBase.control_fields + ('precision', 'units', )) class GraphicTypePrecision(GraphicControlPrecision): '''DBR_GR with precision and units''' info_fields = (GraphicControlBase.info_fields + ('precision', 'units', )) class DbrNativeValueType(DbrTypeBase): '''Native DBR_ types: no metadata, value only''' info_fields = ('value', ) # Native value types class DBR_STRING(DbrNativeValueType): DBR_ID = ChannelType.STRING _fields_ = [('value', string_t)] class DBR_INT(DbrNativeValueType): DBR_ID = ChannelType.INT _fields_ = [('value', int_t)] class DBR_FLOAT(DbrNativeValueType): DBR_ID = ChannelType.FLOAT _fields_ = [('value', float_t)] class DBR_ENUM(DbrNativeValueType): DBR_ID = ChannelType.ENUM _fields_ = [('value', ushort_t)] class DBR_CHAR(DbrNativeValueType): DBR_ID = ChannelType.CHAR _fields_ = [('value', char_t)] class DBR_LONG(DbrNativeValueType): DBR_ID = ChannelType.LONG _fields_ = [('value', long_t)] class DBR_DOUBLE(DbrNativeValueType): DBR_ID = ChannelType.DOUBLE _fields_ = [('value', double_t)] # Status types class DBR_STS_STRING(StatusTypeBase): DBR_ID = ChannelType.STS_STRING _fields_ = [ ('status', short_t), ('severity', short_t), ] class DBR_STS_INT(StatusTypeBase): DBR_ID = ChannelType.STS_INT _fields_ = [ ('status', short_t), ('severity', short_t), ] class DBR_STS_FLOAT(StatusTypeBase): DBR_ID = ChannelType.STS_FLOAT _fields_ = [ ('status', short_t), ('severity', short_t), ] class DBR_STS_ENUM(StatusTypeBase): DBR_ID = ChannelType.STS_ENUM _fields_ = [ ('status', short_t), ('severity', short_t), ] class DBR_STS_CHAR(StatusTypeBase): DBR_ID = ChannelType.STS_CHAR _fields_ = [ ('status', short_t), ('severity', short_t), ('RISC_pad', char_t), ] class DBR_STS_LONG(StatusTypeBase): DBR_ID = ChannelType.STS_LONG _fields_ = [ ('status', short_t), ('severity', short_t), ] class DBR_STS_DOUBLE(StatusTypeBase): DBR_ID = ChannelType.STS_DOUBLE _fields_ = [ ('status', short_t), ('severity', short_t), ('RISC_pad', long_t), ] # Time types class DBR_TIME_STRING(TimeTypeBase): DBR_ID = ChannelType.TIME_STRING _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ] class DBR_TIME_INT(TimeTypeBase): DBR_ID = ChannelType.TIME_INT _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ('RISC_pad', short_t), ] class DBR_TIME_FLOAT(TimeTypeBase): DBR_ID = ChannelType.TIME_FLOAT _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ] class DBR_TIME_ENUM(TimeTypeBase): DBR_ID = ChannelType.TIME_ENUM _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ('RISC_pad', short_t), ] class DBR_TIME_CHAR(TimeTypeBase): DBR_ID = ChannelType.TIME_CHAR _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ('RISC_pad0', short_t), ('RISC_pad1', char_t), ] class DBR_TIME_LONG(TimeTypeBase): DBR_ID = ChannelType.TIME_LONG _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ] class DBR_TIME_DOUBLE(TimeTypeBase): DBR_ID = ChannelType.TIME_DOUBLE _fields_ = [ ('status', short_t), ('severity', short_t), ('stamp', TimeStamp), ('RISC_pad', long_t), ] # DBR_GR_STRING (21) is not implemented by EPICS. - use DBR_STS_STRING # Graphic types class DBR_GR_INT(GraphicTypeUnits): DBR_ID = ChannelType.GR_INT _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', short_t), ('lower_disp_limit', short_t), ('upper_alarm_limit', short_t), ('upper_warning_limit', short_t), ('lower_warning_limit', short_t), ('lower_alarm_limit', short_t), ] class DBR_GR_FLOAT(GraphicTypePrecision): DBR_ID = ChannelType.GR_FLOAT _fields_ = [ ('status', short_t), ('severity', short_t), ('precision', short_t), ('RISC_pad0', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', float_t), ('lower_disp_limit', float_t), ('upper_alarm_limit', float_t), ('upper_warning_limit', float_t), ('lower_warning_limit', float_t), ('lower_alarm_limit', float_t), ] class _EnumWithStrings: @property def enum_strings(self): '''Enum byte strings as a tuple''' return tuple(self.strs[i].value for i in range(self.no_str)) @enum_strings.setter def enum_strings(self, enum_strings): for i, bytes_ in enumerate(enum_strings): bytes_ = bytes_[:MAX_ENUM_STRING_SIZE - 1] self.strs[i][:] = bytes_.ljust(MAX_ENUM_STRING_SIZE, b'\x00') self.no_str = len(enum_strings) class DBR_GR_ENUM(GraphicControlBase, _EnumWithStrings): DBR_ID = ChannelType.GR_ENUM graphic_fields = () control_fields = () info_fields = ('status', 'severity', 'enum_strings', ) _fields_ = [ ('status', short_t), ('severity', short_t), ('no_str', short_t), # number of strings ('strs', MAX_ENUM_STATES * (MAX_ENUM_STRING_SIZE * char_t)), ] class DBR_GR_CHAR(GraphicTypeUnits): DBR_ID = ChannelType.GR_CHAR _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', char_t), ('lower_disp_limit', char_t), ('upper_alarm_limit', char_t), ('upper_warning_limit', char_t), ('lower_warning_limit', char_t), ('lower_alarm_limit', char_t), ('RISC_pad', char_t), ] class DBR_GR_LONG(GraphicTypeUnits): DBR_ID = ChannelType.GR_LONG _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', long_t), ('lower_disp_limit', long_t), ('upper_alarm_limit', long_t), ('upper_warning_limit', long_t), ('lower_warning_limit', long_t), ('lower_alarm_limit', long_t), ] class DBR_GR_DOUBLE(GraphicTypePrecision): DBR_ID = ChannelType.GR_DOUBLE _fields_ = [ ('status', short_t), ('severity', short_t), ('precision', short_t), ('RISC_pad0', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', double_t), ('lower_disp_limit', double_t), ('upper_alarm_limit', double_t), ('upper_warning_limit', double_t), ('lower_warning_limit', double_t), ('lower_alarm_limit', double_t), ] # DBR_CTRL_STRING (28) is not implemented by lib # Control types class DBR_CTRL_INT(ControlTypeUnits): DBR_ID = ChannelType.CTRL_INT _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', short_t), ('lower_disp_limit', short_t), ('upper_alarm_limit', short_t), ('upper_warning_limit', short_t), ('lower_warning_limit', short_t), ('lower_alarm_limit', short_t), ('upper_ctrl_limit', short_t), ('lower_ctrl_limit', short_t), ] class DBR_CTRL_FLOAT(ControlTypePrecision): DBR_ID = ChannelType.CTRL_FLOAT _fields_ = [ ('status', short_t), ('severity', short_t), ('precision', short_t), ('RISC_pad0', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', float_t), ('lower_disp_limit', float_t), ('upper_alarm_limit', float_t), ('upper_warning_limit', float_t), ('lower_warning_limit', float_t), ('lower_alarm_limit', float_t), ('upper_ctrl_limit', float_t), ('lower_ctrl_limit', float_t), ] class DBR_CTRL_ENUM(GraphicControlBase, _EnumWithStrings): DBR_ID = ChannelType.CTRL_ENUM control_fields = () graphic_fields = () info_fields = ('status', 'severity', 'enum_strings', ) _fields_ = [ ('status', short_t), ('severity', short_t), ('no_str', short_t), # number of strings ('strs', MAX_ENUM_STATES * (MAX_ENUM_STRING_SIZE * char_t)), ] class DBR_CTRL_CHAR(ControlTypeUnits): DBR_ID = ChannelType.CTRL_CHAR _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', char_t), ('lower_disp_limit', char_t), ('upper_alarm_limit', char_t), ('upper_warning_limit', char_t), ('lower_warning_limit', char_t), ('lower_alarm_limit', char_t), ('upper_ctrl_limit', char_t), ('lower_ctrl_limit', char_t), ('RISC_pad', char_t), ] class DBR_CTRL_LONG(ControlTypeUnits): DBR_ID = ChannelType.CTRL_LONG _fields_ = [ ('status', short_t), ('severity', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', long_t), ('lower_disp_limit', long_t), ('upper_alarm_limit', long_t), ('upper_warning_limit', long_t), ('lower_warning_limit', long_t), ('lower_alarm_limit', long_t), ('upper_ctrl_limit', long_t), ('lower_ctrl_limit', long_t), ] class DBR_CTRL_DOUBLE(ControlTypePrecision): DBR_ID = ChannelType.CTRL_DOUBLE _fields_ = [ ('status', short_t), ('severity', short_t), ('precision', short_t), ('RISC_pad0', short_t), ('units', MAX_UNITS_SIZE * char_t), ('upper_disp_limit', double_t), ('lower_disp_limit', double_t), ('upper_alarm_limit', double_t), ('upper_warning_limit', double_t), ('lower_warning_limit', double_t), ('lower_alarm_limit', double_t), ('upper_ctrl_limit', double_t), ('lower_ctrl_limit', double_t), ] # Remaining "special" types class DbrSpecialType(DbrTypeBase): ... class DBR_PUT_ACKT(DbrSpecialType): DBR_ID = ChannelType.PUT_ACKT info_fields = ('value', ) _fields_ = [('value', ushort_t)] class DBR_PUT_ACKS(DbrSpecialType): DBR_ID = ChannelType.PUT_ACKS info_fields = ('value', ) _fields_ = [('value', ushort_t)] class DBR_STSACK_STRING(DbrSpecialType): DBR_ID = ChannelType.STSACK_STRING info_fields = ('status', 'severity', 'ackt', 'acks', 'value') _fields_ = [ ('status', short_t), ('severity', short_t), ('ackt', ushort_t), ('acks', ushort_t), ('value', string_t), ] class DBR_CLASS_NAME(DbrSpecialType): DBR_ID = ChannelType.CLASS_NAME info_fields = ('value', ) _fields_ = [('value', string_t)] # End of DBR type classes # Full mapping of promoted -> native field types, and native -> promoted types field_types = { 'native': { # Native ChannelType.STRING: ChannelType.STRING, ChannelType.INT: ChannelType.INT, ChannelType.FLOAT: ChannelType.FLOAT, ChannelType.ENUM: ChannelType.ENUM, ChannelType.CHAR: ChannelType.CHAR, ChannelType.LONG: ChannelType.LONG, ChannelType.DOUBLE: ChannelType.DOUBLE, # Status ChannelType.STS_STRING: ChannelType.STRING, ChannelType.STS_INT: ChannelType.INT, ChannelType.STS_FLOAT: ChannelType.FLOAT, ChannelType.STS_ENUM: ChannelType.ENUM, ChannelType.STS_CHAR: ChannelType.CHAR, ChannelType.STS_LONG: ChannelType.LONG, ChannelType.STS_DOUBLE: ChannelType.DOUBLE, # Time ChannelType.TIME_STRING: ChannelType.STRING, ChannelType.TIME_INT: ChannelType.INT, ChannelType.TIME_FLOAT: ChannelType.FLOAT, ChannelType.TIME_ENUM: ChannelType.ENUM, ChannelType.TIME_CHAR: ChannelType.CHAR, ChannelType.TIME_LONG: ChannelType.LONG, ChannelType.TIME_DOUBLE: ChannelType.DOUBLE, # Graphic ChannelType.GR_STRING: ChannelType.STRING, ChannelType.GR_INT: ChannelType.INT, ChannelType.GR_FLOAT: ChannelType.FLOAT, ChannelType.GR_ENUM: ChannelType.ENUM, ChannelType.GR_CHAR: ChannelType.CHAR, ChannelType.GR_LONG: ChannelType.LONG, ChannelType.GR_DOUBLE: ChannelType.DOUBLE, # Control ChannelType.CTRL_STRING: ChannelType.STRING, ChannelType.CTRL_INT: ChannelType.INT, ChannelType.CTRL_FLOAT: ChannelType.FLOAT, ChannelType.CTRL_ENUM: ChannelType.ENUM, ChannelType.CTRL_CHAR: ChannelType.CHAR, ChannelType.CTRL_LONG: ChannelType.LONG, ChannelType.CTRL_DOUBLE: ChannelType.DOUBLE, # Special ChannelType.PUT_ACKT: ChannelType.PUT_ACKT, ChannelType.PUT_ACKS: ChannelType.PUT_ACKS, ChannelType.STSACK_STRING: ChannelType.STSACK_STRING, ChannelType.CLASS_NAME: ChannelType.CLASS_NAME, }, 'status': { # Native ChannelType.STRING: ChannelType.STS_STRING, ChannelType.INT: ChannelType.STS_INT, ChannelType.FLOAT: ChannelType.STS_FLOAT, ChannelType.ENUM: ChannelType.STS_ENUM, ChannelType.CHAR: ChannelType.STS_CHAR, ChannelType.LONG: ChannelType.STS_LONG, ChannelType.DOUBLE: ChannelType.STS_DOUBLE, # Status ChannelType.STS_STRING: ChannelType.STS_STRING, ChannelType.STS_INT: ChannelType.STS_INT, ChannelType.STS_FLOAT: ChannelType.STS_FLOAT, ChannelType.STS_ENUM: ChannelType.STS_ENUM, ChannelType.STS_CHAR: ChannelType.STS_CHAR, ChannelType.STS_LONG: ChannelType.STS_LONG, ChannelType.STS_DOUBLE: ChannelType.STS_DOUBLE, # Time ChannelType.TIME_STRING: ChannelType.STS_STRING, ChannelType.TIME_INT: ChannelType.STS_INT, ChannelType.TIME_FLOAT: ChannelType.STS_FLOAT, ChannelType.TIME_ENUM: ChannelType.STS_ENUM, ChannelType.TIME_CHAR: ChannelType.STS_CHAR, ChannelType.TIME_LONG: ChannelType.STS_LONG, ChannelType.TIME_DOUBLE: ChannelType.STS_DOUBLE, # Graphic ChannelType.STS_STRING: ChannelType.STS_STRING, ChannelType.GR_INT: ChannelType.STS_INT, ChannelType.GR_FLOAT: ChannelType.STS_FLOAT, ChannelType.GR_ENUM: ChannelType.STS_ENUM, ChannelType.GR_CHAR: ChannelType.STS_CHAR, ChannelType.GR_LONG: ChannelType.STS_LONG, ChannelType.GR_DOUBLE: ChannelType.STS_DOUBLE, # Control # ChannelType.TIME_STRING: ChannelType.STS_STRING, ChannelType.CTRL_INT: ChannelType.STS_INT, ChannelType.CTRL_FLOAT: ChannelType.STS_FLOAT, ChannelType.CTRL_ENUM: ChannelType.STS_ENUM, ChannelType.CTRL_CHAR: ChannelType.STS_CHAR, ChannelType.CTRL_LONG: ChannelType.STS_LONG, ChannelType.CTRL_DOUBLE: ChannelType.STS_DOUBLE, # Special types ChannelType.PUT_ACKT: ChannelType.PUT_ACKT, ChannelType.PUT_ACKS: ChannelType.PUT_ACKS, ChannelType.STSACK_STRING: ChannelType.STSACK_STRING, ChannelType.CLASS_NAME: ChannelType.CLASS_NAME, }, 'time': { # Native ChannelType.STRING: ChannelType.TIME_STRING, ChannelType.INT: ChannelType.TIME_INT, ChannelType.FLOAT: ChannelType.TIME_FLOAT, ChannelType.ENUM: ChannelType.TIME_ENUM, ChannelType.CHAR: ChannelType.TIME_CHAR, ChannelType.LONG: ChannelType.TIME_LONG, ChannelType.DOUBLE: ChannelType.TIME_DOUBLE, # Status ChannelType.STS_STRING: ChannelType.TIME_STRING, ChannelType.STS_INT: ChannelType.TIME_INT, ChannelType.STS_FLOAT: ChannelType.TIME_FLOAT, ChannelType.STS_ENUM: ChannelType.TIME_ENUM, ChannelType.STS_CHAR: ChannelType.TIME_CHAR, ChannelType.STS_LONG: ChannelType.TIME_LONG, ChannelType.STS_DOUBLE: ChannelType.TIME_DOUBLE, # Time ChannelType.TIME_STRING: ChannelType.TIME_STRING, ChannelType.TIME_INT: ChannelType.TIME_INT, ChannelType.TIME_FLOAT: ChannelType.TIME_FLOAT, ChannelType.TIME_ENUM: ChannelType.TIME_ENUM, ChannelType.TIME_CHAR: ChannelType.TIME_CHAR, ChannelType.TIME_LONG: ChannelType.TIME_LONG, ChannelType.TIME_DOUBLE: ChannelType.TIME_DOUBLE, # Graphic ChannelType.STS_STRING: ChannelType.TIME_STRING, ChannelType.GR_INT: ChannelType.TIME_INT, ChannelType.GR_FLOAT: ChannelType.TIME_FLOAT, ChannelType.GR_ENUM: ChannelType.TIME_ENUM, ChannelType.GR_CHAR: ChannelType.TIME_CHAR, ChannelType.GR_LONG: ChannelType.TIME_LONG, ChannelType.GR_DOUBLE: ChannelType.TIME_DOUBLE, # Control # ChannelType.TIME_STRING: ChannelType.TIME_STRING, ChannelType.CTRL_INT: ChannelType.TIME_INT, ChannelType.CTRL_FLOAT: ChannelType.TIME_FLOAT, ChannelType.CTRL_ENUM: ChannelType.TIME_ENUM, ChannelType.CTRL_CHAR: ChannelType.TIME_CHAR, ChannelType.CTRL_LONG: ChannelType.TIME_LONG, ChannelType.CTRL_DOUBLE: ChannelType.TIME_DOUBLE, # Special types ChannelType.PUT_ACKT: ChannelType.PUT_ACKT, ChannelType.PUT_ACKS: ChannelType.PUT_ACKS, ChannelType.STSACK_STRING: ChannelType.STSACK_STRING, ChannelType.CLASS_NAME: ChannelType.CLASS_NAME, }, 'graphic': { # Native ChannelType.STRING: ChannelType.STS_STRING, ChannelType.INT: ChannelType.GR_INT, ChannelType.FLOAT: ChannelType.GR_FLOAT, ChannelType.ENUM: ChannelType.GR_ENUM, ChannelType.CHAR: ChannelType.GR_CHAR, ChannelType.LONG: ChannelType.GR_LONG, ChannelType.DOUBLE: ChannelType.GR_DOUBLE, # Status ChannelType.STS_STRING: ChannelType.STS_STRING, ChannelType.STS_INT: ChannelType.GR_INT, ChannelType.STS_FLOAT: ChannelType.GR_FLOAT, ChannelType.STS_ENUM: ChannelType.GR_ENUM, ChannelType.STS_CHAR: ChannelType.GR_CHAR, ChannelType.STS_LONG: ChannelType.GR_LONG, ChannelType.STS_DOUBLE: ChannelType.GR_DOUBLE, # Time ChannelType.TIME_STRING: ChannelType.STS_STRING, ChannelType.TIME_INT: ChannelType.GR_INT, ChannelType.TIME_FLOAT: ChannelType.GR_FLOAT, ChannelType.TIME_ENUM: ChannelType.GR_ENUM, ChannelType.TIME_CHAR: ChannelType.GR_CHAR, ChannelType.TIME_LONG: ChannelType.GR_LONG, ChannelType.TIME_DOUBLE: ChannelType.GR_DOUBLE, # Graphic ChannelType.STS_STRING: ChannelType.STS_STRING, ChannelType.GR_INT: ChannelType.GR_INT, ChannelType.GR_FLOAT: ChannelType.GR_FLOAT, ChannelType.GR_ENUM: ChannelType.GR_ENUM, ChannelType.GR_CHAR: ChannelType.GR_CHAR, ChannelType.GR_LONG: ChannelType.GR_LONG, ChannelType.GR_DOUBLE: ChannelType.GR_DOUBLE, # Control # ChannelType.TIME_STRING: ChannelType.STS_STRING, ChannelType.CTRL_INT: ChannelType.GR_INT, ChannelType.CTRL_FLOAT: ChannelType.GR_FLOAT, ChannelType.CTRL_ENUM: ChannelType.GR_ENUM, ChannelType.CTRL_CHAR: ChannelType.GR_CHAR, ChannelType.CTRL_LONG: ChannelType.GR_LONG, ChannelType.CTRL_DOUBLE: ChannelType.GR_DOUBLE, # Special types ChannelType.PUT_ACKT: ChannelType.PUT_ACKT, ChannelType.PUT_ACKS: ChannelType.PUT_ACKS, ChannelType.STSACK_STRING: ChannelType.STSACK_STRING, ChannelType.CLASS_NAME: ChannelType.CLASS_NAME, }, 'control': { # Native ChannelType.STRING: ChannelType.TIME_STRING, ChannelType.INT: ChannelType.CTRL_INT, ChannelType.FLOAT: ChannelType.CTRL_FLOAT, ChannelType.ENUM: ChannelType.CTRL_ENUM, ChannelType.CHAR: ChannelType.CTRL_CHAR, ChannelType.LONG: ChannelType.CTRL_LONG, ChannelType.DOUBLE: ChannelType.CTRL_DOUBLE, # Status ChannelType.STS_STRING: ChannelType.TIME_STRING, ChannelType.STS_INT: ChannelType.CTRL_INT, ChannelType.STS_FLOAT: ChannelType.CTRL_FLOAT, ChannelType.STS_ENUM: ChannelType.CTRL_ENUM, ChannelType.STS_CHAR: ChannelType.CTRL_CHAR, ChannelType.STS_LONG: ChannelType.CTRL_LONG, ChannelType.STS_DOUBLE: ChannelType.CTRL_DOUBLE, # Time ChannelType.TIME_STRING: ChannelType.TIME_STRING, ChannelType.TIME_INT: ChannelType.CTRL_INT, ChannelType.TIME_FLOAT: ChannelType.CTRL_FLOAT, ChannelType.TIME_ENUM: ChannelType.CTRL_ENUM, ChannelType.TIME_CHAR: ChannelType.CTRL_CHAR, ChannelType.TIME_LONG: ChannelType.CTRL_LONG, ChannelType.TIME_DOUBLE: ChannelType.CTRL_DOUBLE, # Graphic ChannelType.STS_STRING: ChannelType.TIME_STRING, ChannelType.GR_INT: ChannelType.CTRL_INT, ChannelType.GR_FLOAT: ChannelType.CTRL_FLOAT, ChannelType.GR_ENUM: ChannelType.CTRL_ENUM, ChannelType.GR_CHAR: ChannelType.CTRL_CHAR, ChannelType.GR_LONG: ChannelType.CTRL_LONG, ChannelType.GR_DOUBLE: ChannelType.CTRL_DOUBLE, # Control # ChannelType.TIME_STRING: ChannelType.TIME_STRING, ChannelType.CTRL_INT: ChannelType.CTRL_INT, ChannelType.CTRL_FLOAT: ChannelType.CTRL_FLOAT, ChannelType.CTRL_ENUM: ChannelType.CTRL_ENUM, ChannelType.CTRL_CHAR: ChannelType.CTRL_CHAR, ChannelType.CTRL_LONG: ChannelType.CTRL_LONG, ChannelType.CTRL_DOUBLE: ChannelType.CTRL_DOUBLE, # Special types ChannelType.PUT_ACKT: ChannelType.PUT_ACKT, ChannelType.PUT_ACKS: ChannelType.PUT_ACKS, ChannelType.STSACK_STRING: ChannelType.STSACK_STRING, ChannelType.CLASS_NAME: ChannelType.CLASS_NAME, }, } # All native types available native_types = {ChannelType.STRING, ChannelType.INT, ChannelType.FLOAT, ChannelType.ENUM, ChannelType.CHAR, ChannelType.LONG, ChannelType.DOUBLE} # Special types without any corresponding promoted versions special_types = {ChannelType.PUT_ACKT, ChannelType.PUT_ACKS, ChannelType.STSACK_STRING, ChannelType.CLASS_NAME} # ChannelTypes grouped by included metadata status_types = set(field_types['status'].values()) - set(special_types) time_types = set(field_types['time'].values()) - set(special_types) graphical_types = (set(field_types['graphic'].values()) - set(special_types) - {ChannelType.STS_STRING}) control_types = (set(field_types['control'].values()) - set(special_types) - {ChannelType.TIME_STRING}) # ChannelTypes grouped by value data type char_types = {ChannelType.CHAR, ChannelType.TIME_CHAR, ChannelType.CTRL_CHAR, ChannelType.STS_CHAR } string_types = {ChannelType.STRING, ChannelType.TIME_STRING, ChannelType.CTRL_STRING, ChannelType.STS_STRING } int_types = {ChannelType.INT, ChannelType.TIME_INT, ChannelType.CTRL_INT, ChannelType.LONG, ChannelType.TIME_LONG, ChannelType.CTRL_LONG, } float_types = {ChannelType.FLOAT, ChannelType.TIME_FLOAT, ChannelType.CTRL_FLOAT, ChannelType.DOUBLE, ChannelType.TIME_DOUBLE, ChannelType.CTRL_DOUBLE, } enum_types = {ChannelType.ENUM, ChannelType.STS_ENUM, ChannelType.TIME_ENUM, ChannelType.CTRL_ENUM } char_types = {ChannelType.CHAR, ChannelType.TIME_CHAR, ChannelType.CTRL_CHAR} native_float_types = {ChannelType.FLOAT, ChannelType.DOUBLE} native_int_types = {ChannelType.INT, ChannelType.CHAR, ChannelType.LONG, ChannelType.ENUM } # map of Epics DBR types to ctypes types DBR_TYPES = { cls.DBR_ID: cls for name, cls in globals().items() if (name.startswith('DBR_') and issubclass(cls, DbrTypeBase) and hasattr(cls, 'DBR_ID')) } # Unimplemented STRING types are mapped to DBR_TIME_STRING DBR_TYPES[ChannelType.GR_STRING] = DBR_STS_STRING DBR_TYPES[ChannelType.CTRL_STRING] = DBR_TIME_STRING def native_type(ftype): '''return native field type from TIME or CTRL variant''' return field_types['native'][ftype] def epics_timestamp_to_unix(seconds_since_epoch, nano_seconds): '''UNIX timestamp (seconds) from Epics TimeStamp structure''' return (EPICS2UNIX_EPOCH + seconds_since_epoch + 1.e-6 * int(1.e-3 * nano_seconds)) def timestamp_to_epics(ts): '''Python timestamp from EPICS TimeStamp structure''' if isinstance(ts, numbers.Real): ts = datetime.datetime.utcfromtimestamp(ts) dt = ts - EPICS_EPOCH return int(dt.total_seconds()), int(dt.microseconds * 1e3)