Source code for caproto.ioc_examples.mirror

#!/usr/bin/env python3
from caproto.asyncio.client import Context
from caproto.server import PVGroup, pvproperty, run, template_arg_parser


[docs]class Mirror(PVGroup): """ Subscribe to a PV and serve its value. The default prefix is ``mirror:``. PVs --- value """ # TODO This assumes the type of the value is float. value = pvproperty(value=0, dtype=float, read_only=True) def __init__(self, *args, target, **kwargs): self.target = target # PV to mirror self.client_context = None self.pv = None self.subscription = None super().__init__(*args, **kwargs) async def _callback(self, pv, response): # Update our own value based on the monitored one: await self.value.write( response.data, # We can even make the timestamp the same: timestamp=response.metadata.timestamp, ) @value.startup async def value(self, instance, async_lib): # Note that the asyncio context must be created here so that it knows # which asyncio loop to use: self.client_context = Context() self.pv, = await self.client_context.get_pvs(self.target) # Subscribe to the target PV and register self._callback. self.subscription = self.pv.subscribe(data_type='time') self.subscription.add_callback(self._callback)
if __name__ == '__main__': parser, split_args = template_arg_parser( default_prefix='mirror:', desc='Mirror the value of another floating-point PV.', supported_async_libs=('asyncio',) ) parser.add_argument('--target', help='The PV to mirror', required=True, type=str) args = parser.parse_args() ioc_options, run_options = split_args(args) ioc = Mirror(target=args.target, **ioc_options) run(ioc.pvdb, **run_options)