# Source code for caproto._dbr

# Manually written with reference to:
# http://www.aps.anl.gov/epics/base/R3-16/0-docs/CAproto/index.html#payload-data-types
# https://github.com/epics-base/epics-base/blob/813166128eae1240cdd643869808abe1c4621321/src/ca/client/db_access.h

# The organizational code, making use of Enum, comes from pypvasync by Kenneth
# Lauer.

import ctypes
import datetime
import collections
from enum import IntEnum, IntFlag
from ._constants import (EPICS2UNIX_EPOCH, EPICS_EPOCH, MAX_STRING_SIZE,
                         MAX_UNITS_SIZE, MAX_ENUM_STRING_SIZE, MAX_ENUM_STATES)


# Names exported by ``from ... import *``.  Each name is listed exactly once
# ('char_types' previously appeared twice).
__all__ = ('AccessRights', 'AlarmSeverity', 'AlarmStatus', 'ConnStatus',
           'TimeStamp', 'ChannelType', 'SubscriptionType', 'DbrStringArray',
           'epics_timestamp_to_unix', 'timestamp_to_epics',
           'field_types', 'DBR_TYPES', 'native_type', 'native_types',
           'status_types', 'time_types', 'graphical_types', 'control_types',
           'char_types', 'string_types', 'int_types', 'float_types',
           'enum_types', 'native_float_types', 'native_int_types')


class AccessRights(IntFlag):
    '''Access-rights bit flags; READ and WRITE may be OR-ed together'''
    NO_ACCESS = 0
    READ = 1 << 0
    WRITE = 1 << 1


class AlarmSeverity(IntEnum):
    '''Alarm severity levels, numbered 0 (NO_ALARM) through 3 (INVALID_ALARM)'''
    NO_ALARM = 0
    MINOR_ALARM = 1
    MAJOR_ALARM = 2
    INVALID_ALARM = 3


class AlarmStatus(IntEnum):
    '''Alarm status codes, numbered consecutively from 0 (NO_ALARM) to 21'''
    NO_ALARM = 0
    READ = 1
    WRITE = 2
    HIHI = 3
    HIGH = 4
    LOLO = 5
    LOW = 6
    STATE = 7
    COS = 8
    COMM = 9
    TIMEOUT = 10
    HWLIMIT = 11
    CALC = 12
    SCAN = 13
    LINK = 14
    SOFT = 15
    BAD_SUB = 16
    UDF = 17
    DISABLE = 18
    SIMM = 19
    READ_ACCESS = 20
    WRITE_ACCESS = 21


# EPICS Constants
class ECA(IntEnum):
    '''EPICS ``ECA`` constants as integers

    Members are intentionally not in numerical order; declaration order is
    the iteration order of the enum.
    '''
    NORMAL = 1
    TIMEOUT = 80
    IODONE = 339
    ISATTACHED = 424
    BADCHID = 410


class ConnStatus(IntEnum):
    '''Connection status constants (``CS_*`` and ``OP_CONN_*`` values)

    Members are intentionally not in numerical order; declaration order is
    the iteration order of the enum.
    '''
    CS_CONN = 2
    OP_CONN_UP = 6
    OP_CONN_DOWN = 7
    CS_NEVER_SEARCH = 4


class ChannelType(IntEnum):
    '''Numeric identifiers of the DBR data types

    The values are laid out in families of seven (STRING, INT, FLOAT, ENUM,
    CHAR, LONG, DOUBLE): native (0-6), STS (7-13), TIME (14-20), GR (21-27)
    and CTRL (28-34), followed by four special types (35-38).
    '''
    # Native types
    STRING = 0
    INT = 1
    FLOAT = 2
    ENUM = 3
    CHAR = 4
    LONG = 5
    DOUBLE = 6

    # Status types
    STS_STRING = 7
    STS_INT = 8
    STS_FLOAT = 9
    STS_ENUM = 10
    STS_CHAR = 11
    STS_LONG = 12
    STS_DOUBLE = 13

    # Time types
    TIME_STRING = 14
    TIME_INT = 15
    TIME_FLOAT = 16
    TIME_ENUM = 17
    TIME_CHAR = 18
    TIME_LONG = 19
    TIME_DOUBLE = 20

    # Graphic types
    GR_STRING = 21  # not implemented by EPICS
    GR_INT = 22
    GR_FLOAT = 23
    GR_ENUM = 24
    GR_CHAR = 25
    GR_LONG = 26
    GR_DOUBLE = 27

    # Control types
    CTRL_STRING = 28  # not implemented by EPICS
    CTRL_INT = 29
    CTRL_FLOAT = 30
    CTRL_ENUM = 31
    CTRL_CHAR = 32
    CTRL_LONG = 33
    CTRL_DOUBLE = 34

    # Special types
    PUT_ACKT = 35
    PUT_ACKS = 36
    STSACK_STRING = 37
    CLASS_NAME = 38
class SubscriptionType(IntFlag):
    '''Subscription masks

    DBE_VALUE
        Trigger an event when a significant change in the channel's value
        occurs. (In epics-base, relies on the monitor deadband field under
        DCT.)

    DBE_ARCHIVE (DBE_LOG)
        Trigger an event when an archive significant change in the channel's
        value occurs. (In epics-base, relies on the archiver monitor deadband
        field under DCT.)

    DBE_ALARM
        Trigger an event when the alarm state changes

    DBE_PROPERTY
        Trigger an event when a property change (control limit, graphical
        limit, status string, enum string ...) occurs.
    '''
    DBE_VALUE = 1
    DBE_LOG = 2
    DBE_ALARM = 4
    DBE_PROPERTY = 8


# ctypes aliases for the EPICS scalar types used in the structures below
string_t = MAX_STRING_SIZE * ctypes.c_char  # epicsOldString
char_t = ctypes.c_char  # epicsUint8
short_t = ctypes.c_int16  # epicsInt16
ushort_t = ctypes.c_uint16  # epicsUInt16
int_t = ctypes.c_int16  # epicsInt16
long_t = ctypes.c_int32  # epicsInt32
ulong_t = ctypes.c_uint32  # epicsUInt32
float_t = ctypes.c_float  # epicsFloat32
double_t = ctypes.c_double  # epicsFloat64


class DbrStringArray(collections.UserList):
    '''A mockup of numpy.array, intended to hold byte strings

    String arrays in numpy are special and inconvenient to work with.
    '''
    def __getitem__(self, i):
        'Return one item, or a new DbrStringArray when `i` is a slice'
        res = self.data[i]
        return type(self)(res) if isinstance(i, slice) else res

    @classmethod
    def frombuffer(cls, buf, data_count=None):
        '''Create a DbrStringArray from a buffer

        The buffer is consumed in MAX_STRING_SIZE-byte slots; each slot
        contributes the bytes that precede its first null byte.

        Parameters
        ----------
        buf : bytes-like
            Packed string data.
        data_count : int, optional
            Number of strings to extract. Defaults to
            ``len(buf) // MAX_STRING_SIZE``, but never less than 1.
        '''
        if data_count is None:
            data_count = max((1, len(buf) // MAX_STRING_SIZE))

        def safely_find_eos():
            'Find null terminator, else MAX_STRING_SIZE/length of the string'
            try:
                return min((MAX_STRING_SIZE, buf.index(b'\x00')))
            except ValueError:
                return min((MAX_STRING_SIZE, len(buf)))

        # safely_find_eos() reads `buf` from this scope, so it always sees
        # the not-yet-consumed remainder rebound in the loop below.
        buf = bytes(buf)
        strings = cls()
        for _ in range(data_count):
            strings.append(buf[:safely_find_eos()])
            buf = buf[MAX_STRING_SIZE:]
        return strings

    def tobytes(self):
        'Pack every item into a null-padded MAX_STRING_SIZE slot (numpy compat)'
        return b''.join(
            item[:MAX_STRING_SIZE].ljust(MAX_STRING_SIZE, b'\x00')
            for item in self)


class DbrTypeBase(ctypes.BigEndianStructure):
    '''Base class for all DBR types'''
    _pack_ = 1
    # Names of the attributes reported by to_dict() / __repr__()
    info_fields = ()

    def to_dict(self):
        '''Return a dict of the attributes named in `info_fields`

        'status' and 'severity', when present, are converted to AlarmStatus
        and AlarmSeverity respectively.
        '''
        d = {field: getattr(self, field)
             for field in self.info_fields}
        if 'status' in d:
            d['status'] = AlarmStatus(d['status'])
        if 'severity' in d:
            d['severity'] = AlarmSeverity(d['severity'])
        return d

    def __repr__(self):
        formatted_args = ", ".join(["{!s}={!r}".format(k, v)
                                    for k, v in self.to_dict().items()])
        return "{}({})".format(type(self).__name__, formatted_args)


class TimeStamp(DbrTypeBase):
    '''An EPICS timestamp with 32-bit seconds and nanoseconds'''
    _fields_ = [
        ('secondsSinceEpoch', ctypes.c_uint32),
        ('nanoSeconds', ctypes.c_uint32)
    ]
    info_fields = ('timestamp', )

    @property
    def timestamp(self):
        'Timestamp as UNIX timestamp (seconds)'
        return epics_timestamp_to_unix(self.secondsSinceEpoch,
                                       self.nanoSeconds)

    @classmethod
    def from_unix_timestamp(cls, timestamp):
        'Create a TimeStamp from the result of timestamp_to_epics(timestamp)'
        sec, nano = timestamp_to_epics(timestamp)
        return cls(secondsSinceEpoch=sec, nanoSeconds=nano)

    def as_datetime(self):
        'Timestamp as a naive datetime in UTC'
        # Same result as the deprecated datetime.utcfromtimestamp(): convert
        # in UTC, then drop tzinfo so a naive datetime is still returned.
        aware = datetime.datetime.fromtimestamp(self.timestamp,
                                                datetime.timezone.utc)
        return aware.replace(tzinfo=None)


class TimeTypeBase(DbrTypeBase):
    '''DBR_TIME_* base'''
    # access to secondsSinceEpoch and nanoSeconds:
    _anonymous_ = ('stamp', )
    info_fields = ('status', 'severity', 'timestamp')

    @property
    def timestamp(self):
        '''Unix timestamp'''
        return self.stamp.timestamp


class StatusTypeBase(DbrTypeBase):
    '''DBR_STS_* base'''
    info_fields = ('status', 'severity', )


class GraphicControlBase(DbrTypeBase):
    '''DBR_CTRL_* and DBR_GR_* base'''
    graphic_fields = ('upper_disp_limit', 'lower_disp_limit',
                      'upper_alarm_limit', 'upper_warning_limit',
                      'lower_warning_limit', 'lower_alarm_limit')
    control_fields = ('upper_ctrl_limit', 'lower_ctrl_limit')
    info_fields = ('status', 'severity', ) + graphic_fields


class GraphicControlUnits(GraphicControlBase):
    '''DBR_CTRL/DBR_GR with units'''


class ControlTypeUnits(GraphicControlUnits):
    '''DBR_CTRL with units'''
    info_fields = (GraphicControlBase.info_fields +
                   GraphicControlBase.control_fields +
                   ('units', ))


class GraphicTypeUnits(GraphicControlUnits):
    '''DBR_GR with units'''
    info_fields = GraphicControlBase.info_fields + ('units', )


class GraphicControlPrecision(GraphicControlBase):
    '''DBR_CTRL/DBR_GR with precision and units'''


class ControlTypePrecision(GraphicControlPrecision):
    '''DBR_CTRL with precision and units'''
    info_fields = (GraphicControlBase.info_fields +
                   GraphicControlBase.control_fields +
                   ('precision', 'units', ))


class GraphicTypePrecision(GraphicControlPrecision):
    '''DBR_GR with precision and units'''
    info_fields = (GraphicControlBase.info_fields +
                   ('precision', 'units', ))


class DbrNativeValueType(DbrTypeBase):
    '''Native DBR_ types: no metadata, value only'''
    info_fields = ('value', )


# Native value types
class DBR_STRING(DbrNativeValueType):
    DBR_ID = ChannelType.STRING
    _fields_ = [('value', string_t)]


class DBR_INT(DbrNativeValueType):
    DBR_ID = ChannelType.INT
    _fields_ = [('value', int_t)]


class DBR_FLOAT(DbrNativeValueType):
    DBR_ID = ChannelType.FLOAT
    _fields_ = [('value', float_t)]


class DBR_ENUM(DbrNativeValueType):
    DBR_ID = ChannelType.ENUM
    _fields_ = [('value', ushort_t)]


class DBR_CHAR(DbrNativeValueType):
    DBR_ID = ChannelType.CHAR
    _fields_ = [('value', char_t)]


class DBR_LONG(DbrNativeValueType):
    DBR_ID = ChannelType.LONG
    _fields_ = [('value', long_t)]


class DBR_DOUBLE(DbrNativeValueType):
    DBR_ID = ChannelType.DOUBLE
    _fields_ = [('value', double_t)]


# Status types
class DBR_STS_STRING(StatusTypeBase):
    DBR_ID = ChannelType.STS_STRING
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
    ]


class DBR_STS_INT(StatusTypeBase):
    DBR_ID = ChannelType.STS_INT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
    ]


class DBR_STS_FLOAT(StatusTypeBase):
    DBR_ID = ChannelType.STS_FLOAT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
    ]


class DBR_STS_ENUM(StatusTypeBase):
    DBR_ID = ChannelType.STS_ENUM
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
    ]


class DBR_STS_CHAR(StatusTypeBase):
    DBR_ID = ChannelType.STS_CHAR
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('RISC_pad', char_t),
    ]


class DBR_STS_LONG(StatusTypeBase):
    DBR_ID = ChannelType.STS_LONG
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
    ]


class DBR_STS_DOUBLE(StatusTypeBase):
    DBR_ID = ChannelType.STS_DOUBLE
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('RISC_pad', long_t),
    ]


# Time types
class DBR_TIME_STRING(TimeTypeBase):
    DBR_ID = ChannelType.TIME_STRING
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
    ]


class DBR_TIME_INT(TimeTypeBase):
    DBR_ID = ChannelType.TIME_INT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
        ('RISC_pad', short_t),
    ]


class DBR_TIME_FLOAT(TimeTypeBase):
    DBR_ID = ChannelType.TIME_FLOAT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
    ]


class DBR_TIME_ENUM(TimeTypeBase):
    DBR_ID = ChannelType.TIME_ENUM
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
        ('RISC_pad', short_t),
    ]


class DBR_TIME_CHAR(TimeTypeBase):
    DBR_ID = ChannelType.TIME_CHAR
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
        ('RISC_pad0', short_t),
        ('RISC_pad1', char_t),
    ]


class DBR_TIME_LONG(TimeTypeBase):
    DBR_ID = ChannelType.TIME_LONG
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
    ]


class DBR_TIME_DOUBLE(TimeTypeBase):
    DBR_ID = ChannelType.TIME_DOUBLE
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('stamp', TimeStamp),
        ('RISC_pad', long_t),
    ]


# DBR_GR_STRING (21) is not implemented by EPICS. - use DBR_STS_STRING

# Graphic types
class DBR_GR_INT(GraphicTypeUnits):
    DBR_ID = ChannelType.GR_INT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', short_t),
        ('lower_disp_limit', short_t),
        ('upper_alarm_limit', short_t),
        ('upper_warning_limit', short_t),
        ('lower_warning_limit', short_t),
        ('lower_alarm_limit', short_t),
    ]


class DBR_GR_FLOAT(GraphicTypePrecision):
    DBR_ID = ChannelType.GR_FLOAT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('precision', short_t),
        ('RISC_pad0', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', float_t),
        ('lower_disp_limit', float_t),
        ('upper_alarm_limit', float_t),
        ('upper_warning_limit', float_t),
        ('lower_warning_limit', float_t),
        ('lower_alarm_limit', float_t),
    ]


class _EnumWithStrings:
    '''Mixin adding `enum_strings` to structures with `no_str` and `strs`'''

    @property
    def enum_strings(self):
        '''Enum byte strings as a tuple'''
        return tuple(self.strs[i].value
                     for i in range(self.no_str))

    @enum_strings.setter
    def enum_strings(self, enum_strings):
        for i, bytes_ in enumerate(enum_strings):
            # Truncate to leave room for at least one null byte, then
            # null-pad to the full slot width.
            bytes_ = bytes_[:MAX_ENUM_STRING_SIZE - 1]
            self.strs[i][:] = bytes_.ljust(MAX_ENUM_STRING_SIZE, b'\x00')
        self.no_str = len(enum_strings)


class DBR_GR_ENUM(GraphicControlBase, _EnumWithStrings):
    DBR_ID = ChannelType.GR_ENUM
    graphic_fields = ()
    control_fields = ()
    info_fields = ('status', 'severity', 'enum_strings', )
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('no_str', short_t),  # number of strings
        ('strs', MAX_ENUM_STATES * (MAX_ENUM_STRING_SIZE * char_t)),
    ]


class DBR_GR_CHAR(GraphicTypeUnits):
    DBR_ID = ChannelType.GR_CHAR
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', char_t),
        ('lower_disp_limit', char_t),
        ('upper_alarm_limit', char_t),
        ('upper_warning_limit', char_t),
        ('lower_warning_limit', char_t),
        ('lower_alarm_limit', char_t),
        ('RISC_pad', char_t),
    ]


class DBR_GR_LONG(GraphicTypeUnits):
    DBR_ID = ChannelType.GR_LONG
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', long_t),
        ('lower_disp_limit', long_t),
        ('upper_alarm_limit', long_t),
        ('upper_warning_limit', long_t),
        ('lower_warning_limit', long_t),
        ('lower_alarm_limit', long_t),
    ]


class DBR_GR_DOUBLE(GraphicTypePrecision):
    DBR_ID = ChannelType.GR_DOUBLE
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('precision', short_t),
        ('RISC_pad0', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', double_t),
        ('lower_disp_limit', double_t),
        ('upper_alarm_limit', double_t),
        ('upper_warning_limit', double_t),
        ('lower_warning_limit', double_t),
        ('lower_alarm_limit', double_t),
    ]


# DBR_CTRL_STRING (28) is not implemented by lib

# Control types
class DBR_CTRL_INT(ControlTypeUnits):
    DBR_ID = ChannelType.CTRL_INT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', short_t),
        ('lower_disp_limit', short_t),
        ('upper_alarm_limit', short_t),
        ('upper_warning_limit', short_t),
        ('lower_warning_limit', short_t),
        ('lower_alarm_limit', short_t),
        ('upper_ctrl_limit', short_t),
        ('lower_ctrl_limit', short_t),
    ]


class DBR_CTRL_FLOAT(ControlTypePrecision):
    DBR_ID = ChannelType.CTRL_FLOAT
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('precision', short_t),
        ('RISC_pad0', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', float_t),
        ('lower_disp_limit', float_t),
        ('upper_alarm_limit', float_t),
        ('upper_warning_limit', float_t),
        ('lower_warning_limit', float_t),
        ('lower_alarm_limit', float_t),
        ('upper_ctrl_limit', float_t),
        ('lower_ctrl_limit', float_t),
    ]
class DBR_CTRL_ENUM(GraphicControlBase, _EnumWithStrings):
    # `enum_strings` (getter and setter) is inherited from _EnumWithStrings,
    # exactly as for DBR_GR_ENUM; it is not re-declared here.
    DBR_ID = ChannelType.CTRL_ENUM
    control_fields = ()
    graphic_fields = ()
    info_fields = ('status', 'severity', 'enum_strings', )
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('no_str', short_t),  # number of strings
        ('strs', MAX_ENUM_STATES * (MAX_ENUM_STRING_SIZE * char_t)),
    ]


class DBR_CTRL_CHAR(ControlTypeUnits):
    DBR_ID = ChannelType.CTRL_CHAR
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', char_t),
        ('lower_disp_limit', char_t),
        ('upper_alarm_limit', char_t),
        ('upper_warning_limit', char_t),
        ('lower_warning_limit', char_t),
        ('lower_alarm_limit', char_t),
        ('upper_ctrl_limit', char_t),
        ('lower_ctrl_limit', char_t),
        ('RISC_pad', char_t),
    ]


class DBR_CTRL_LONG(ControlTypeUnits):
    DBR_ID = ChannelType.CTRL_LONG
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', long_t),
        ('lower_disp_limit', long_t),
        ('upper_alarm_limit', long_t),
        ('upper_warning_limit', long_t),
        ('lower_warning_limit', long_t),
        ('lower_alarm_limit', long_t),
        ('upper_ctrl_limit', long_t),
        ('lower_ctrl_limit', long_t),
    ]


class DBR_CTRL_DOUBLE(ControlTypePrecision):
    DBR_ID = ChannelType.CTRL_DOUBLE
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('precision', short_t),
        ('RISC_pad0', short_t),
        ('units', MAX_UNITS_SIZE * char_t),
        ('upper_disp_limit', double_t),
        ('lower_disp_limit', double_t),
        ('upper_alarm_limit', double_t),
        ('upper_warning_limit', double_t),
        ('lower_warning_limit', double_t),
        ('lower_alarm_limit', double_t),
        ('upper_ctrl_limit', double_t),
        ('lower_ctrl_limit', double_t),
    ]


# Remaining "special" types
class DbrSpecialType(DbrTypeBase):
    '''Base class of the special DBR types (PUT_ACKT, PUT_ACKS, ...)'''


class DBR_PUT_ACKT(DbrSpecialType):
    DBR_ID = ChannelType.PUT_ACKT
    info_fields = ('value', )
    _fields_ = [('value', ushort_t)]


class DBR_PUT_ACKS(DbrSpecialType):
    DBR_ID = ChannelType.PUT_ACKS
    info_fields = ('value', )
    _fields_ = [('value', ushort_t)]


class DBR_STSACK_STRING(DbrSpecialType):
    DBR_ID = ChannelType.STSACK_STRING
    info_fields = ('status', 'severity', 'ackt', 'acks', 'value')
    _fields_ = [
        ('status', short_t),
        ('severity', short_t),
        ('ackt', ushort_t),
        ('acks', ushort_t),
        ('value', string_t),
    ]


class DBR_CLASS_NAME(DbrSpecialType):
    DBR_ID = ChannelType.CLASS_NAME
    info_fields = ('value', )
    _fields_ = [('value', string_t)]


# End of DBR type classes

# Full mapping of promoted -> native field types, and native -> promoted types
#
# Each sub-map takes any ChannelType to its counterpart in one family.
# GR_STRING and CTRL_STRING have no structure of their own, so string types
# promote to STS_STRING in the 'graphic' map and TIME_STRING in the 'control'
# map.  The GR_STRING / CTRL_STRING keys of the promoted maps follow that
# same pattern (the Graphic sections previously repeated the STS_STRING key
# instead, leaving both keys missing).
field_types = {
    'native': {
        # Native
        ChannelType.STRING: ChannelType.STRING,
        ChannelType.INT: ChannelType.INT,
        ChannelType.FLOAT: ChannelType.FLOAT,
        ChannelType.ENUM: ChannelType.ENUM,
        ChannelType.CHAR: ChannelType.CHAR,
        ChannelType.LONG: ChannelType.LONG,
        ChannelType.DOUBLE: ChannelType.DOUBLE,

        # Status
        ChannelType.STS_STRING: ChannelType.STRING,
        ChannelType.STS_INT: ChannelType.INT,
        ChannelType.STS_FLOAT: ChannelType.FLOAT,
        ChannelType.STS_ENUM: ChannelType.ENUM,
        ChannelType.STS_CHAR: ChannelType.CHAR,
        ChannelType.STS_LONG: ChannelType.LONG,
        ChannelType.STS_DOUBLE: ChannelType.DOUBLE,

        # Time
        ChannelType.TIME_STRING: ChannelType.STRING,
        ChannelType.TIME_INT: ChannelType.INT,
        ChannelType.TIME_FLOAT: ChannelType.FLOAT,
        ChannelType.TIME_ENUM: ChannelType.ENUM,
        ChannelType.TIME_CHAR: ChannelType.CHAR,
        ChannelType.TIME_LONG: ChannelType.LONG,
        ChannelType.TIME_DOUBLE: ChannelType.DOUBLE,

        # Graphic
        ChannelType.GR_STRING: ChannelType.STRING,
        ChannelType.GR_INT: ChannelType.INT,
        ChannelType.GR_FLOAT: ChannelType.FLOAT,
        ChannelType.GR_ENUM: ChannelType.ENUM,
        ChannelType.GR_CHAR: ChannelType.CHAR,
        ChannelType.GR_LONG: ChannelType.LONG,
        ChannelType.GR_DOUBLE: ChannelType.DOUBLE,

        # Control
        ChannelType.CTRL_STRING: ChannelType.STRING,
        ChannelType.CTRL_INT: ChannelType.INT,
        ChannelType.CTRL_FLOAT: ChannelType.FLOAT,
        ChannelType.CTRL_ENUM: ChannelType.ENUM,
        ChannelType.CTRL_CHAR: ChannelType.CHAR,
        ChannelType.CTRL_LONG: ChannelType.LONG,
        ChannelType.CTRL_DOUBLE: ChannelType.DOUBLE,

        # Special
        ChannelType.PUT_ACKT: ChannelType.PUT_ACKT,
        ChannelType.PUT_ACKS: ChannelType.PUT_ACKS,
        ChannelType.STSACK_STRING: ChannelType.STSACK_STRING,
        ChannelType.CLASS_NAME: ChannelType.CLASS_NAME,
    },

    'status': {
        # Native
        ChannelType.STRING: ChannelType.STS_STRING,
        ChannelType.INT: ChannelType.STS_INT,
        ChannelType.FLOAT: ChannelType.STS_FLOAT,
        ChannelType.ENUM: ChannelType.STS_ENUM,
        ChannelType.CHAR: ChannelType.STS_CHAR,
        ChannelType.LONG: ChannelType.STS_LONG,
        ChannelType.DOUBLE: ChannelType.STS_DOUBLE,

        # Status
        ChannelType.STS_STRING: ChannelType.STS_STRING,
        ChannelType.STS_INT: ChannelType.STS_INT,
        ChannelType.STS_FLOAT: ChannelType.STS_FLOAT,
        ChannelType.STS_ENUM: ChannelType.STS_ENUM,
        ChannelType.STS_CHAR: ChannelType.STS_CHAR,
        ChannelType.STS_LONG: ChannelType.STS_LONG,
        ChannelType.STS_DOUBLE: ChannelType.STS_DOUBLE,

        # Time
        ChannelType.TIME_STRING: ChannelType.STS_STRING,
        ChannelType.TIME_INT: ChannelType.STS_INT,
        ChannelType.TIME_FLOAT: ChannelType.STS_FLOAT,
        ChannelType.TIME_ENUM: ChannelType.STS_ENUM,
        ChannelType.TIME_CHAR: ChannelType.STS_CHAR,
        ChannelType.TIME_LONG: ChannelType.STS_LONG,
        ChannelType.TIME_DOUBLE: ChannelType.STS_DOUBLE,

        # Graphic
        ChannelType.GR_STRING: ChannelType.STS_STRING,
        ChannelType.GR_INT: ChannelType.STS_INT,
        ChannelType.GR_FLOAT: ChannelType.STS_FLOAT,
        ChannelType.GR_ENUM: ChannelType.STS_ENUM,
        ChannelType.GR_CHAR: ChannelType.STS_CHAR,
        ChannelType.GR_LONG: ChannelType.STS_LONG,
        ChannelType.GR_DOUBLE: ChannelType.STS_DOUBLE,

        # Control
        ChannelType.CTRL_STRING: ChannelType.STS_STRING,
        ChannelType.CTRL_INT: ChannelType.STS_INT,
        ChannelType.CTRL_FLOAT: ChannelType.STS_FLOAT,
        ChannelType.CTRL_ENUM: ChannelType.STS_ENUM,
        ChannelType.CTRL_CHAR: ChannelType.STS_CHAR,
        ChannelType.CTRL_LONG: ChannelType.STS_LONG,
        ChannelType.CTRL_DOUBLE: ChannelType.STS_DOUBLE,

        # Special types
        ChannelType.PUT_ACKT: ChannelType.PUT_ACKT,
        ChannelType.PUT_ACKS: ChannelType.PUT_ACKS,
        ChannelType.STSACK_STRING: ChannelType.STSACK_STRING,
        ChannelType.CLASS_NAME: ChannelType.CLASS_NAME,
    },

    'time': {
        # Native
        ChannelType.STRING: ChannelType.TIME_STRING,
        ChannelType.INT: ChannelType.TIME_INT,
        ChannelType.FLOAT: ChannelType.TIME_FLOAT,
        ChannelType.ENUM: ChannelType.TIME_ENUM,
        ChannelType.CHAR: ChannelType.TIME_CHAR,
        ChannelType.LONG: ChannelType.TIME_LONG,
        ChannelType.DOUBLE: ChannelType.TIME_DOUBLE,

        # Status
        ChannelType.STS_STRING: ChannelType.TIME_STRING,
        ChannelType.STS_INT: ChannelType.TIME_INT,
        ChannelType.STS_FLOAT: ChannelType.TIME_FLOAT,
        ChannelType.STS_ENUM: ChannelType.TIME_ENUM,
        ChannelType.STS_CHAR: ChannelType.TIME_CHAR,
        ChannelType.STS_LONG: ChannelType.TIME_LONG,
        ChannelType.STS_DOUBLE: ChannelType.TIME_DOUBLE,

        # Time
        ChannelType.TIME_STRING: ChannelType.TIME_STRING,
        ChannelType.TIME_INT: ChannelType.TIME_INT,
        ChannelType.TIME_FLOAT: ChannelType.TIME_FLOAT,
        ChannelType.TIME_ENUM: ChannelType.TIME_ENUM,
        ChannelType.TIME_CHAR: ChannelType.TIME_CHAR,
        ChannelType.TIME_LONG: ChannelType.TIME_LONG,
        ChannelType.TIME_DOUBLE: ChannelType.TIME_DOUBLE,

        # Graphic
        ChannelType.GR_STRING: ChannelType.TIME_STRING,
        ChannelType.GR_INT: ChannelType.TIME_INT,
        ChannelType.GR_FLOAT: ChannelType.TIME_FLOAT,
        ChannelType.GR_ENUM: ChannelType.TIME_ENUM,
        ChannelType.GR_CHAR: ChannelType.TIME_CHAR,
        ChannelType.GR_LONG: ChannelType.TIME_LONG,
        ChannelType.GR_DOUBLE: ChannelType.TIME_DOUBLE,

        # Control
        ChannelType.CTRL_STRING: ChannelType.TIME_STRING,
        ChannelType.CTRL_INT: ChannelType.TIME_INT,
        ChannelType.CTRL_FLOAT: ChannelType.TIME_FLOAT,
        ChannelType.CTRL_ENUM: ChannelType.TIME_ENUM,
        ChannelType.CTRL_CHAR: ChannelType.TIME_CHAR,
        ChannelType.CTRL_LONG: ChannelType.TIME_LONG,
        ChannelType.CTRL_DOUBLE: ChannelType.TIME_DOUBLE,

        # Special types
        ChannelType.PUT_ACKT: ChannelType.PUT_ACKT,
        ChannelType.PUT_ACKS: ChannelType.PUT_ACKS,
        ChannelType.STSACK_STRING: ChannelType.STSACK_STRING,
        ChannelType.CLASS_NAME: ChannelType.CLASS_NAME,
    },

    'graphic': {
        # Native
        ChannelType.STRING: ChannelType.STS_STRING,
        ChannelType.INT: ChannelType.GR_INT,
        ChannelType.FLOAT: ChannelType.GR_FLOAT,
        ChannelType.ENUM: ChannelType.GR_ENUM,
        ChannelType.CHAR: ChannelType.GR_CHAR,
        ChannelType.LONG: ChannelType.GR_LONG,
        ChannelType.DOUBLE: ChannelType.GR_DOUBLE,

        # Status
        ChannelType.STS_STRING: ChannelType.STS_STRING,
        ChannelType.STS_INT: ChannelType.GR_INT,
        ChannelType.STS_FLOAT: ChannelType.GR_FLOAT,
        ChannelType.STS_ENUM: ChannelType.GR_ENUM,
        ChannelType.STS_CHAR: ChannelType.GR_CHAR,
        ChannelType.STS_LONG: ChannelType.GR_LONG,
        ChannelType.STS_DOUBLE: ChannelType.GR_DOUBLE,

        # Time
        ChannelType.TIME_STRING: ChannelType.STS_STRING,
        ChannelType.TIME_INT: ChannelType.GR_INT,
        ChannelType.TIME_FLOAT: ChannelType.GR_FLOAT,
        ChannelType.TIME_ENUM: ChannelType.GR_ENUM,
        ChannelType.TIME_CHAR: ChannelType.GR_CHAR,
        ChannelType.TIME_LONG: ChannelType.GR_LONG,
        ChannelType.TIME_DOUBLE: ChannelType.GR_DOUBLE,

        # Graphic
        ChannelType.GR_STRING: ChannelType.STS_STRING,
        ChannelType.GR_INT: ChannelType.GR_INT,
        ChannelType.GR_FLOAT: ChannelType.GR_FLOAT,
        ChannelType.GR_ENUM: ChannelType.GR_ENUM,
        ChannelType.GR_CHAR: ChannelType.GR_CHAR,
        ChannelType.GR_LONG: ChannelType.GR_LONG,
        ChannelType.GR_DOUBLE: ChannelType.GR_DOUBLE,

        # Control
        ChannelType.CTRL_STRING: ChannelType.STS_STRING,
        ChannelType.CTRL_INT: ChannelType.GR_INT,
        ChannelType.CTRL_FLOAT: ChannelType.GR_FLOAT,
        ChannelType.CTRL_ENUM: ChannelType.GR_ENUM,
        ChannelType.CTRL_CHAR: ChannelType.GR_CHAR,
        ChannelType.CTRL_LONG: ChannelType.GR_LONG,
        ChannelType.CTRL_DOUBLE: ChannelType.GR_DOUBLE,

        # Special types
        ChannelType.PUT_ACKT: ChannelType.PUT_ACKT,
        ChannelType.PUT_ACKS: ChannelType.PUT_ACKS,
        ChannelType.STSACK_STRING: ChannelType.STSACK_STRING,
        ChannelType.CLASS_NAME: ChannelType.CLASS_NAME,
    },

    'control': {
        # Native
        ChannelType.STRING: ChannelType.TIME_STRING,
        ChannelType.INT: ChannelType.CTRL_INT,
        ChannelType.FLOAT: ChannelType.CTRL_FLOAT,
        ChannelType.ENUM: ChannelType.CTRL_ENUM,
        ChannelType.CHAR: ChannelType.CTRL_CHAR,
        ChannelType.LONG: ChannelType.CTRL_LONG,
        ChannelType.DOUBLE: ChannelType.CTRL_DOUBLE,

        # Status
        ChannelType.STS_STRING: ChannelType.TIME_STRING,
        ChannelType.STS_INT: ChannelType.CTRL_INT,
        ChannelType.STS_FLOAT: ChannelType.CTRL_FLOAT,
        ChannelType.STS_ENUM: ChannelType.CTRL_ENUM,
        ChannelType.STS_CHAR: ChannelType.CTRL_CHAR,
        ChannelType.STS_LONG: ChannelType.CTRL_LONG,
        ChannelType.STS_DOUBLE: ChannelType.CTRL_DOUBLE,

        # Time
        ChannelType.TIME_STRING: ChannelType.TIME_STRING,
        ChannelType.TIME_INT: ChannelType.CTRL_INT,
        ChannelType.TIME_FLOAT: ChannelType.CTRL_FLOAT,
        ChannelType.TIME_ENUM: ChannelType.CTRL_ENUM,
        ChannelType.TIME_CHAR: ChannelType.CTRL_CHAR,
        ChannelType.TIME_LONG: ChannelType.CTRL_LONG,
        ChannelType.TIME_DOUBLE: ChannelType.CTRL_DOUBLE,

        # Graphic
        ChannelType.GR_STRING: ChannelType.TIME_STRING,
        ChannelType.GR_INT: ChannelType.CTRL_INT,
        ChannelType.GR_FLOAT: ChannelType.CTRL_FLOAT,
        ChannelType.GR_ENUM: ChannelType.CTRL_ENUM,
        ChannelType.GR_CHAR: ChannelType.CTRL_CHAR,
        ChannelType.GR_LONG: ChannelType.CTRL_LONG,
        ChannelType.GR_DOUBLE: ChannelType.CTRL_DOUBLE,

        # Control
        ChannelType.CTRL_STRING: ChannelType.TIME_STRING,
        ChannelType.CTRL_INT: ChannelType.CTRL_INT,
        ChannelType.CTRL_FLOAT: ChannelType.CTRL_FLOAT,
        ChannelType.CTRL_ENUM: ChannelType.CTRL_ENUM,
        ChannelType.CTRL_CHAR: ChannelType.CTRL_CHAR,
        ChannelType.CTRL_LONG: ChannelType.CTRL_LONG,
        ChannelType.CTRL_DOUBLE: ChannelType.CTRL_DOUBLE,

        # Special types
        ChannelType.PUT_ACKT: ChannelType.PUT_ACKT,
        ChannelType.PUT_ACKS: ChannelType.PUT_ACKS,
        ChannelType.STSACK_STRING: ChannelType.STSACK_STRING,
        ChannelType.CLASS_NAME: ChannelType.CLASS_NAME,
    },
}


# All native types available
native_types = {ChannelType.STRING, ChannelType.INT, ChannelType.FLOAT,
                ChannelType.ENUM, ChannelType.CHAR, ChannelType.LONG,
                ChannelType.DOUBLE}

# Special types without any corresponding promoted versions
special_types = {ChannelType.PUT_ACKT, ChannelType.PUT_ACKS,
                 ChannelType.STSACK_STRING, ChannelType.CLASS_NAME}

# ChannelTypes grouped by included metadata
status_types = set(field_types['status'].values()) - set(special_types)
time_types = set(field_types['time'].values()) - set(special_types)
# Strings promote to STS_STRING / TIME_STRING in the graphic / control maps;
# those stand-ins are excluded here.
graphical_types = (set(field_types['graphic'].values()) - set(special_types) -
                   {ChannelType.STS_STRING})
control_types = (set(field_types['control'].values()) - set(special_types) -
                 {ChannelType.TIME_STRING})

# ChannelTypes grouped by value data type
# NOTE(review): char_types used to be assigned twice in this section.  The
# first assignment also contained ChannelType.STS_CHAR, but it was
# overwritten by the second one, whose value is kept here so that behavior
# is unchanged.  Confirm whether STS_CHAR was meant to be included.
char_types = {ChannelType.CHAR, ChannelType.TIME_CHAR, ChannelType.CTRL_CHAR}

string_types = {ChannelType.STRING, ChannelType.TIME_STRING,
                ChannelType.CTRL_STRING, ChannelType.STS_STRING}

int_types = {ChannelType.INT, ChannelType.TIME_INT, ChannelType.CTRL_INT,
             ChannelType.LONG, ChannelType.TIME_LONG, ChannelType.CTRL_LONG}

float_types = {ChannelType.FLOAT, ChannelType.TIME_FLOAT,
               ChannelType.CTRL_FLOAT, ChannelType.DOUBLE,
               ChannelType.TIME_DOUBLE, ChannelType.CTRL_DOUBLE}

enum_types = {ChannelType.ENUM, ChannelType.STS_ENUM, ChannelType.TIME_ENUM,
              ChannelType.CTRL_ENUM}

native_float_types = {ChannelType.FLOAT, ChannelType.DOUBLE}
native_int_types = {ChannelType.INT, ChannelType.CHAR, ChannelType.LONG,
                    ChannelType.ENUM}

# map of Epics DBR types to ctypes types
# globals() is snapshotted so the scan cannot be disturbed by names bound
# while it runs, and non-class globals named DBR_* (such as DBR_TYPES itself
# when the module is reloaded) are skipped rather than passed to issubclass().
DBR_TYPES = {
    cls.DBR_ID: cls
    for name, cls in list(globals().items())
    if (name.startswith('DBR_') and
        isinstance(cls, type) and
        issubclass(cls, DbrTypeBase) and
        hasattr(cls, 'DBR_ID'))
}

# Unimplemented STRING types reuse existing structures:
# GR_STRING -> DBR_STS_STRING and CTRL_STRING -> DBR_TIME_STRING
DBR_TYPES[ChannelType.GR_STRING] = DBR_STS_STRING
DBR_TYPES[ChannelType.CTRL_STRING] = DBR_TIME_STRING


def native_type(ftype):
    '''Return the native field type for any ChannelType

    Raises KeyError if `ftype` is not a key of ``field_types['native']``.
    '''
    return field_types['native'][ftype]


def epics_timestamp_to_unix(seconds_since_epoch, nano_seconds):
    '''UNIX timestamp (seconds) from Epics TimeStamp structure

    The nanoseconds part is truncated to whole microseconds before being
    added to the seconds.
    '''
    return (EPICS2UNIX_EPOCH + seconds_since_epoch +
            1.e-6 * int(1.e-3 * nano_seconds))


def timestamp_to_epics(ts):
    '''EPICS (seconds, nanoseconds) pair from a UNIX timestamp or datetime

    Parameters
    ----------
    ts : int, float or datetime.datetime
        Numbers are interpreted as UNIX timestamps in seconds. Anything else
        is used as-is and must support ``ts - EPICS_EPOCH``.

    Returns
    -------
    (seconds, nanoseconds) : tuple of int
        Offset from EPICS_EPOCH. The nanoseconds value carries microsecond
        resolution only.
    '''
    if isinstance(ts, (int, float)):
        # Equivalent to the deprecated datetime.utcfromtimestamp(): the
        # result is a naive datetime expressed in UTC.
        # NOTE(review): assumes EPICS_EPOCH is a naive UTC datetime, as the
        # previous utcfromtimestamp() call implied -- confirm in _constants.
        ts = datetime.datetime.fromtimestamp(
            ts, datetime.timezone.utc).replace(tzinfo=None)
    dt = ts - EPICS_EPOCH
    return int(dt.total_seconds()), int(dt.microseconds * 1e3)