Synchronous Client

The synchronous client optimizes for simplicity of implementation over performance. This has its uses, as described below. But for high-performance applications, use one of caproto’s other clients such as the Threading Client.

Tutorial

In a separate shell, start one of caproto’s demo IOCs.

$ python3 -m caproto.ioc_examples.random_walk
PVs: ['random_walk:dt', 'random_walk:x']

Now, in Python we will talk to it using caproto’s synchronous client.

Read

In [1]: from caproto.sync.client import read

In [2]: res = read('random_walk:dt')

In [3]: res
Out[3]: ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)

This object is a human-friendly representation of the server’s response. The raw bytes of that response are:

In [4]: bytes(res)
Out[4]: b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00@\x08\x00\x00\x00\x00\x00\x00'
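The layout of those bytes can be checked by hand. The following is a minimal sketch using only the standard library. It assumes the 16-byte header layout implied by the MessageHeader fields shown in the Performance Note below (four unsigned 16-bit fields followed by two unsigned 32-bit fields, all big-endian) and an 8-byte big-endian double as the payload. The names raw and HEADER are illustrative and not part of caproto.

```python
import struct

# The bytes shown in Out[4] above.
raw = (b'\x00\x0f\x00\x08\x00\x06\x00\x01'
       b'\x00\x00\x00\x01\x00\x00\x00\x00'
       b'@\x08\x00\x00\x00\x00\x00\x00')

# Assumed header layout: command, payload_size, data_type, data_count
# (uint16 each), then parameter1 and parameter2 (uint32 each), big-endian.
HEADER = struct.Struct('>HHHHII')

command, payload_size, data_type, data_count, parameter1, parameter2 = (
    HEADER.unpack_from(raw))

# The payload follows the header; data type 6 is DOUBLE according to Out[3].
payload = raw[HEADER.size:HEADER.size + payload_size]
(value,) = struct.unpack('>d', payload)

print(command, payload_size, data_type, data_count, value)  # 15 8 6 1 3.0
```

The decoded value matches res.data, and the header fields match the MessageHeader repr.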

Access particular fields in the response using attribute (“dot”) access on res.

In [5]: res.data
Out[5]: array([3.])

By default, the client does not request any metadata, so res.metadata is None.

In [6]: res.metadata

Use the data_type parameter to request a richer data type.

In [7]: richer_res = read('random_walk:dt', data_type='time')

In [8]: richer_res.metadata
Out[8]: DBR_TIME_DOUBLE(status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>, timestamp=1662135443.575255)

In [9]: richer_res.metadata.timestamp
Out[9]: 1662135443.575255

In [10]: richer_res.metadata.stamp.as_datetime()  # a convenience method
Out[10]: datetime.datetime(2022, 9, 2, 9, 17, 23, 575255)

See read() for more information on the values accepted by the data_type parameter.
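The datetime shown in Out[10] carries no time zone, so it presumably reflects the local time of the machine that produced it. As a hedged sketch, assuming the timestamp is in seconds since the POSIX epoch (which is consistent with the year 2022 in the output above), the standard library can convert it to an unambiguous UTC time. The variable names are illustrative.

```python
from datetime import datetime, timezone

# Value copied from Out[9] above; assumed to be seconds since the POSIX epoch.
timestamp = 1662135443.575255

# An explicit time zone makes the result independent of the local machine.
utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
print(utc_time.isoformat())
```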

Note

Performance Note

The underlying metadata and data are stored in efficient, contiguous-memory data structures.

In [11]: res.header  # a ctypes.BigEndianStructure
Out[11]: MessageHeader(command=15, payload_size=8, data_type=6, data_count=1, parameter1=1, parameter2=0)

In [12]: res.buffers  # a collection of one or more buffers
Out[12]: (b'', <memory at 0x1357de340>)

They were received directly from the socket into these structures with no intermediate copies. Accessing res.data, which returns a numpy.ndarray or array.array, provides a view onto that same memory with no copying (if the data was received from the socket all at once) or with one copy (if the data spanned multiple receipts).
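The difference between a view and a copy can be illustrated with the standard library alone. This sketch does not use caproto; a bytearray stands in for memory filled from a socket, and the names buffer, payload_view, and payload_copy are illustrative.

```python
import struct

# A mutable buffer standing in for memory filled directly from a socket:
# a 16-byte header followed by one big-endian double (3.0).
buffer = bytearray(
    b'\x00\x0f\x00\x08\x00\x06\x00\x01'
    b'\x00\x00\x00\x01\x00\x00\x00\x00'
    b'@\x08\x00\x00\x00\x00\x00\x00')

# Slicing a memoryview does not copy; it references the same memory.
payload_view = memoryview(buffer)[16:]
# Slicing the bytearray itself does copy.
payload_copy = buffer[16:]

before = struct.unpack_from('>d', payload_view)[0]

# Overwrite the payload in place to show which object shares memory.
struct.pack_into('>d', buffer, 16, 1.0)

after_view = struct.unpack_from('>d', payload_view)[0]
after_copy = struct.unpack_from('>d', payload_copy)[0]
print(before, after_view, after_copy)  # 3.0 1.0 3.0
```

The view reflects the change to the underlying buffer, while the copy keeps the old value.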

Write

Let us set the value to 1.

In [13]: from caproto.sync.client import write

In [14]: write('random_walk:dt', 1)

The function returns None immediately. To wait for confirmation that the write has been successfully processed by the server, use the notify keyword argument. (This is not guaranteed to be supported by an EPICS server; it may result in an ErrorResponse.)

In [15]: from caproto.sync.client import write

In [16]: write('random_walk:dt', 1, notify=True)
Out[16]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0)

Subscribe (“Monitor”)

Let us now monitor a channel. The server updates the random_walk:x channel periodically, at an interval set by random_walk:dt. We can subscribe to updates and fan them out to one or more user-defined callback functions. First, we define a Subscription.

In [17]: from caproto.sync.client import subscribe

In [18]: sub = subscribe('random_walk:x')

Next, we define a callback function, which will be called whenever the server sends an update. It must accept two positional arguments: the Subscription and the response.

In [19]: responses = []

In [20]: def f(sub, response):
   ....:     """
   ....:     On each update, print the PV name and data
   ....:     Cache the full response in a list.
   ....:     """
   ....:     responses.append(response)
   ....:     print(sub.pv_name, response.data)
   ....: 

We register this function with sub. We can register multiple such functions if we wish.

In [21]: token = sub.add_callback(f)

The token is just an integer which we can use to remove f later.

Because this is a synchronous client, processing subscriptions is a blocking operation. (See the Threading Client to process subscriptions on a separate, background thread.) To activate the subscription, call sub.block().

In [22]: sub.block()
random_walk:x [14.14272394]
random_walk:x [14.94322537]
random_walk:x [15.35695388]
random_walk:x [15.74301991]
^C

This call to sub.block() blocks indefinitely, sending a message to the server to request future updates and then passing its responses to f as they arrive. The server always sends at least one response immediately. When we are satisfied, we can interrupt it with Ctrl+C (or by calling sub.interrupt() from another thread). The subscription may later be reactivated by calling sub.block() again.
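Interrupting from another thread can be sketched with a timer. The helper block_for and the class FakeSubscription below are hypothetical and not part of caproto; the fake only mimics the block() and interrupt() behavior described above so that the example runs without a server. A real Subscription is assumed to be usable in its place.

```python
import threading


def block_for(sub, seconds):
    """Run sub.block(), interrupting it from a timer thread after `seconds`."""
    timer = threading.Timer(seconds, sub.interrupt)
    timer.daemon = True
    timer.start()
    try:
        sub.block()
    finally:
        timer.cancel()  # no-op if the timer has already fired


class FakeSubscription:
    """Hypothetical stand-in exposing only block() and interrupt()."""

    def __init__(self):
        self._stop = threading.Event()
        self.interrupted = False

    def block(self):
        self._stop.wait()  # blocks until interrupt() is called
        self.interrupted = True

    def interrupt(self):
        self._stop.set()


fake = FakeSubscription()
block_for(fake, 0.1)
print(fake.interrupted)  # True
```

For a fixed run time, the duration parameter of block() documented in the API section is simpler; the timer pattern is more useful when the decision to stop is made elsewhere in the program.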

Recall that the user-defined function f printed the data from each response and accumulated the response objects in a list. Indeed, we have captured four responses.

In [23]: len(responses)
Out[23]: 4

At any point we can remove a specific callback function:

In [24]: sub.remove_callback(token)

or clear all the callbacks on a subscription:

In [25]: sub.clear()

To activate multiple subscriptions concurrently, use the top-level function block(), which accepts any number of Subscription objects as arguments. Again, use Ctrl+C to interrupt (or call interrupt() from another thread).

In [26]: from caproto.sync.client import block

In [27]: sub_dt = subscribe('random_walk:dt')

In [28]: sub_x = subscribe('random_walk:x')

In [29]: sub_dt.add_callback(f)
Out[29]: 0

In [30]: sub_x.add_callback(f)
Out[30]: 0

In [31]: block(sub_x, sub_dt)
random_walk:x [63.34866867]
random_walk:dt [1.]
random_walk:x [63.53448681]
random_walk:x [64.30532391]
^C

Changed in version 0.5.0: The expected signature of the callback function was changed from f(response) to f(sub, response). For backward compatibility, functions with signature f(response) are still accepted, but caproto will issue a warning that a future release may require the new signature, f(sub, response).
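Existing one-argument callbacks can be adapted to the new signature with a small wrapper. The helper upgrade_callback below is a hypothetical sketch, not part of caproto. It inspects the number of parameters and wraps one-argument functions so that the subscription argument is discarded.

```python
import inspect


def upgrade_callback(func):
    """Return a callback with the signature f(sub, response).

    Hypothetical helper: one-argument callbacks are wrapped so that the
    subscription is dropped; two-argument callbacks are returned unchanged.
    """
    n_params = len(inspect.signature(func).parameters)
    if n_params == 2:
        return func
    if n_params != 1:
        raise TypeError("expected f(response) or f(sub, response)")

    def wrapper(sub, response):
        return func(response)

    return wrapper


def legacy(response):
    return response


new_style = upgrade_callback(legacy)
print(new_style("sub", "response"))  # response
```

Keep a reference to the returned wrapper (here, new_style) for as long as it is registered, because the callback registry holds only weak references, as explained in the Warning below.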

Warning

The callback registry in Subscription only holds weak references to the user callback functions. If there are no other references to the function, it will be silently garbage collected and removed. Therefore, constructions like this do not work:

sub.add_callback(lambda sub, response: print(response.data))

The lambda function will be promptly garbage collected by Python and removed from sub by caproto. To avoid that, make a reference before passing the function to Subscription.add_callback().

cb = lambda sub, response: print(response.data)
sub.add_callback(cb)

This can be surprising, but it is a standard approach for avoiding the accidental costly accumulation of abandoned callbacks.
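The behavior can be reproduced with a toy registry built on the standard weakref module. WeakCallbackRegistry is a hypothetical illustration, not caproto's implementation, and it ignores details such as bound methods. The result shown assumes CPython, where unreferenced objects are reclaimed promptly.

```python
import gc
import weakref


class WeakCallbackRegistry:
    """Toy registry that holds only weak references to its callbacks."""

    def __init__(self):
        self._callbacks = {}
        self._next_token = 0

    def add_callback(self, func):
        token = self._next_token
        self._next_token += 1
        self._callbacks[token] = weakref.ref(func)
        return token

    def live_callbacks(self):
        """Return the callbacks that have not been garbage collected."""
        live = []
        for ref in self._callbacks.values():
            func = ref()
            if func is not None:
                live.append(func)
        return live


registry = WeakCallbackRegistry()

# No other reference exists, so this lambda is collected.
registry.add_callback(lambda sub, response: None)

# The name `cb` keeps this function alive.
cb = lambda sub, response: None
registry.add_callback(cb)

gc.collect()
print(len(registry.live_callbacks()))  # 1
```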

Stateless Connection Handling

As noted at the top, the synchronous client is optimized for simplicity of implementation. While the other caproto clients use the notion of a context to cache connections, the synchronous client creates a fresh connection for each function call. This stateless design is sufficient to support the command-line interface and it can be useful for debugging, but it is very inefficient for performing multiple operations on the same channel.

For the common use case “read / write a new value / read again,” the synchronous client provides read_write_read(), which uses one connection for all three operations. For anything more complicated than that, upgrade to one of the other clients.

API Documentation

caproto.sync.client.read(pv_name, *, data_type=None, data_count=None, timeout=2.0, priority=0, notify=True, force_int_enums=False, repeater=True)[source]

Read a Channel.

Parameters
pv_name : str

The PV name to read from.

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request a specific data type or a class of data types, matched to the channel’s native data type. Default is the channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.

timeout : float, optional

Default is 2 seconds.

priority : integer, optional

Virtual Circuit priority. Default is 0, the lowest. The highest is 99.

notify : boolean, optional

Send a ReadNotifyRequest instead of a ReadRequest. True by default.

force_int_enums : boolean, optional

Retrieve enums as integers. (Default is strings.)

repeater : boolean, optional

Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.

Returns
response : ReadResponse or ReadNotifyResponse

Examples

Get the value of a Channel named ‘simple:A’.

>>> read('simple:A').data
array([1], dtype=int32)

Request a richer Channel Access data type that includes the timestamp, and access the timestamp.

>>> read('cat', data_type='time').metadata.timestamp
1570622339.042392

A convenience method is provided for accessing the timestamp as a Python datetime object.

>>> read('cat', data_type='time').metadata.stamp.as_datetime()
datetime.datetime(2019, 10, 9, 11, 58, 59, 42392)

The requested data type may also be given as a specific Channel Access type:

>>> from caproto import ChannelType
>>> read('cat', data_type=ChannelType.CTRL_FLOAT).metadata
DBR_CTRL_FLOAT(
    status=<AlarmStatus.NO_ALARM: 0>,
    severity=<AlarmSeverity.NO_ALARM: 0>,
    upper_disp_limit=0.0,
    lower_disp_limit=0.0,
    upper_alarm_limit=0.0,
    upper_warning_limit=0.0,
    lower_warning_limit=0.0,
    lower_alarm_limit=0.0,
    upper_ctrl_limit=0.0,
    lower_ctrl_limit=0.0,
    precision=0,
    units=b'')

or the corresponding integer identifier:

>>> read('cat', data_type=30).metadata
DBR_CTRL_FLOAT(
    status=<AlarmStatus.NO_ALARM: 0>,
    severity=<AlarmSeverity.NO_ALARM: 0>,
    upper_disp_limit=0.0,
    lower_disp_limit=0.0,
    upper_alarm_limit=0.0,
    upper_warning_limit=0.0,
    lower_warning_limit=0.0,
    lower_alarm_limit=0.0,
    upper_ctrl_limit=0.0,
    lower_ctrl_limit=0.0,
    precision=0,
    units=b'')

caproto.sync.client.write(pv_name, data, *, notify=False, data_type=None, metadata=None, timeout=2.0, priority=0, repeater=True)[source]

Write to a Channel.

Parameters
pv_name : str

The PV name to write to.

data : str, bytes, int, or float or any Iterable of these

Value(s) to write.

notify : boolean, optional

Request notification of completion and wait for it. False by default.

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Write as a specific data type. Default is inferred from the input.

metadata : ctypes.BigEndianStructure or tuple

Status and control metadata for the values.

timeout : float, optional

Default is 2 seconds.

priority : integer, optional

Virtual Circuit priority. Default is 0, the lowest. The highest is 99.

repeater : boolean, optional

Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.

Returns
response : WriteNotifyResponse or None

A WriteNotifyResponse if notify=True; otherwise None.

Examples

Write the value 5 to a Channel named ‘simple:A’.

>>> write('simple:A', 5)  # returns None

Request notification of completion (“put completion”) and wait for it.

>>> write('cat', 5, notify=True)  # blocks until complete, then returns:
WriteNotifyResponse(
    data_type=<ChannelType.LONG: 5>,
    data_count=1,
    status=CAStatusCode(
        name='ECA_NORMAL', code=0, code_with_severity=1,
        severity=<CASeverity.SUCCESS: 1>,
        success=1, defunct=False,
        description='Normal successful completion'),
    ioid=0)

caproto.sync.client.subscribe(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]

Define a subscription.

Parameters
pv_name : string

The PV name to subscribe to.

priority : integer, optional

Used by the server to triage subscription responses when under high load. 0 is lowest; 99 is highest.

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request a specific data type or a class of data types, matched to the channel’s native data type. Default is the channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count, which can be checked in the Channel’s attribute native_data_count.

low, high, to : float, optional

Deprecated by Channel Access; not yet implemented by caproto.

mask : SubscriptionType, optional

Subscribe to selective updates.

Examples

Define a subscription on the random_walk:x PV.

>>> sub = subscribe('random_walk:x')

Add one or more user-defined callbacks to process responses.

>>> def f(sub, response):
...     print(response.data)
...
>>> sub.add_callback(f)

Activate the subscription and process incoming responses.

>>> sub.block()

This is a blocking operation in the sync client. (To do this on a background thread, use the threading client.) Interrupt using Ctrl+C or by calling sub.interrupt() from another thread.

The subscription may be reactivated by calling sub.block() again.

To process multiple subscriptions at once, use the function block(), which takes one or more Subscriptions as arguments.

>>> block(sub1, sub2)

There is also an interrupt() function, which is merely an alias to the method.

caproto.sync.client.block(*subscriptions, duration=None, timeout=2.0, force_int_enums=False, repeater=True)[source]

Activate one or more subscriptions and process incoming responses.

Use Ctrl+C (SIGINT) to escape, or from another thread, call interrupt().

Parameters
*subscriptions : Subscriptions

The subscriptions to activate.

duration : float, optional

How many seconds to run for. Run forever (None) by default.

timeout : float, optional

Default is 2 seconds. This is not the same as duration; this is the timeout for failure in the event of no connection.

force_int_enums : boolean, optional

Retrieve enums as integers. (Default is strings.)

repeater : boolean, optional

Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.

Examples

Activate subscription(s) and block while they process updates.

>>> sub1 = subscribe('cat')
>>> sub2 = subscribe('dog')
>>> block(sub1, sub2)

caproto.sync.client.interrupt()[source]

Signal to block() to stop blocking. Idempotent.

This obviously cannot be called interactively while blocked; it is intended to be called from another thread.

caproto.sync.client.read_write_read(pv_name, data, *, notify=False, read_data_type=None, write_data_type=None, metadata=None, timeout=2.0, priority=0, force_int_enums=False, repeater=True)[source]

Write to a Channel, but sandwich the write between two reads.

This is what the command-line utilities caproto-put and caput do. Notice that if you want the second reading to reflect the written value, you should pass the parameter notify=True. (This is also true of caproto-put/caput, which needs the -c argument to behave the way you might expect it to behave.)

This is provided as a separate function in order to support caproto-put efficiently. Making separate calls to read() and write() would re-create a connection redundantly.

Parameters
pv_name : str

The PV name to read/write/read.

data : str, bytes, int, or float or any Iterable of these

Value to write.

notify : boolean, optional

Request notification of completion and wait for it. False by default.

read_data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request a specific data type.

write_data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Write as a specific data type. Default is inferred from the input.

metadata : ctypes.BigEndianStructure or tuple

Status and control metadata for the values.

timeout : float, optional

Default is 2 seconds.

priority : integer, optional

Virtual Circuit priority. Default is 0, the lowest. The highest is 99.

force_int_enums : boolean, optional

Retrieve enums as integers. (Default is strings.)

repeater : boolean, optional

Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.

Returns
initial, write_response, final : tuple of responses

The middle response comes from the write, and it will be None unless notify=True.

Examples

Write the value 5 to a Channel named ‘cat’, reading it before and after.

>>> read_write_read('cat', 5)  # returns initial, None, final

Request notification of completion (“put completion”) and wait for it.

>>> read_write_read('cat', 5, notify=True)  # initial, WriteNotifyResponse, final

class caproto.sync.client.Subscription(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]

This object encapsulates state related to a Subscription.

See the subscribe() function.

block(duration=None, timeout=1, force_int_enums=False, repeater=True)[source]

Activate one or more subscriptions and process incoming responses.

Use Ctrl+C (SIGINT) to escape, or from another thread, call interrupt().

Convenience alias for the top-level function block(), which may be used to process multiple Subscriptions concurrently.

Parameters
duration : float, optional

How many seconds to run for. Run forever (None) by default.

timeout : float, optional

Default is 1 second. This is not the same as duration; this is the timeout for failure in the event of no connection.

force_int_enums : boolean, optional

Retrieve enums as integers. (Default is strings.)

repeater : boolean, optional

Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.

interrupt()[source]

Signal to block() to stop blocking. Idempotent.

This obviously cannot be called interactively while blocked; it is intended to be called from another thread. This method is a convenience alias for the top-level function interrupt().

add_callback(func)[source]

Add a callback to receive responses.

Parameters
func : callable

Expected signature: func(sub, response).

The signature func(response) is also supported for backward compatibility but will issue warnings. Support will be removed in a future release of caproto.

Returns
token : int

Integer token that can be passed to remove_callback().

Changed in version 0.5.0: Changed the expected signature of func from func(response) to func(sub, response).

remove_callback(cb_id)[source]

Remove callback using token that was returned by add_callback().

process(response)[source]

Run the callbacks on a response.

This is used internally by block(), generally not called by the user.

clear()[source]

Remove all callbacks. If currently blocking, interrupt.