Source code for caproto.ioc_examples.mini_beamline

#!/usr/bin/env python3
"""
This example is quite large. It defines a range of simulated detectors and
motors and is used for demos and tutorials.
"""
import contextvars
import functools
import math
import textwrap
import time

import numpy as np

from caproto.server import PVGroup, SubGroup, ioc_arg_parser, pvproperty, run

internal_process = contextvars.ContextVar("internal_process", default=False)


def no_reentry(func):
    @functools.wraps(func)
    async def inner(*args, **kwargs):
        if internal_process.get():
            return
        try:
            internal_process.set(True)
            return await func(*args, **kwargs)
        finally:
            internal_process.set(False)

    return inner


def _arrayify(func):
    @functools.wraps(func)
    def inner(*args):
        return func(*(np.asarray(a) for a in args))

    return inner


class _JitterDetector(PVGroup):
    """
    A jittery base class which assumes the subclass implements ``_read()``.

    The pvproperty ``det`` will be periodically updated based on the result of
    the subclass ``_read()``.
    """
    det = pvproperty(
        value=0,
        dtype=float,
        read_only=True,
        doc="Scalar detector value",
    )

    @det.scan(period=0.5)
    async def det(self, instance, async_lib):
        value = await self._read()
        await self.det.write(value=value)

    mtr = pvproperty(
        value=0,
        dtype=float,
        precision=3,
        record="ai",
        doc="Motor",
    )
    exp = pvproperty(
        value=1,
        dtype=float,
        doc="Exponential value",
    )
    vel = pvproperty(
        value=1,
        dtype=float,
        doc="Velocity",
    )

    mtr_tick_rate = pvproperty(
        value=10,
        dtype=float,
        units="Hz",
        doc="Update tick rate",
    )

    @exp.putter
    async def exp(self, instance, value):
        value = np.clip(value, a_min=0, a_max=None)
        return value

    @mtr.startup
    async def mtr(self, instance, async_lib):
        instance.ev = async_lib.library.Event()
        instance.async_lib = async_lib

    @mtr.putter
    @no_reentry
    async def mtr(self, instance, value):
        # "tick" at 10Hz
        dwell = 1 / self.mtr_tick_rate.value

        disp = value - instance.value
        # compute the total movement time based an velocity
        total_time = abs(disp / self.vel.value)
        # compute how many steps, should come up short as there will
        # be a final write of the return value outside of this call
        N = int(total_time // dwell)

        for j in range(N):
            # hide a possible divide by 0
            step_size = disp / N
            await instance.write(instance.value + step_size)
            await instance.async_lib.library.sleep(dwell)

        return value


[docs]class PinHole(_JitterDetector): """A pinhole simulation device.""" async def _read(self): sigma = 5 center = 0 c = -1 / (2 * sigma * sigma) @_arrayify def jitter_read(m, e, intensity): N = ( self.parent.N_per_I_per_s * intensity * e * np.exp(c * (m - center) ** 2) ) return np.random.poisson(N) return jitter_read(self.mtr.value, self.exp.value, self.parent.current.value)
[docs]class Edge(_JitterDetector): """An edge simulation device.""" async def _read(self): sigma = 2.5 center = 5 c = 1 / sigma @_arrayify def jitter_read(m, e, intensity): s = math.erfc(c * (-m + center)) / 2 N = self.parent.N_per_I_per_s * intensity * e * s return np.random.poisson(N) return jitter_read(self.mtr.value, self.exp.value, self.parent.current.value)
[docs]class Slit(_JitterDetector): """A slit simulation device.""" async def _read(self): sigma = 2.5 center = 7.5 c = 1 / sigma @_arrayify def jitter_read(m, e, intensity): s = (math.erfc(c * (m - center)) - math.erfc(c * (m + center))) / 2 N = self.parent.N_per_I_per_s * intensity * e * s return np.random.poisson(N) return jitter_read(self.mtr.value, self.exp.value, self.parent.current.value)
[docs]class MovingDot(PVGroup): N = 480 M = 640 sigmax = 50 sigmay = 25 background = 1000 Xcen = 0 Ycen = 0 det = pvproperty( value=[0] * N * M, dtype=float, read_only=True, doc=f"Detector image ({N}x{M})" ) @det.scan(period=2.0) async def det(self, instance, async_lib): back = np.random.poisson(self.background, (self.N, self.M)) if not self.shutter_open.value: await self.img_sum.write([back.sum()]) await instance.write(value=back.ravel()) return Y, X = np.ogrid[:self.N, :self.M] X = X - self.M / 2 + self.mtrx.value Y = Y - self.N / 2 + self.mtry.value X /= self.sigmax Y /= self.sigmay dot = ( np.exp(-(X ** 2 + Y ** 2) / 2) * np.exp(-(self.mtrx.value ** 2 + self.mtry.value ** 2) / 100 ** 2) ) I = self.parent.current.value # noqa e = self.exp.value measured = self.parent.N_per_I_per_s * dot * e * I ret = back + np.random.poisson(measured) await self.img_sum.write(value=ret.sum()) await instance.write(value=ret.ravel()) img_sum = pvproperty(value=0, read_only=True, dtype=float, doc="Image sum") mtrx = pvproperty(value=0, dtype=float, doc="Motor X") mtry = pvproperty(value=0, dtype=float, doc="Motor Y") exp = pvproperty(value=1, dtype=float) @exp.putter async def exp(self, instance, value): """Clip the value to be >= 0.""" return np.clip(value, a_min=0, a_max=None) shutter_open = pvproperty( value=1, dtype=int, doc="Shutter open/close", ) ArraySizeY_RBV = pvproperty( value=N, dtype=int, read_only=True, doc='Image array size Y' ) ArraySizeX_RBV = pvproperty( value=M, dtype=int, read_only=True, doc='Image array size X' ) ArraySize_RBV = pvproperty( value=[N, M], dtype=int, read_only=True, doc='Image array size [Y, X]', )
[docs]class MiniBeamline(PVGroup): """ A collection of detectors coupled to motors and an oscillating beam current. An IOC that provides a simulated pinhole, edge and slit with coupled with a shared global current that oscillates in time. """ N_per_I_per_s = 200 current = pvproperty(value=500, dtype=float, read_only=True) @current.scan(period=0.1) async def current(self, instance, async_lib): current = 500 + 25 * np.sin(time.monotonic() * (2 * np.pi) / 4) await instance.write(value=current) ph = SubGroup(PinHole, doc="Simulated pinhole") edge = SubGroup(Edge, doc="Simulated edge") slit = SubGroup(Slit, doc="Simulated slit") dot = SubGroup(MovingDot, doc="The simulated detector")
if __name__ == "__main__": ioc_options, run_options = ioc_arg_parser( default_prefix="mini:", desc=textwrap.dedent(MiniBeamline.__doc__), ) ioc = MiniBeamline(**ioc_options) run(ioc.pvdb, **run_options)