Input-Output Controllers (IOCs)

EPICS Input-Output Controllers (IOCs) expose an EPICS server. Behind the server, they may connect to a device driver, data-processing code, and/or an EPICS client for chaining with other servers.

In this section, we will review some of the example IOCs that are included with caproto, intended as demonstrations of what is possible.

Why Write an IOC Using Caproto?

Caproto makes it easy to launch a protocol-compliant Channel Access server in just a couple of lines of Python. This opens up some interesting possibilities:

  • In Python, it is easy to invoke standard web protocols. For example, writing an EPICS server around a device that speaks JSON may be easier with caproto than with any previously-existing tools.

  • Many scientists who rely on EPICS may not understand its details, but already know some Python from their data analysis work. Caproto may make it easier for scientists and controls engineers to collaborate.

  • As with its clients, caproto’s servers handle a human-friendly encapsulation of every message sent and received, which can be valuable for development, logging, and debugging.

Take a look around the ioc_examples subpackage for more examples not covered in this document.

Using the IOC Examples

The example IOCs can be started like so:

$ python3 -m caproto.ioc_examples.simple
[I 16:51:09.751 server:93] Server starting up...
[I 16:51:09.752 server:109] Listening on 0.0.0.0:54966
[I 16:51:09.753 server:121] Server startup complete.

and stopped using Ctrl+C:

^C
[I 16:51:10.828 server:129] Server task cancelled. Must shut down.
[I 16:51:10.828 server:132] Server exiting....

Use --list-pvs to display which PVs they serve:

$ python3 -m caproto.ioc_examples.simple --list-pvs
[I 16:52:36.087 server:93] Server starting up...
[I 16:52:36.089 server:109] Listening on 0.0.0.0:62005
[I 16:52:36.089 server:121] Server startup complete.
[I 16:52:36.089 server:123] PVs available:
    simple:A
    simple:B
    simple:C

and use --prefix to conveniently customize the PV prefix:

$ python3 -m caproto.ioc_examples.simple --list-pvs --prefix my_custom_prefix:
[I 16:54:14.528 server:93] Server starting up...
[I 16:54:14.530 server:109] Listening on 0.0.0.0:55810
[I 16:54:14.530 server:121] Server startup complete.
[I 16:54:14.530 server:123] PVs available:
    my_custom_prefix:A
    my_custom_prefix:B
    my_custom_prefix:C

Type python3 -m caproto.ioc_examples.simple -h for more options.

PVGroup

The PVGroup is a container for pvproperty instances which determine how to represent the group’s contents as EPICS process variables (PVs).

You are expected to subclass from PVGroup to implement your own IOCs.

from caproto.server import PVGroup, pvproperty

class BasicIOC(PVGroup):
    my_pv = pvproperty(value=0, doc="My test PV")

caproto uses your class definition and performs magic under the hood to create a simple mapping of PV names to ChannelData instances. See what happens when you instantiate the above:

In [1]: ioc = BasicIOC(prefix='prefix:')

In [2]: dict(ioc.pvdb)
Out[2]: {'prefix:my_pv': <caproto.server.server.PvpropertyInteger at 0x7fe5c069f910>}

This simple dictionary database is what will be used to start your server. Add some boilerplate to parse command-line arguments and pick an async library backend:

from caproto.server import ioc_arg_parser, run
ioc_options, run_options = ioc_arg_parser(
    default_prefix='prefix:',
    desc="Basic IOC",
)
ioc = BasicIOC(**ioc_options)
run(ioc.pvdb, **run_options)

The above would allow you to access prefix:my_pv over Channel Access.

group_write

By default, if an explicit putter hook is not supplied for a given pvproperty, an implicit one defined in the group will be used.

class MyGroup(PVGroup):
    async def group_write(self, instance, value):
        """Generic write called for channels without `put` defined."""
        print(f"{instance.pvspec.attr} was written to with {value}.")

This can be used to generically handle or modify the put handling of any pvproperty in the group.

Alarms

PVGroup by default has an alarms attribute which is defined as follows:

alarms = collections.defaultdict(ChannelAlarm)

Individual pvproperty instances specify their alarm instance as keys of this dictionary. That is,

my_property = pvproperty(value=0, alarm_group="primary")

means that my_property will join the “primary” alarm group, or the instance from

pvgroup_instance.alarms["primary"]

While the key is not required to be a string, using one is recommended by convention.

Note that when alarm_group is unspecified, pvproperties in a given PVGroup instance share the same alarm instance - as their key to the above dictionary is left at the default value. This is usually a reasonable guess for small groups of PVs.
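
The sharing behavior described above follows directly from how a defaultdict works. The following is a minimal sketch that does not use caproto itself: FakeAlarm is a hypothetical stand-in for ChannelAlarm, and the key names are made up for illustration.

```python
import collections


class FakeAlarm:
    """Hypothetical stand-in for caproto's ChannelAlarm."""

    def __init__(self):
        self.severity = "NO_ALARM"


# Same construction as PVGroup.alarms, with FakeAlarm swapped in.
alarms = collections.defaultdict(FakeAlarm)

# Two properties that name the same alarm group get the same object...
motor_alarm = alarms["primary"]
encoder_alarm = alarms["primary"]
# ...while a different key lazily creates a separate instance.
vacuum_alarm = alarms["vacuum"]

motor_alarm.severity = "MAJOR_ALARM"
print(encoder_alarm.severity)  # MAJOR_ALARM (shared instance)
print(vacuum_alarm.severity)   # NO_ALARM (independent instance)
```

Because every lookup with the same key returns the same object, leaving alarm_group at its default places all properties of the group on one shared alarm.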

API

caproto.server.PVGroup(prefix, *[, macros, ...])

Class which groups a set of PVs for a high-level caproto server

pvproperty

pvproperty combines the information needed to represent your data over EPICS Channel Access with easy-to-use hooks for reacting to certain events, implemented as user-provided Python methods.

Basic configuration

Only one keyword argument is truly required: either value or dtype.

value or dtype

If you specify a value keyword argument, the pvproperty will infer its data type from the supported data types table. That is, if you give a value of 1, dtype will be assumed to be int and the pvproperty will produce a PvpropertyInteger (ChannelInteger).

Alternatively, dtype may be specified directly as either a ChannelType such as dtype=ChannelType.LONG or equivalently the built-in Python type dtype=int. The default value will be chosen from the PVGroup’s default_values dictionary:

class MyPVGroup(PVGroup):
    default_values = {
        str: '',
        int: 0,
        float: 0.0,
        bool: False,

        ChannelType.STRING: '',
        ChannelType.INT: 0,
        ChannelType.LONG: 0,
        ChannelType.DOUBLE: 0.0,
        ChannelType.FLOAT: 0.0,
        ChannelType.ENUM: 0,
        ChannelType.CHAR: '',
    }

name

Properties in Python are a bit magical in their own right - property, when defined on a class, is aware of the name you have given it (by way of the descriptor protocol).

This means that pvproperty can give a reasonable default for its PV name. You are free to customize this, however, by specifying name="PV:SUFFIX" where PV:SUFFIX is your desired suffix to be appended to the parent PVGroup prefix.
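
As a rough illustration of the mechanism, a descriptor can learn the attribute name it was assigned to through the __set_name__ hook of the descriptor protocol. The sketch below is plain Python and is not caproto's implementation; NamedProperty and Group are hypothetical names.

```python
class NamedProperty:
    """Hypothetical descriptor that records the attribute name it is bound to."""

    def __init__(self, name=None):
        self.name = name  # explicit suffix, if the user provided one

    def __set_name__(self, owner, attr):
        # Python calls this once, while the owning class is being created.
        if self.name is None:
            self.name = attr


class Group:
    prefix = "prefix:"
    temperature = NamedProperty()               # falls back to the attribute name
    pressure = NamedProperty(name="PV:SUFFIX")  # explicit suffix wins


names = [
    Group.prefix + vars(Group)[attr].name
    for attr in ("temperature", "pressure")
]
print(names)  # ['prefix:temperature', 'prefix:PV:SUFFIX']
```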

doc

Please consider documenting what each individual property does by adding a human-understandable explanation string to the doc keyword argument.

This will be picked up by automatic documentation generation utilities and added to its parent group’s pvproperty table.

Also, if using a record, your doc string will automatically be sent over EPICS with the .DESC field.

my_property = pvproperty(value=1.0, record='ai', doc="My property")

$ caget prefix:my_property.DESC
prefix:my_property.DESC        My property

alarm_group

By default, pvproperties in a given PVGroup instance share the same alarm instance.

This is usually a reasonable guess for small groups of PVs. For finer-grained customization, the easiest way to change this functionality is to specify a per-pvproperty alarm group using the alarm_group keyword argument. It expects a string identifier, which should be reused on all other pvproperties that are to share the alarm instance.

max_length

This is the length of the array (when used with integers, floats) or the number of characters in a character or byte string.

By default, it will be the length of the value that is passed in - or 1 for scalar values.

EPICS clients expect to know the maximum length of an array at connection time and may not properly handle it changing dynamically. As such, pvproperty allows you to customize the size at property definition-time.

record

pvproperty allows you to pretend your value has common EPICS record fields. For example, it is possible to report a given property as an analog input (ai) record:

my_property = pvproperty(value=1.0, record='ai', doc="My property")

More on this in a later section.

Hooks

put, startup, shutdown, scan may be passed in directly with the pvproperty for reuse among multiple properties or to avoid the decorator syntax (the generally preferred syntax, shown in detail in later sections).

This can look like:

async def write_hook(self, instance, value):
    print(f"Wrote {value} to {instance.name}")

pv_a = pvproperty(put=write_hook, value='A', doc="The first PV")
pv_b = pvproperty(put=write_hook, value='A', doc="The second PV")

Other arguments

Other arguments will be passed to the ChannelData instance as-is. For example, these allow you to customize special classes such as ChannelEnum which may take in enum_strings - which would, of course, be unacceptable for a floating point value.

Supported data types

value may be an instance of one of the following, or dtype may be one of the following types:

Built-in Data Type Mapping

Data Type       Data Class            Inherits from
str             PvpropertyChar        ChannelChar
bytes           PvpropertyByte        ChannelByte
int             PvpropertyInteger     ChannelInteger
float           PvpropertyDouble      ChannelDouble
bool            PvpropertyBoolEnum    ChannelEnum
enum.IntEnum    PvpropertyEnum        ChannelEnum

dtype may be omitted when a value is specified.

If you desire finer-grained control over the data type provided (e.g., you specifically want a 32-bit float instead of a 64-bit double), you may use the appropriate ChannelType directly:

ChannelType Mapping

Data Type             Data Class           Inherits from
ChannelType.STRING    PvpropertyString     ChannelString
ChannelType.INT       PvpropertyShort      ChannelShort
ChannelType.LONG      PvpropertyInteger    ChannelInteger
ChannelType.DOUBLE    PvpropertyDouble     ChannelDouble
ChannelType.FLOAT     PvpropertyFloat      ChannelFloat
ChannelType.ENUM      PvpropertyEnum       ChannelEnum
ChannelType.CHAR      PvpropertyChar       ChannelChar

Integers and floats

Integer and floating point values by default map to the largest containers in EPICS - 32-bit integers and 64-bit floats, respectively.
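
To make these sizes concrete, the standard-library struct module can show what fits into a signed 32-bit integer and what is lost when a value is stored as a 32-bit float rather than a 64-bit double. This is only an illustration of the container sizes, not of caproto's serialization code; fits_int32 and roundtrip are hypothetical helpers.

```python
import struct


def fits_int32(value):
    """Return True if value can be packed as a signed 32-bit integer."""
    try:
        # ">" selects standard sizes: 4 bytes for "i" and "f", 8 for "d".
        struct.pack(">i", value)
    except struct.error:
        return False
    return True


def roundtrip(fmt, value):
    """Pack value with the given struct format and unpack it again."""
    return struct.unpack(fmt, struct.pack(fmt, value))[0]


print(fits_int32(2**31 - 1), fits_int32(2**31))  # True False
print(roundtrip(">d", 0.1) == 0.1)  # True: a 64-bit double keeps the value
print(roundtrip(">f", 0.1) == 0.1)  # False: a 32-bit float rounds it
```

The last line is one reason to request ChannelType.FLOAT only when the smaller type is really wanted.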

Strings

Strings are a bit complicated when it comes to EPICS and caproto follows suit.

At minimum, the following will work to store up to 40 character strings:

my_string = pvproperty(
    # The default value:
    value="String value",
    # Document it!
    doc="Indicator as to what this does.",
)

A more complete specification looks like the following. See the “how to” section below for more details on why the following is the preferred method for strings.

my_string = pvproperty(
    # The default value:
    value="String value",
    # Document it!
    doc="Indicator as to what this does.",
    # Configure the PV suffix, if different than ``my_string``:
    name=":PV:SUFFIX",
    # Optionally configure the encoding:
    string_encoding='utf-8',
    # Ensure that this is marked as "report_as_string" (** optional)
    report_as_string=True,
    # Optionally specify how long the string can get - the default is
    # the length of the provided value.
    max_length=255,
)

Enums

A new syntax for specifying enums using built-in enum.IntEnum is recommended:

from caproto.server import PVGroup, pvproperty
import enum


class MyEnum(enum.IntEnum):
    off = 0
    on = 1
    unknown = 2


class MyPVGroup(PVGroup):
    my_enum = pvproperty(
        value=MyEnum.on,
        record='mbbi',
        doc="An enum with off/on/unknown, and mbbi record fields.",
    )

Alternatively, you may specify the strings manually:

from caproto import ChannelType
from caproto.server import PVGroup, pvproperty

class MyPVGroup(PVGroup):
    my_enum = pvproperty(
        value='on',
        enum_strings=("off", "on", "unknown"),
        dtype=ChannelType.ENUM,
        record='mbbi',
        doc="An enum with off/on/unknown, and mbbi record fields.",
    )

For the above mbbi records, the ZRST (zero string) field, ONST (one string) field, and so on (up to 15), are similarly respected and mapped from the enum strings (keyword argument or IntEnum class).

Booleans

Boolean values map to an enum type, with “Off” and “On” as the default strings.

When using either the bi or bo records, the ZNAM and ONAM fields are automatically populated with the string equivalent values for 0 and 1. These are derived from the enum_strings keyword argument.

Hooks

Hooks allow you to react to certain events that happen during the life of the IOC by providing your own methods.

Putter Hook

This is the most common hook you will need when writing a caproto-based IOC.

The “putter” associated with a pvproperty is called whenever a user writes to the associated PV through any client (caput, caproto-put, pyepics, caproto clients, and so on) and also when you write directly to it in code.

An example of usage and the required method signature is as follows:

my_property = pvproperty(value=0)

@my_property.putter
async def my_property(self, instance, value):
    """
    Putter hook for ``my_property``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``my_property``.

    value :
        This is the value the client wrote to the PV.

    Returns
    -------
    value :
        You may optionally change the value before committing it into
        the underlying ChannelData instance.

    Raises
    ------
    SkipWrite
        Raise this to skip further processing of the write.

    Exception
        When an unhandled error occurs.  Alarm will be set.
    """
    ...

Note that in the above, self will refer to the PVGroup instance. It will allow you to access the values of other PVs in the group, or any other state held within.

instance here is the ChannelData-based instance of my_property. It is exactly equivalent to self.my_property in the above example. This object holds the current value, alarm instance, and other associated metadata.

Since putter methods may be reused among any number of pvproperty instances, this parameter can be used generically in such scenarios.

The value can be assumed to be consistent with the data type of my_property. In the above case, this is int (as 0 is an integer).

You may optionally change the value before committing it into the underlying ChannelData instance. If this is needed, return the modified value here. Otherwise, the implicit value of None means to accept the value as-is.

Additionally, you may reject the write entirely by raising the SkipWrite exception. This is a non-error exception which will leave the previously-stored value intact.
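
The three possible outcomes of a putter (accept as-is, modify, skip) can be modeled without caproto. In the sketch below, apply_put and the local SkipWrite class are hypothetical stand-ins for the server's write path, and the putter is simplified to take only the value instead of (self, instance, value).

```python
import asyncio


class SkipWrite(Exception):
    """Local stand-in for caproto's SkipWrite exception."""


async def apply_put(stored, putter, value):
    """Hypothetical model of a write: return the value that ends up stored."""
    try:
        result = await putter(value)
    except SkipWrite:
        return stored  # rejected: keep the previously stored value
    return value if result is None else result


async def clamp_putter(value):
    if value < 0:
        raise SkipWrite("negative values are ignored")
    if value > 100:
        return 100  # modify the value before it is stored
    # Falling through returns None: accept the value as-is.


async def demo():
    stored = 5
    history = []
    for written in (42, 250, -1):
        stored = await apply_put(stored, clamp_putter, written)
        history.append(stored)
    return history


results = asyncio.run(demo())
print(results)  # [42, 100, 100]
```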

Startup Hook

This hook is executed when the IOC starts up.

An example of usage and the required method signature is as follows:

@my_property.startup
async def my_property(self, instance, async_lib):
    """
    Startup hook for ``my_property``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``my_property``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    ...

Note that in the above, self will refer to the PVGroup instance. It will allow you to access the values of other PVs in the group, or any other state held within.

Second, notice that my_property is repeated multiple times: the decorator must use my_property.startup to define the startup method, and the method itself must be named my_property. This is due to how decorators function and mirrors what you would see with a standard Python property.

Finally, the async_lib can be used to abstract away the async library layer that is in use. It is considered good practice to use, for example, async_lib.sleep instead of asyncio.sleep – or similarly trio.sleep, curio.sleep. This will allow your IOC to work no matter if you decide to use asyncio, curio, or trio as your async library.
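
The naming rule described above (decorating with my_property.startup while also naming the method my_property) mirrors the built-in property. A minimal sketch using only standard Python, with a hypothetical Thermostat class:

```python
class Thermostat:
    def __init__(self):
        self._setpoint = 20.0

    @property
    def setpoint(self):
        return self._setpoint

    # The decorator is reached through the existing ``setpoint`` property,
    # and the function reuses the same name so that the class attribute is
    # rebound to the property that has both the getter and the setter.
    @setpoint.setter
    def setpoint(self, value):
        self._setpoint = float(value)


t = Thermostat()
t.setpoint = 25
print(t.setpoint)  # 25.0
```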

Shutdown Hook

This hook is executed when the IOC shuts down.

An example of usage and the required method signature is as follows:

@my_property.shutdown
async def my_property(self, instance, async_lib):
    """
    Shutdown hook for ``my_property``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``my_property``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    ...

See the startup hook section above for more details on what these parameters mean.

Scan Hook

This hook is executed periodically, with the exact rate depending on its configuration.

@my_property.scan(period=0.1)
async def my_property(self, instance, async_lib):
    """
    Scan hook for ``my_property``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``my_property``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    ...

See the startup hook section above for more details on what these parameters mean.

scan_wrapper arguments can be passed to my_property.scan(...) above to further customize how the routine is called. For example, it is possible to stop the scan routine when an exception happens (stop_on_error=True) or let users put to the .SCAN field (when using record="...") to change the rate to common EPICS-defined ones.

caproto.server.scan_wrapper(scan_function, ...)

Wrap a function intended for pvproperty.scan with common logic to periodically call it.

API

caproto.server.pvproperty([get, put, ...])

A property-like descriptor for specifying a PV in a PVGroup.

General Tips

Don’t use a getter

Generally speaking, you should not define a getter. This goes back to the original design of pvproperty. The authors now consider it a mistake to have made getters so easily accessible.

Why?

Inclusion of a getter can unfortunately make value monitoring less intuitive at best, or break its functionality at worst.

There is no way to monitor such a value for changes, as it requires calling the getter to determine the current value.

Each time a user runs caget YOUR:PV, the getter will be called. If the getter performs a write to update the instance, all other clients watching YOUR:PV will then see an update. The more clients are added to the mix, the more frequently other clients will see updates, seemingly at arbitrary times.

Reactive design

Most of what your IOC does should be in response to what the user requests by way of putter hooks.

Periodic updates should happen in scan loops.

If PVGroup data is sourced from a database, real device, website, etc, then a different approach may be more appropriate. See below in the “how do I” section.

Help!

My server is saying “High load”!

You may have a real issue in your code, or you may have clients which are saturating your server with requests.

Check that no synchronous (i.e., blocking) code is being run in your async functions. This is really important. Defer such code to an executor to avoid further headaches; see “… interact with threaded/non-async code, functions, and libraries?” below.

If you just have a lot of clients hitting your server without any apparent negative effect, you can customize caproto’s server settings to not display this message.

This warning says my PV will take up a considerable amount of memory…

It’s probably true and it’s there to save you from the default settings.

You may either set max_subscription_backlog for each of your big PVs manually, or let caproto tweak it for you in an automated fashion.

See the environment variable section for details on what each variable means.

How do I…

… access async_lib directly in my class?

If you have any startup hooks defined, you can stash async_lib there.

thing = pvproperty(value=2, doc="An integer-valued PV.")

@thing.startup
async def thing(self, instance, async_lib):
    self.async_lib = async_lib

Or more generally, you can pass a startup_hook independent of any pvproperty by way of run():

class StartupAndShutdown(PVGroup):
    async def __ainit__(self, async_lib):
        self.log.warning("1. The IOC-level startup_hook from `run()` was called.")
        # Note that we have to pass this in to ``run()``!

if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='simple:',
        desc="Run an IOC that prints on startup and shutdown.")
    ioc = StartupAndShutdown(**ioc_options)
    run(ioc.pvdb, startup_hook=ioc.__ainit__, **run_options)

As a convention, consider naming your method __ainit__.

See the full IOC example in startup_and_shutdown_hooks.

… stop every PV in my group from sharing the same alarm?

By default, pvproperties in a given PVGroup instance share the same alarm instance. This is usually a reasonable guess for small groups of PVs. For finer-grained customization, the easiest way to change this functionality is to specify a per-pvproperty alarm group using the alarm_group keyword argument. It expects a string identifier, which should be reused on all other pvproperties that are expected to use the same alarm instance.

… make a string PV? All I see is an array of numbers!

If you have a short string that can fit into the EPICS definition of DBR_STRING - a maximum length of MAX_STRING_SIZE = 40 - you can use the following and not worry about anything further:

my_property = pvproperty(
    # The default value:
    value="String value",
    # Specify STRING as the data type - otherwise CHAR will be chosen:
    dtype=ChannelType.STRING,
    # Document what the property is for:
    doc="Indicator as to what this does.",
    # Optionally specify the encoding, if necessary:
    string_encoding='latin-1',
)

There is a convention you may not have heard about, what caproto refers to as long string support, in which DBR_STRING values can be longer than 40 characters when clients append a special character to the PV name:

$ caget -S my_property.$ my_property.VAL$
my_property.$ String value
my_property.VAL$ String value

$ caput -S my_property.VAL$ 1234567890123456789012345678901234567890123456789012345678901234567890
Old : my_property.VAL$ String value
New my_property.VAL$ 1234567890123456789012345678901234567890123456789012345678901234567890

Note that the last string is 70 characters long. What happens if we request it as a regular string without the long string modifier now?

$ caget my_property
my_property       1234567890123456789012345678901234567890

… it’s truncated to only 40 characters! There’s not much we can do about it, so clients should just be configured to use the custom modifier.

As an alternative, pvproperty supports arrays of DBR_CHAR, which are signed 8-bit numbers whose values fall in the inclusive range [-128, 127]. These can also be used to represent arbitrarily long strings.

my_property = pvproperty(
    # The default value:
    value="String value",
    # Document it!
    doc="Indicator as to what this does.",
    # Optionally configure the encoding:
    string_encoding='utf-8',
    # Ensure that this is marked as "report_as_string" (** optional)
    report_as_string=True,
    # Optionally specify how long the string can get - the default is
    # the length of the provided value.
    max_length=255,
)

Please note that you will still have to use the special long string modifier .$ to have this work with caget and caput.

report_as_string=True is optional here, but it makes it function more closely to the first example provided in this section. You can access up to the first 40 characters of the string without any special modifiers to caget and caput, or caproto-get and caproto-put.
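
To see why a client without the long string modifier shows “an array of numbers”, it may help to look at what a character array holds. The sketch below uses only the standard library and assumes the signed 8-bit interpretation described above; to_signed_chars and from_signed_chars are hypothetical helpers, not part of caproto.

```python
from array import array


def to_signed_chars(text, encoding="utf-8"):
    """Encode text and view each byte as a signed 8-bit integer."""
    return array("b", text.encode(encoding))


def from_signed_chars(values, encoding="utf-8"):
    """Reverse of to_signed_chars: reinterpret the numbers as encoded text."""
    return array("b", values).tobytes().decode(encoding)


chars = to_signed_chars("Caf\u00e9")  # "Cafe" with an accented final letter
print(list(chars))  # [67, 97, 102, -61, -87]
print(from_signed_chars(chars) == "Caf\u00e9")  # True
```

The accented character takes two bytes in UTF-8, which is why max_length counts encoded bytes rather than visible characters in this model.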

… structure an IOC when talking to a real piece of hardware?

When talking to a single device and requesting a bunch of different information from it to update a single PVGroup to represent its status, we recommend the inclusion of a polling loop.

For a simple “query device and update state”-style group, this could look like:

update_hook = pvproperty(name="update", value=False, record='bi')

@update_hook.scan(period=0.1, use_scan_field=True)
async def update_hook(self, instance, async_lib):
    """
    Scan hook for ``update_hook``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``update_hook``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    # Reach out to the device asynchronously and get back information:
    device_info = await query_device()
    await self.value1.write(value=device_info["value1"])
    await self.value2.write(value=device_info["value2"])
    await self.value3.write(value=device_info["value3"])

With the above, the client has some control over how fast the updates happen. If that’s undesirable, set period=desired_rate and use_scan_field=False.

If you don’t have an async-capable interface, there will be some additional work required. Consider using async_lib queues for this.

update_hook = pvproperty(name="update", value=False, record='bi')

@update_hook.startup
async def update_hook(self, instance, async_lib):
    """
    Startup hook for ``update_hook``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``update_hook``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    queue = async_lib.ThreadsafeQueue()
    thread = threading.Thread(target=my_threaded_function, kwargs=dict(queue=queue))
    thread.start()

    try:
        while True:  # or perhaps thread.is_alive()
            value = await queue.async_get()
            await self.prop.write(value=value["prop"])
    finally:
        ...  # perform cleanup here
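
For readers who want to try the thread-to-async hand-off outside of an IOC, the following sketch reproduces the same pattern with only asyncio and threading. It is asyncio-specific and does not use async_lib.ThreadsafeQueue; blocking_producer is a hypothetical stand-in for my_threaded_function, and a None sentinel is assumed as the end-of-data marker.

```python
import asyncio
import threading
import time


def blocking_producer(loop, queue, count):
    """Hypothetical blocking device reader, run in a plain thread."""
    for i in range(count):
        time.sleep(0.01)  # stands in for a slow, synchronous device call
        # asyncio.Queue is not thread-safe, so hand the item to the loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, {"prop": i})
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more data


async def consume(count=3):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(
        target=blocking_producer, args=(loop, queue, count), daemon=True
    )
    thread.start()

    received = []
    try:
        while True:
            item = await queue.get()
            if item is None:
                break
            received.append(item["prop"])  # an IOC would write to a PV here
    finally:
        thread.join()  # the producer has already finished at this point
    return received


received = asyncio.run(consume())
print(received)  # [0, 1, 2]
```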

If, instead, you have a bunch of knobs that the user can set and some control flow decisions to make, consider something like:

update_hook = pvproperty(name="update", value=False, record='bi')

@update_hook.startup
async def update_hook(self, instance, async_lib):
    """
    Startup hook for ``update_hook``.

    Parameters
    ----------
    instance : ChannelData
        This is the instance of ``update_hook``.

    async_lib : AsyncLibraryLayer
        This is a shim layer for {asyncio, curio, trio} that you can use
        to make async library-agnostic IOCs.
    """
    while True:
        device_info = await query_device()
        await self.value1.write(value=device_info["value1"])
        if self.user_requested.value == 'move':
            await self.queue_move()
        ...
        await async_lib.sleep(update_delay)

A more concrete example with aiohttp:

@update_hook.scan(period=5)
async def update_hook(self, instance, async_lib):
    try:
        try:
            await self._update_server_state()
            if self._should_download():
                await self._download_and_update()
        except (asyncio.TimeoutError,
                aiohttp.client_exceptions.ClientConnectorError):
            self._consecutive_timeouts += 1
            self.log.warning('Timeout while updating server (%d in a row)',
                             self._consecutive_timeouts)
            if self._consecutive_timeouts >= 6:
                self.log.error('Too many consecutive timeouts!')
                self._consecutive_timeouts = 0
                raise
        else:
            self._consecutive_timeouts = 0
    except Exception:
        self.log.exception('Update failed!')
        for key, alarm in self.alarms.items():
            if key is not None:
                await alarm.write(
                    status=AlarmStatus.COMM,
                    severity=AlarmSeverity.MAJOR_ALARM,
                )

… interact with threaded/non-async code, functions, and libraries?

Defer it to an executor, so that the blocking call runs in a worker thread instead of stalling the event loop.
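
With the asyncio backend, one way to do this is loop.run_in_executor from the standard library. The sketch below is asyncio-specific and independent of caproto; blocking_read is a hypothetical synchronous driver call, and heartbeat only exists to show that the event loop keeps running in the meantime.

```python
import asyncio
import time


def blocking_read():
    """Hypothetical synchronous driver call that blocks for a while."""
    time.sleep(0.05)
    return 42


async def heartbeat(ticks):
    """Keeps running while the blocking call is in progress."""
    for _ in range(3):
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # Passing None selects the loop's default ThreadPoolExecutor.
    value, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_read),
        heartbeat(ticks),
    )
    return value, len(ticks)


value, tick_count = asyncio.run(main())
print(value, tick_count)  # 42 3
```

Inside a hook, the same await on run_in_executor would replace a direct call to the blocking function.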

… get rid of PVGroup and pvproperty? I hate them!

Those are some strong words!

caproto developers think the ease of customization and terseness of the class definition are positives - making it the easiest possible way to spin up an IOC - but we respect that it’s not a one-size-fits-all approach.

Let’s rephrase the above as “How do I use the alternative methods of building IOCs?”

The first option is to try working with PVSpec instances directly. Take a look at the no_pvproperty example.

The second option is going down to the lowest level, working with ChannelData instances directly. Take a look at the advanced type_varieties example.

… represent both the setpoint and readback with one pvproperty?

EPICS V3 records mostly do not have support for this, and caproto does not try to improve upon the status quo.

A common convention – areaDetector-style – where there are 2 PVs, one for the setpoint and one for the readback looks like the following:

SYSTEM:Value
SYSTEM:Value_RBV

Where SYSTEM:Value is the setpoint, and SYSTEM:Value_RBV is the “read-back value” that updates only after the server (or device) has acknowledged it.

caproto provides a simple helper tool to replicate this pattern efficiently. Using server.get_pv_pair_wrapper(), one can dynamically generate a SubGroup that contains both the setpoint and readback PVs. This wrapper is re-usable and may be used to generate any number of such pairs.

pvproperty_with_rbv = get_pv_pair_wrapper(setpoint_suffix='',
                                          readback_suffix='_RBV')

value = pvproperty_with_rbv(
    name="Value",
    value=0,
    doc="This is a new subgroup with Value and Value_RBV.",
)

@value.setpoint.putter
async def value(obj, instance, value):
    # accept the value immediately
    await obj.readback.write(value)
    # NOTE: you can access the full Group instance through obj.parent

You can further customize the setpoint and readback keyword arguments by using setpoint_kw and readback_kw. For example, you can set different record types by specifying setpoint_kw=dict(record="ao").

caproto.server.get_pv_pair_wrapper([...])

Generates a Subgroup class for a pair of PVs (setpoint and readback).

Note

The API of this changed in v0.7.0, making it easier to pass in class kwargs.

… do some really crazy things with caproto?

Take a look at the pathological IOC examples. Take extra care not to run them in production!

… handle records and what are the limitations there?

For any given pvproperty, you can specify a caproto-supported record type by way of the record='' keyword argument.

Please note that none of the classes listed here implement the full functionality of the corresponding record, but make available over Channel Access all of the fields one would normally expect from that record.

That said, some functionality is provided out-of-the-box, and you can further customize this functionality in your own implementations.

See the Records section for further details.

Further limitations/notes:

  • In/out links do not work

  • Many fields are not (yet) implemented

… add a client to monitor other PVs?

As of the time of writing, the threading client is better tested than the asyncio client, and an example, thread_client_monitor, was written around it.

The above is admittedly rather complicated, but gives you a good amount of control. It also demonstrates thread-to-async communication by way of async library-independent queues.

A more user-friendly (but less tested) example is ioc_examples.client_monitor_async, which is currently limited to asyncio-only:

#!/usr/bin/env python3
from textwrap import dedent

import caproto as ca
from caproto.asyncio.client import Context
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class MirrorClientIOC(PVGroup):
    """
    An IOC which mirrors the value, timestamp, and alarm status of a given PV
    into the `mirrored` pvproperty.

    With the default configuration, this IOC assumes that the PV "simple:A"
    exists on some external IOC.

    The "simple" IOC may be started before or after this IOC.  If the server
    goes down, the client will automatically reconnect when available.

    Scalar PVs
    ----------
    mirrored (float, analog input)
    """

    mirrored = pvproperty(value=0.0, record='ai')

    def __init__(self, pv_to_mirror, *args, **kwargs):
        self.pv_to_mirror = pv_to_mirror
        super().__init__(*args, **kwargs)

    async def __ainit__(self, async_lib):
        print('* `__ainit__` startup hook called')

        # Create an asyncio client context:
        ctx = Context()

        # Loop and grab items from the queue one at a time
        async for event, context, data in ctx.monitor(self.pv_to_mirror):
            if event == 'subscription':
                print('* Client pushed a new value in the queue')
                print(f'\tValue={data.data} {data.metadata}')

                # Mirror the value, status, severity, and timestamp:
                await self.mirrored.write(data.data,
                                          timestamp=data.metadata.timestamp,
                                          status=data.metadata.status,
                                          severity=data.metadata.severity)
            elif event == 'connection':
                print(f'* Client connection state changed: {data}')
                if data == 'disconnected':
                    # Raise an alarm - our client PV is disconnected.
                    await self.mirrored.write(
                        self.mirrored.value,
                        status=ca.AlarmStatus.LINK,
                        severity=ca.AlarmSeverity.MAJOR_ALARM
                    )


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='mirror:',
        desc=dedent(MirrorClientIOC.__doc__),
        supported_async_libs=('asyncio', ),
    )

    ioc = MirrorClientIOC('simple:A', **ioc_options)
    run(ioc.pvdb, startup_hook=ioc.__ainit__, **run_options)

caproto.ioc_examples.client_monitor_async.MirrorClientIOC(...)

An IOC which mirrors the value, timestamp, and alarm status of a given PV into the mirrored pvproperty.

To work directly with the asyncio client context, you might consider basing your IOC on the following example instead:

#!/usr/bin/env python3
from caproto.asyncio.client import Context
from caproto.server import PVGroup, pvproperty, run, template_arg_parser


class Mirror(PVGroup):
    """
    Subscribe to a PV and serve its value.

    The default prefix is ``mirror:``.

    PVs
    ---
    value
    """
    # TODO This assumes the type of the value is float.
    value = pvproperty(value=0, dtype=float, read_only=True)

    def __init__(self, *args, target, **kwargs):
        self.target = target  # PV to mirror
        self.client_context = None
        self.pv = None
        self.subscription = None
        super().__init__(*args, **kwargs)

    async def _callback(self, pv, response):
        # Update our own value based on the monitored one:
        await self.value.write(
            response.data,
            # We can even make the timestamp the same:
            timestamp=response.metadata.timestamp,
        )

    @value.startup
    async def value(self, instance, async_lib):
        # Note that the asyncio context must be created here so that it knows
        # which asyncio loop to use:
        self.client_context = Context()

        self.pv, = await self.client_context.get_pvs(self.target)

        # Subscribe to the target PV and register self._callback.
        self.subscription = self.pv.subscribe(data_type='time')
        self.subscription.add_callback(self._callback)


if __name__ == '__main__':
    parser, split_args = template_arg_parser(
        default_prefix='mirror:',
        desc='Mirror the value of another floating-point PV.',
        supported_async_libs=('asyncio',)
    )
    parser.add_argument('--target',
                        help='The PV to mirror', required=True, type=str)
    args = parser.parse_args()
    ioc_options, run_options = split_args(args)
    ioc = Mirror(target=args.target, **ioc_options)
    run(ioc.pvdb, **run_options)

caproto.ioc_examples.mirror.Mirror(*args, ...)

Subscribe to a PV and serve its value.

… get custom arguments into my IOC?

At the moment, the easiest way to get a custom argument into an IOC is to pass a macro with a default value to caproto.server.ioc_arg_parser().

These macros would be accessible in your PVGroup __init__ as the macros keyword argument.

Alternatively, you can add arguments directly to the parser, as seen in the chirp example:

parser, split_args = template_arg_parser(
    default_prefix="prefix:",
    desc="IOC description.",
)

parser.add_argument(
    "--flag",
    help="Set an arbitrary flag.",
    action='store_true',
)

args = parser.parse_args()
ioc_options, run_options = split_args(args)
ioc = MyPVGroup(flag=args.flag, **ioc_options)
run(ioc.pvdb, **run_options)

If your needs are more advanced, feel free to reimplement the above functions. So long as you can instantiate your PVGroup and pass its pvdb to run(), caproto will not complain.
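As a rough illustration of what "reimplementing the above functions" could look like, the sketch below builds a parser with argparse alone and splits the result into three dictionaries. The function name parse_ioc_args is hypothetical, and the run-option key log_pv_names is an assumption about which keyword run() accepts; check the caproto API before relying on it.

```python
import argparse


def parse_ioc_args(argv=None):
    """Parse command-line arguments and split them by destination.

    Returns (ioc_options, run_options, custom), where ``custom`` holds
    arguments meant for your own PVGroup ``__init__``.
    """
    parser = argparse.ArgumentParser(description='Hypothetical IOC launcher.')
    parser.add_argument('--prefix', default='prefix:',
                        help='PV prefix for every PV in the group.')
    parser.add_argument('--list-pvs', action='store_true',
                        help='Log the PV names at startup.')
    parser.add_argument('--flag', action='store_true',
                        help='Set an arbitrary flag.')
    args = parser.parse_args(argv)

    ioc_options = {'prefix': args.prefix}
    # Assumed keyword name for run(); adjust to the real signature.
    run_options = {'log_pv_names': args.list_pvs}
    custom = {'flag': args.flag}
    return ioc_options, run_options, custom


if __name__ == '__main__':
    print(parse_ioc_args(['--prefix', 'demo:', '--flag']))
```

The three dictionaries would then be used much like the snippet above: instantiate the group with the IOC options plus the custom arguments, and pass the remaining options to run().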

… use macros? And what are macros?

Macros allow you to specify portions of PV names at IOC initialization-time instead of at PVGroup/pvproperty definition time.

Macros can be used in addition to - or in place of - the default PV prefix.

See MacroifiedNames for an example.
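Conceptually, macro expansion resembles Python string formatting applied to the PV name template. The following is a minimal sketch under that assumption; expand_pv_name is a hypothetical helper written for illustration, not a caproto function.

```python
def expand_pv_name(prefix, template, macros):
    """Fill the {macro} placeholders in a PV name and prepend the prefix."""
    return prefix + template.format(**macros)


if __name__ == '__main__':
    macros = {'beamline': 'XF31ID', 'suffix': 'detector'}
    for template in ('{beamline}:{suffix}.VAL', '{beamline}:{suffix}.RBV'):
        print(expand_pv_name('macros:', template, macros))
```

With the macro values shown, the two templates expand to macros:XF31ID:detector.VAL and macros:XF31ID:detector.RBV.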

… make a bunch of caproto IOCs without all the boilerplate?

Take a look at the cookiecutters. You can generate a full Python package IOC template along with a startup script template pretty quickly with this tool.

… get a structured view of my PVGroup on the client side?

(This is a shameless plug for our other free tools…)

Consider trying ophyd devices as the client-side interface to your server-side caproto IOC.

If you buy into this ecosystem, you will be able to:

  • Use your caproto-backed PVs in scans and other data acquisition routines by way of bluesky

  • Auto-generate EPICS user interfaces with Typhos and PyDM

  • Track and organize your devices by way of happi

Examples

Below, we will use caproto’s threading client to interact with caproto IOCs.

In [1]: from caproto.threading.client import Context

In [2]: ctx = Context()  # a client Context used to explore the servers below

Of course, standard epics-base clients or other caproto clients may also be used.

Simple IOC

This IOC has three PVs that simply store a value.

#!/usr/bin/env python3
from textwrap import dedent

from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class SimpleIOC(PVGroup):
    """
    An IOC with three uncoupled read/writable PVs.

    Scalar PVs
    ----------
    A (int)
    B (float)

    Array PVs
    ---------
    C (array of int)
    """
    A = pvproperty(
        value=1,
        doc='An integer',
    )
    B = pvproperty(
        value=2.0,
        doc='A float'
    )
    C = pvproperty(
        value=[1, 2, 3],
        doc='An array of integers (max length 3)'
    )


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='simple:',
        desc=dedent(SimpleIOC.__doc__))
    ioc = SimpleIOC(**ioc_options)
    run(ioc.pvdb, **run_options)

caproto.ioc_examples.simple.SimpleIOC(prefix, *)

An IOC with three uncoupled read/writable PVs.

$ python3 -m caproto.ioc_examples.simple --list-pvs
[I 18:08:47.628 server:93] Server starting up...
[I 18:08:47.630 server:109] Listening on 0.0.0.0:56840
[I 18:08:47.630 server:121] Server startup complete.
[I 18:08:47.630 server:123] PVs available:
    simple:A
    simple:B
    simple:C

Using the threading client context we created above, we can read these values and write to them.

In [3]: a, b, c = ctx.get_pvs('simple:A', 'simple:B', 'simple:C')

In [4]: a.read()
Out[4]: ReadNotifyResponse(data=array([1], dtype=int32), data_type=<ChannelType.LONG: 5>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)

In [5]: b.read()
Out[5]: ReadNotifyResponse(data=array([2.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=1, metadata=None)

In [6]: c.read()
Out[6]: ReadNotifyResponse(data=array([1, 2, 3], dtype=int32), data_type=<ChannelType.LONG: 5>, data_count=3, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=2, metadata=None)

In [7]: b.write(5)
Out[7]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=3)

In [8]: b.read()
Out[8]: ReadNotifyResponse(data=array([5.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=4, metadata=None)

In [9]: c.write([4, 5, 6])
Out[9]: WriteNotifyResponse(data_type=<ChannelType.LONG: 5>, data_count=3, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=5)

In [10]: c.read()
Out[10]: ReadNotifyResponse(data=array([4, 5, 6], dtype=int32), data_type=<ChannelType.LONG: 5>, data_count=3, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=6, metadata=None)

Write to a File When a PV is Written To

#!/usr/bin/env python3
import pathlib
import tempfile
import textwrap

from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class CustomWrite(PVGroup):
    """
    When a PV is written to, write the new value into a file as a string.
    """

    async def my_write(self, instance, value):
        # Compose the filename based on whichever PV this is.
        # For this IOC, the following will be: 'A' or 'B'
        filename = pathlib.Path(self.directory.value) / instance.name[-1]
        with open(filename, 'wt') as f:
            print(f"{value}", file=f)

        self.log.warning(f'Wrote {value} to {filename}')
        return value

    directory = pvproperty(
        value=tempfile.gettempdir(),
        doc="The directory to write data to",
        string_encoding='utf-8',
        report_as_string=True,
        max_length=255,
    )

    A = pvproperty(put=my_write, value=0, doc="Writes to (DIRECTORY)/A")
    B = pvproperty(put=my_write, value=0, doc="Writes to (DIRECTORY)/B")


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='custom_write:',
        desc=textwrap.dedent(CustomWrite.__doc__)
    )
    ioc = CustomWrite(**ioc_options)
    run(ioc.pvdb, **run_options)

$ python3 -m caproto.ioc_examples.custom_write --list-pvs
[I 18:12:07.282 server:93] Server starting up...
[I 18:12:07.284 server:109] Listening on 0.0.0.0:57539
[I 18:12:07.284 server:121] Server startup complete.
[I 18:12:07.284 server:123] PVs available:
    custom_write:A
    custom_write:B

On the machine where the server resides, we will see files update whenever any client writes.

In [11]: import pathlib

In [12]: import tempfile

# /tmp on Linux, %TEMP% on Windows, /var/folders on macOS, ...
In [13]: ioc_write_path = pathlib.Path(tempfile.gettempdir()) / 'A'

In [14]: a, b = ctx.get_pvs('custom_write:A', 'custom_write:B')

In [15]: a.write(5)
Out[15]: WriteNotifyResponse(data_type=<ChannelType.LONG: 5>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0)

In [16]: print(open(ioc_write_path, 'rt').read())
5


In [17]: a.write(10)
Out[17]: WriteNotifyResponse(data_type=<ChannelType.LONG: 5>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=1)

In [18]: print(open(ioc_write_path, 'rt').read())
10

It is easy to imagine extending this example to write to a socket or a serial device rather than a file.
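As one possible direction, the sketch below sends a value over TCP using only asyncio from the standard library. The function send_value, the name=value line format, and the throwaway local server in demo are all hypothetical choices made for illustration; a real device would dictate its own protocol.

```python
import asyncio


async def send_value(host, port, name, value):
    """Send one 'name=value' line to a TCP endpoint and close the socket."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(f'{name}={value}\n'.encode('utf-8'))
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo():
    """Start a throwaway local server, send it one value, return the line."""
    loop = asyncio.get_running_loop()
    line_received = loop.create_future()

    async def handle(reader, writer):
        line = await reader.readline()
        line_received.set_result(line.decode('utf-8').strip())
        writer.close()

    # Port 0 asks the operating system for any free port.
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        await send_value('127.0.0.1', port, 'A', 5)
        return await asyncio.wait_for(line_received, timeout=5)
    finally:
        server.close()
        await server.wait_closed()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

A put handler like my_write could await a coroutine such as send_value in place of the file write, provided the server runs on asyncio.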

Random Walk

This example contains a PV random_walk:x that takes random steps at an update rate controlled by a second PV, random_walk:dt.

caproto.ioc_examples.random_walk.RandomWalkIOC(...)

This example contains a PV x that takes random steps at an update rate controlled by a second PV, dt.

#!/usr/bin/env python3
import random
from textwrap import dedent

from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class RandomWalkIOC(PVGroup):
    """
    This example contains a PV ``x`` that takes random steps at an update rate
    controlled by a second PV, ``dt``.
    """
    dt = pvproperty(value=3.0, doc="Delta time [sec]")
    x = pvproperty(value=0.0, doc="The random float value")

    @x.startup
    async def x(self, instance, async_lib):
        """This is a startup hook which periodically updates the value."""
        while True:
            # Grab the current value from `self.x` and compute the next value:
            x = self.x.value + 2. * random.random() - 1.0

            # Update the ChannelData instance and notify any subscribers:
            await instance.write(value=x)

            # Let the async library wait for the next iteration
            await async_lib.sleep(self.dt.value)


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='random_walk:',
        desc=dedent(RandomWalkIOC.__doc__))
    ioc = RandomWalkIOC(**ioc_options)
    run(ioc.pvdb, **run_options)

Note

What is async_lib.library.sleep?

As caproto supports three different async libraries, we have an “async layer” class that gives compatible async versions of commonly-used synchronization primitives. The attribute async_lib.library would be either the Python module asyncio (default), trio, or curio, depending on how the server is run. It happens that all three of these modules have a sleep function at the top level, so async_lib.library.sleep accesses the appropriate sleep function for each library.

Why not use time.sleep?

The gist is that asyncio.sleep doesn’t hold up your entire thread / event loop, but gives back control to the event loop to run other tasks while sleeping. The function time.sleep, on the other hand, would cause noticeable delays and problems.

This is a fundamental consideration in concurrent programming generally, not specific to caproto. See for example this StackOverflow post for more information.
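The difference can be observed directly with the standard library. The sketch below is asyncio-only and independent of caproto; the helper names blocking_sleep and count_ticks_during are hypothetical. It counts how many times a background task gets to run while the main coroutine waits, first with a cooperative sleep and then with a blocking one.

```python
import asyncio
import time


async def blocking_sleep(delay):
    """An 'async' function that nevertheless blocks the whole event loop."""
    time.sleep(delay)


async def count_ticks_during(wait, delay=0.2):
    """Count how often a background task runs while ``wait(delay)`` is pending."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    task = asyncio.ensure_future(ticker())
    await asyncio.sleep(0)  # yield once so the ticker gets started
    before = ticks
    await wait(delay)
    during = ticks - before

    # Stop the background task cleanly before returning.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


if __name__ == '__main__':
    print('asyncio.sleep:', asyncio.run(count_ticks_during(asyncio.sleep)))
    print('time.sleep:   ', asyncio.run(count_ticks_during(blocking_sleep)))
```

With the cooperative sleep the ticker keeps running, so the count is positive. With the blocking sleep the count is zero, because the event loop never regains control during the wait. In a server, that would mean no client requests are handled for the duration.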

I/O Interrupt

This example listens for key presses.

caproto.ioc_examples.io_interrupt.start_io_interrupt_monitor(...)

This function monitors the terminal it was run in for keystrokes.

caproto.ioc_examples.io_interrupt.IOInterruptIOC(...)

An IOC that updates on keypress events.

#!/usr/bin/env python3
import atexit
import fcntl
import os
import sys
import termios
import textwrap
import threading

from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


def start_io_interrupt_monitor(new_value_callback):
    '''
    This function monitors the terminal it was run in for keystrokes.
    On each keystroke, it calls new_value_callback with the given keystroke.

    This is used to simulate the concept of an I/O Interrupt-style signal from
    the EPICS world. Those signals depend on hardware to tell EPICS when new
    values are available to be read by way of interrupts - whereas we use
    callbacks here.
    '''

    # Thanks stackoverflow and Python 2 FAQ!
    if not sys.__stdin__.isatty():
        print('[IO Interrupt] stdin is not a TTY, exiting')
        return

    fd = sys.stdin.fileno()

    oldterm = termios.tcgetattr(fd)
    newattr = termios.tcgetattr(fd)
    newattr[3] &= ~termios.ICANON & ~termios.ECHO
    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)

    print('Started monitoring the keyboard outside of the async library')

    # When the process exits, be sure to reset the terminal settings
    @atexit.register
    def reset_terminal():
        print('Resetting the terminal settings')
        termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
        print('Done')

    termios.tcsetattr(fd, termios.TCSANOW, newattr)
    fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

    # Loop forever, sending back keypresses to the callback
    while True:
        try:
            char = sys.stdin.read(1)
        except IOError:
            ...
        else:
            if char:
                print(f'New keypress: {char!r}')
                new_value_callback(char)


class IOInterruptIOC(PVGroup):
    """
    An IOC that updates on keypress events.

    PVs
    ---
    keypress (str)
        The latest key pressed in the IOC terminal.
    """
    keypress = pvproperty(
        value='',
        max_length=10,
        doc="Latest keypress",
        read_only=True,
    )

    # NOTE the decorator used here:
    @keypress.startup
    async def keypress(self, instance, async_lib):
        # This method will be called when the server starts up.
        print('* keypress method called at server startup')
        queue = async_lib.ThreadsafeQueue()

        # Start a separate thread that monitors keyboard input, telling it to
        # put new values into our async-friendly queue
        thread = threading.Thread(
            target=start_io_interrupt_monitor,
            daemon=True,
            kwargs=dict(new_value_callback=queue.put)
        )
        thread.start()

        # Loop and grab items from the queue one at a time
        while True:
            value = await queue.async_get()
            print(f'Saw new value on async side: {value!r}')

            # Propagate the keypress to the EPICS PV, triggering any monitors
            # along the way
            await self.keypress.write(str(value))


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='io:',
        desc=textwrap.dedent(IOInterruptIOC.__doc__),
    )

    ioc = IOInterruptIOC(**ioc_options)
    run(ioc.pvdb, **run_options)

$ python -m caproto.ioc_examples.io_interrupt --list-pvs
[I 10:18:57.643 server:132] Server starting up...
[I 10:18:57.644 server:145] Listening on 0.0.0.0:54583
[I 10:18:57.646 server:218] Server startup complete.
[I 10:18:57.646 server:220] PVs available:
    io:keypress
* keypress method called at server startup
Started monitoring the keyboard outside of the async library

Typing causes updates to be sent to any client subscribed to io:keypress. If we monitor it using the command-line client like so:

$ caproto-monitor io:keypress

and go back to the server and type some keys:

New keypress: 'a'
Saw new value on async side: 'a'
New keypress: 'b'
Saw new value on async side: 'b'
New keypress: 'c'
Saw new value on async side: 'c'
New keypress: 'd'
Saw new value on async side: 'd'

the client will receive the updates:

io:keypress                               2018-06-14 10:19:04 [b'd']
io:keypress                               2018-06-14 10:20:26 [b'a']
io:keypress                               2018-06-14 10:20:26 [b's']
io:keypress                               2018-06-14 10:20:26 [b'd']

Macros for PV names

caproto.ioc_examples.macros.MacroifiedNames(...)

An IOC with PVs that have macro-ified names.

#!/usr/bin/env python3
from textwrap import dedent

from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run


class MacroifiedNames(PVGroup):
    """
    An IOC with PVs that have macro-ified names.

    Required macros: "beamline" and "suffix"

    PVs
    ---
    {beamline}:{suffix}.VAL
    {beamline}:{suffix}.RBV
    """
    placeholder1 = pvproperty(value=0, name='{beamline}:{suffix}.VAL')
    placeholder2 = pvproperty(value=0, name='{beamline}:{suffix}.RBV')


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='macros:',
        desc=dedent(MacroifiedNames.__doc__),
        # Provide default values for macros.
        # Passing None as a default value makes a parameter required.
        macros=dict(beamline='my_beamline', suffix='thing'))
    ioc = MacroifiedNames(**ioc_options)
    run(ioc.pvdb, **run_options)

The help string for this IOC contains two extra entries at the bottom:

$ python3 -m caproto.ioc_examples.macros -h
usage: macros.py [-h] [--prefix PREFIX] [-q | -v] [--list-pvs]
                [--async-lib {asyncio,curio,trio}]
                [--interfaces INTERFACES [INTERFACES ...]]
                [--beamline BEAMLINE] [--suffix SUFFIX]

Run an IOC with PVs that have macro-ified names.

optional arguments:

<...snipped...>

--beamline BEAMLINE   Macro substitution, optional
--suffix SUFFIX       Macro substitution, optional

$ python3 -m caproto.ioc_examples.macros --beamline XF31ID --suffix detector --list-pvs
[I 18:44:39.528 server:93] Server starting up...
[I 18:44:39.530 server:109] Listening on 0.0.0.0:56365
[I 18:44:39.531 server:121] Server startup complete.
[I 18:44:39.531 server:123] PVs available:
    macros:XF31ID:detector.VAL
    macros:XF31ID:detector.RBV

In [19]: pv, = ctx.get_pvs('macros:XF31ID:detector.VAL')

In [20]: pv.read()
Out[20]: ReadNotifyResponse(data=array([0], dtype=int32), data_type=<ChannelType.LONG: 5>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)

Observe that the command line arguments fill in the PV names.

Subgroups

The PVGroup is designed to be nested, which provides a nice path toward future capability: IOCs that are natively “V7”, encoding semantic structure for PV Access clients and decomposing into flat PVs for Channel Access clients.
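The "decomposing into flat PVs" part can be pictured as concatenating prefixes while walking the nested structure. The sketch below uses plain dictionaries as a stand-in for nested groups; flatten_pv_names is a hypothetical helper for illustration and does not reflect how caproto builds its PV database internally.

```python
def flatten_pv_names(prefix, tree):
    """Walk a nested dict of groups and return the flat PV names.

    Keys that map to a dict are treated as subgroup prefixes; any other
    key is treated as the name of a single PV.
    """
    names = []
    for key, child in tree.items():
        if isinstance(child, dict):
            names.extend(flatten_pv_names(prefix + key, child))
        else:
            names.append(prefix + key)
    return names


if __name__ == '__main__':
    tree = {
        'group1:': {'random': 1},
        'group2:': {'random': 1},
        'group3:': {'random': 1},
    }
    for name in flatten_pv_names('subgroups:', tree):
        print(name)
```

Each subgroup contributes its own prefix, so a PV named random inside a subgroup with prefix group1: under the top-level prefix subgroups: ends up as subgroups:group1:random.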

#!/usr/bin/env python3
import random

from caproto.server import PVGroup, SubGroup, ioc_arg_parser, pvproperty, run


class MySubGroup(PVGroup):
    """Example group of PVs, where the prefix is defined on instantiation."""

    def __init__(self, *args, max_value, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_value = max_value

    rand = pvproperty(
        value=1,
        name="random",
        doc="A random value between 1 and self.max_value.",
    )

    @rand.scan(period=2.0, stop_on_error=False, use_scan_field=False)
    async def rand(self, instance, async_lib):
        print(
            f'{instance.pvname} a random value in [1, {self.max_value}]'
        )
        await instance.write(value=random.randint(1, self.max_value))


class MyPVGroup(PVGroup):
    'Example group of PVs, a mix of pvproperties and subgroups'

    # Create two subgroups:
    group1 = SubGroup(MySubGroup, prefix="group1:", max_value=5)
    group2 = SubGroup(MySubGroup, prefix='group2:', max_value=20)

    # And a third one, using the decorator pattern:
    @SubGroup(prefix='group3:')
    class group3(PVGroup):
        nested_rand = pvproperty(
            value=1,
            name="random",
            doc="A random value between 1 and self.max_value.",
        )

        @nested_rand.scan(period=2.0, stop_on_error=False, use_scan_field=False)
        async def nested_rand(self, instance, async_lib):
            max_value = 10
            print(
                f'{instance.pvname} random value from 1 to {max_value}'
            )
            await instance.write(value=random.randint(1, max_value))


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='subgroups:',
        desc='Run an IOC with groups of groups of PVs.')
    ioc = MyPVGroup(**ioc_options)
    run(ioc.pvdb, **run_options)

$ python -m caproto.ioc_examples.subgroups --list-pvs
random using the descriptor getter is: <caproto.server.server.PvpropertyInteger object at 0x10928ffd0>
subgroup4 is: <__main__.MyPVGroup.group4.subgroup4 object at 0x10928fe10>
subgroup4.random is: <caproto.server.server.PvpropertyInteger object at 0x10928fef0>
[I 11:08:35.074 server:132] Server starting up...
[I 11:08:35.074 server:145] Listening on 0.0.0.0:63336
[I 11:08:35.076 server:218] Server startup complete.
[I 11:08:35.077 server:220] PVs available:
    subgroups:random
    subgroups:RECORD_LIKE1.RTYP
    subgroups:RECORD_LIKE1.VAL
    subgroups:RECORD_LIKE1.DESC
    subgroups:recordlike2.RTYP
    subgroups:recordlike2.VAL
    subgroups:recordlike2.DESC
    subgroups:group1:random
    subgroups:group2-random
    subgroups:group3_prefix:random
    subgroups:group4:subgroup4:random

Records

See Records.

caproto.ioc_examples.records.RecordMockingIOC(...)

#!/usr/bin/env python3
from caproto.server import (PVGroup, PvpropertyDouble, PvpropertyInteger,
                            PvpropertyString, ioc_arg_parser, pvproperty, run)
from caproto.server.records import AiFields, AoFields


class RecordMockingIOC(PVGroup):
    # Define three records, an analog input (ai) record:
    A = pvproperty(value=1.0, dtype=PvpropertyDouble[AiFields], record=AiFields)
    # And an analog output (ao) record:
    B = pvproperty(value=2.0, dtype=PvpropertyDouble, record=AoFields, precision=3)
    # and an ai with all the bells and whistles
    C = pvproperty(
        value=0.0,
        record="ai",
        upper_alarm_limit=2.0,
        lower_alarm_limit=-2.0,
        upper_warning_limit=1.0,
        lower_warning_limit=-1.0,
        upper_ctrl_limit=3.0,
        lower_ctrl_limit=-3.0,
        units="mm",
        precision=3,
        doc="The C pvproperty",
    )
    # In the above, you'll note that we still support referencing records by
    # their (string) name.  we recommend that you use the field classes (e.g.,
    # AiFields) to be more explicit and allow your static analyzer to help
    # check your code.

    @B.putter
    async def _b_val_put(self, instance: PvpropertyDouble, value: float):
        if value == 1:
            # Mocked record will pick up the alarm status simply by us raising
            # an exception in the putter:
            raise ValueError('Invalid value!')

    # It's also possible to modify some of the behavior of fields on a per-
    # record basis. The following function is called whenever A.RVAL is put to:
    @A.fields.current_raw_value.putter
    async def _a_raw_put(fields: AiFields, instance: PvpropertyInteger, value: int):
        # However, somewhat confusingly, 'self' in this case is the fields
        # associated with 'A'. To access the IOC (i.e., the main PV group)
        # use the following:
        ioc = fields.parent.group
        print(f'A.RVAL: Writing values to A and B: {value}')
        await ioc.B.write(value)
        await ioc.A.write(value)

    @B.fields.current_raw_value.putter
    async def _b_raw_put(fields, instance: PvpropertyInteger, value: int):
        ioc = fields.parent.group
        print('B.RVAL: Writing modified values to A, B')
        await ioc.B.write(value + 10)
        await ioc.A.write(value - 10)

    # Now that the field specification has been set on B, it can be reused:
    D = pvproperty(
        value=2.0, precision=3, field_spec=B.field_spec, dtype=PvpropertyDouble
    )
    # D will also be reported as an 'ao' record like B, as it uses the same
    # field specification.

    E = pvproperty(
        value="this is a test",
        record="stringin",
        dtype=PvpropertyString,
    )


if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='mock:',
        desc='Run an IOC that mocks an ai (analog input) record.')

    # Instantiate the IOC, assigning a prefix for the PV names.
    ioc = RecordMockingIOC(**ioc_options)
    print('PVs:', list(ioc.pvdb))

    # ... but what you don't see are all of the analog input record fields
    print('Fields of B:', list(ioc.B.fields.keys()))

    print('Custom field specifications of A:', RecordMockingIOC.A.fields)
    print('Custom field specifications of B:', RecordMockingIOC.B.fields)
    print('Custom field specifications of C:', RecordMockingIOC.C.fields)
    print('Custom field specifications of D:', RecordMockingIOC.D.fields)
    print('Custom field specifications of E:', RecordMockingIOC.E.fields)

    # Run IOC.
    run(ioc.pvdb, **run_options)

$ python -m caproto.ioc_examples.records --list-pvs
PVs: ['mock:A', 'mock:B']
Fields of B: ['ACKS', 'ACKT', 'ASG', 'DESC', 'DISA', 'DISP', 'DISS', 'DISV', 'DTYP', 'EVNT', 'FLNK', 'LCNT', 'NAME', 'NSEV', 'NSTA', 'PACT', 'PHAS', 'PINI', 'PRIO', 'PROC', 'PUTF', 'RPRO', 'SCAN', 'SDIS', 'SEVR', 'TPRO', 'TSE', 'TSEL', 'UDF', 'RTYP', 'STAT', 'RVAL', 'INIT', 'MLST', 'LALM', 'ALST', 'LBRK', 'ORAW', 'ROFF', 'SIMM', 'SVAL', 'HYST', 'HIGH', 'HSV', 'HIHI', 'HHSV', 'LOLO', 'LLSV', 'LOW', 'LSV', 'AOFF', 'ASLO', 'EGUF', 'EGUL', 'LINR', 'EOFF', 'ESLO', 'SMOO', 'ADEL', 'PREC', 'EGU', 'HOPR', 'LOPR', 'MDEL', 'INP', 'SIOL', 'SIML', 'SIMS']
[I 11:07:48.635 server:132] Server starting up...
[I 11:07:48.636 server:145] Listening on 0.0.0.0:49637
[I 11:07:48.638 server:218] Server startup complete.
[I 11:07:48.638 server:220] PVs available:
    mock:A
    mock:B

Mini-Beamline

Simulate your own mini beamline with this IOC.

caproto.ioc_examples.mini_beamline

This example is quite large.

caproto.ioc_examples.mini_beamline.MiniBeamline(...)

A collection of detectors coupled to motors and an oscillating beam current.

caproto.ioc_examples.mini_beamline.PinHole(...)

A pinhole simulation device.

caproto.ioc_examples.mini_beamline.Edge(...)

An edge simulation device.

caproto.ioc_examples.mini_beamline.Slit(...)

A slit simulation device.

caproto.ioc_examples.mini_beamline.MovingDot(...)

More…

Take a look around the ioc_examples subpackage for more examples not covered here.

caproto.ioc_examples.enums.EnumIOC(prefix, *)

An IOC with some enums.

caproto.ioc_examples.autosave.AutosavedSimpleIOC(...)

An IOC with three uncoupled read/writable PVs

caproto.ioc_examples.decay.Decay(*args, **kwargs)

Simulates an exponential decay to a set point.

caproto.ioc_examples.scan_rate.ScanRateIOC(...)

Helpers

caproto offers several “helper” subgroups (SubGroup) that are of general use, and could be considered part of the “caproto server standard library”, so to speak.

Autosave

AutosaveHelper(*args[, file_manager])

RotatingFileManager(filename, *[, ...])

Save to a primary filename, while rotating out old copies.

Status / Statistics

StatusHelper(prefix, *[, macros, parent, name])

An IocStats-like tool for caproto IOCs.

BasicStatusHelper(*args, **kwargs)

PeriodicStatusHelper(*args, **kwargs)

An IocStats-like tool for caproto IOCs.

MemoryTracingHelper(*args[, filters])

A helper which quickly allows for tracing memory usage and allocations on a caproto server instance.