Synchronous Client¶
The synchronous client optimizes for simplicity of implementation over performance. This has its uses, as described below. But for high-performance applications, use one of caproto’s other clients such as the Threading Client.
Tutorial¶
In a separate shell, start one of caproto’s demo IOCs.
$ python3 -m caproto.ioc_examples.random_walk
PVs: ['random_walk:dt', 'random_walk:x']
Now, in Python we will talk to it using caproto’s synchronous client.
Read¶
In [1]: from caproto.sync.client import read
In [2]: res = read('random_walk:dt')
In [3]: res
Out[3]: ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)
This object is a human-friendly representation of the server’s response. The raw bytes of that response are:
In [4]: bytes(res)
Out[4]: b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00@\x08\x00\x00\x00\x00\x00\x00'
Access particular fields in the response using attribute (“dot”) access on res.
In [5]: res.data
Out[5]: array([3.])
By default, the client does not request any metadata, so res.metadata is None:
In [6]: res.metadata
Use the data_type parameter to request a richer data type.
In [7]: richer_res = read('random_walk:dt', data_type='time')
In [8]: richer_res.metadata
Out[8]: DBR_TIME_DOUBLE(status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>, timestamp=1684531901.994459)
In [9]: richer_res.metadata.timestamp
Out[9]: 1684531901.994459
In [10]: richer_res.metadata.stamp.as_datetime() # a convenience method
Out[10]: datetime.datetime(2023, 5, 19, 21, 31, 41, 994459)
See read() for more information on the values accepted by the data_type parameter.
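The relationship between the float timestamp and the datetime shown above can be reproduced with the standard library alone. This is a minimal sketch that assumes the timestamp counts seconds since the Unix epoch and that the datetime shown in the tutorial is expressed in UTC; it does not use caproto itself.

```python
from datetime import datetime, timezone

# Timestamp copied from the tutorial output above.
timestamp = 1684531901.994459

# Assumption: seconds since the Unix epoch, interpreted in UTC.
stamp = datetime.fromtimestamp(timestamp, tz=timezone.utc)
print(stamp.isoformat())
```

Passing an explicit timezone keeps the result independent of the local timezone of the machine running the code.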
Note
Performance Note
The underlying metadata and data are stored in efficient, contiguous-memory data structures.
In [11]: res.header # a ctypes.BigEndianStructure
Out[11]: MessageHeader(command=15, payload_size=8, data_type=6, data_count=1, parameter1=1, parameter2=0)
In [12]: res.buffers # a collection of one or more buffers
Out[12]: (b'', <memory at 0x7f9c8420c400>)
They were received directly from the socket into these structures with no intermediate copies. Accessing res.data, which returns a numpy.ndarray or array.array, provides a view onto that same memory with no copying if the data was received from the socket all at once, or with one copy if the data spanned multiple receipts.
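To see how the raw bytes relate to the header fields, the response shown earlier can be decoded by hand with the standard library. This is a sketch, not caproto's own parsing code: the 16-byte big-endian layout (four 16-bit fields followed by two 32-bit fields) is an assumption inferred from the MessageHeader repr above, and the payload is assumed to be a single big-endian double because data_type is DOUBLE and data_count is 1.

```python
import struct

# Raw bytes of the ReadNotifyResponse shown in the tutorial.
raw = (b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00'
       b'@\x08\x00\x00\x00\x00\x00\x00')

# Assumed header layout: command, payload_size, data_type, data_count
# (unsigned 16-bit each), then parameter1, parameter2 (unsigned 32-bit each).
HEADER = struct.Struct('>HHHHII')
(command, payload_size, data_type,
 data_count, parameter1, parameter2) = HEADER.unpack_from(raw, 0)

# The payload follows the header; here it holds one big-endian double.
payload = raw[HEADER.size:HEADER.size + payload_size]
(value,) = struct.unpack('>d', payload)

print(command, payload_size, data_type, data_count, parameter1, parameter2)
print(value)
```

The decoded fields match the MessageHeader repr, and the decoded value matches res.data.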
Write¶
Let us set the value to 1.
In [13]: from caproto.sync.client import write
In [14]: write('random_walk:dt', 1)
The function returns None immediately. To wait for confirmation that the write has been successfully processed by the server, use the notify keyword argument. (This is not guaranteed to be supported by an EPICS server; it may result in an ErrorResponse.)
In [15]: from caproto.sync.client import write
In [16]: write('random_walk:dt', 1, notify=True)
Out[16]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0)
Subscribe (“Monitor”)¶
Let us now monitor a channel. The server updates the random_walk:x channel periodically, at an interval set by random_walk:dt. We can subscribe to updates and fan them out to one or more user-defined callback functions. First, we define a Subscription.
In [17]: from caproto.sync.client import subscribe
In [18]: sub = subscribe('random_walk:x')
Next, we define a callback function, a function that will be called whenever the server sends an update. It must accept two positional arguments: the Subscription and the response.
In [19]: responses = []
In [20]: def f(sub, response):
....: """
....: On each update, print the PV name and data
....: Cache the full response in a list.
....: """
....: responses.append(response)
....: print(sub.pv_name, response.data)
....:
We register this function with sub. We can register multiple such functions if we wish.
In [21]: token = sub.add_callback(f)
The token is just an integer which we can use to remove f later.
Because this is a synchronous client, processing subscriptions is a blocking operation. (See the Threading Client to process subscriptions on a separate, background thread.) To activate the subscription, call sub.block().
In [22]: sub.block()
random_walk:x [14.14272394]
random_walk:x [14.94322537]
random_walk:x [15.35695388]
random_walk:x [15.74301991]
^C
This call to sub.block() blocks indefinitely, sending a message to the server to request future updates and then passing its responses to f as they arrive. The server always sends at least one response immediately. When we are satisfied, we can interrupt it with Ctrl+C (or by calling sub.interrupt() from another thread). The subscription may later be reactivated by calling sub.block() again.
Recall that the user-defined function f printed the data from each response and accumulated the response objects in a list. Indeed, we have captured four responses.
In [23]: len(responses)
Out[23]: 4
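The pattern of a blocking call that is stopped from another thread can be illustrated without a running IOC. The sketch below uses a hypothetical stand-in class, FakeSubscription, built on threading.Event; it is not caproto's implementation and only mimics the block() and interrupt() behavior described above.

```python
import threading
import time


class FakeSubscription:
    """Hypothetical stand-in that mimics block() / interrupt() semantics."""

    def __init__(self):
        self._stop = threading.Event()
        self.updates = 0

    def block(self, duration=None):
        # Clear any earlier interrupt so the subscription can be reactivated.
        self._stop.clear()
        deadline = None if duration is None else time.monotonic() + duration
        while not self._stop.is_set():
            if deadline is not None and time.monotonic() >= deadline:
                break
            self.updates += 1      # pretend an update arrived
            self._stop.wait(0.01)  # sleep, but wake early if interrupted

    def interrupt(self):
        # Safe to call repeatedly; setting an Event twice has no extra effect.
        self._stop.set()


sub = FakeSubscription()
# A second thread calls interrupt() shortly after block() starts.
timer = threading.Timer(0.1, sub.interrupt)
timer.start()
sub.block()  # returns once the timer thread has called interrupt()
timer.join()
print("updates processed:", sub.updates)
```

With the real client, the same shape applies: start a thread or timer that will call sub.interrupt(), then call sub.block() on the main thread.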
At any point we can remove a specific callback function:
In [24]: sub.remove_callback(token)
or clear all the callbacks on a subscription:
In [25]: sub.clear()
To activate multiple subscriptions concurrently, use the top-level function block(), which accepts any number of Subscription objects as arguments. Again, use Ctrl+C to interrupt (or call interrupt() from another thread).
In [26]: from caproto.sync.client import block
In [27]: sub_dt = subscribe('random_walk:dt')
In [28]: sub_x = subscribe('random_walk:x')
In [29]: sub_dt.add_callback(f)
Out[29]: 0
In [30]: sub_x.add_callback(f)
Out[30]: 0
In [31]: block(sub_x, sub_dt)
random_walk:x [63.34866867]
random_walk:dt [1.]
random_walk:x [63.53448681]
random_walk:x [64.30532391]
^C
Changed in version 0.5.0: The expected signature of the callback function was changed from f(response) to f(sub, response). For backward compatibility, functions with signature f(response) are still accepted, but caproto will issue a warning that a future release may require the new signature, f(sub, response).
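One way a library can accept both callback signatures is to inspect the number of parameters before calling the function. The following is a minimal sketch of that general technique; the helper name dispatch is hypothetical, and this is not necessarily how caproto implements the compatibility layer.

```python
import inspect
import warnings


def dispatch(func, sub, response):
    """Call func with the new or the legacy signature (illustrative helper)."""
    n_params = len(inspect.signature(func).parameters)
    if n_params == 1:
        # Legacy style: f(response). Warn, then call the old way.
        warnings.warn("f(response) is deprecated; use f(sub, response)")
        return func(response)
    return func(sub, response)


def new_style(sub, response):
    return (sub, response)


def old_style(response):
    return response


# Record warnings instead of printing them, so they can be counted.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    new_result = dispatch(new_style, "sub", "resp")
    old_result = dispatch(old_style, "sub", "resp")

print(new_result, old_result, len(caught))
```

Writing new callbacks with the f(sub, response) signature avoids the warning entirely.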
Warning
The callback registry in Subscription only holds weak references to the user callback functions. If there are no other references to the function, it will be silently garbage collected and removed. Therefore, constructions like this do not work:
sub.add_callback(lambda sub, response: print(response.data))
The lambda function will be promptly garbage collected by Python and removed from sub by caproto. To avoid that, make a reference before passing the function to Subscription.add_callback().
cb = lambda sub, response: print(response.data)
sub.add_callback(cb)
This can be surprising, but it is a standard approach for avoiding the accidental costly accumulation of abandoned callbacks.
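The effect of a weak-reference registry can be demonstrated with the standard library. The class below, WeakCallbackRegistry, is a hypothetical illustration of the general approach rather than caproto's actual registry; it shows why an unreferenced lambda disappears while a named function survives.

```python
import gc
import weakref


class WeakCallbackRegistry:
    """Illustrative registry that holds only weak references to callbacks."""

    def __init__(self):
        self._callbacks = {}
        self._next_token = 0

    def add_callback(self, func):
        token = self._next_token
        self._next_token += 1

        def _discard(_ref, token=token):
            # Runs when func is garbage collected; drop the dead entry.
            self._callbacks.pop(token, None)

        self._callbacks[token] = weakref.ref(func, _discard)
        return token

    def live_callbacks(self):
        # Dereference each weak reference and keep the ones still alive.
        alive = (ref() for ref in self._callbacks.values())
        return [func for func in alive if func is not None]


registry = WeakCallbackRegistry()

# Nothing else references this lambda, so it is collected right away.
registry.add_callback(lambda sub, response: print(response))
gc.collect()
count_after_lambda = len(registry.live_callbacks())

# Keeping a reference in a variable keeps the callback registered.
cb = lambda sub, response: print(response)
registry.add_callback(cb)
gc.collect()
count_after_named = len(registry.live_callbacks())

print(count_after_lambda, count_after_named)
```

Plain functions and lambdas can be weakly referenced directly; bound methods need weakref.WeakMethod instead, which this sketch does not cover.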
Stateless Connection Handling¶
As noted at the top, the synchronous client is optimized for simplicity of implementation. While the other caproto clients use the notion of a context to cache connections, the synchronous client creates a fresh connection for each function call. This stateless design is sufficient to support the command-line interface and it can be useful for debugging, but it is very inefficient for performing multiple operations on the same channel.
For the common use case “read / write a new value / read again,” the synchronous client provides read_write_read(), which uses one connection for all three operations. For anything more complicated than that, upgrade to one of the other clients.
API Documentation¶
- caproto.sync.client.read(pv_name, *, data_type=None, data_count=None, timeout=2.0, priority=0, notify=True, force_int_enums=False, repeater=True)[source]¶
Read a Channel.
- Parameters:
- pv_name : str
The PV name to read from
- data_type : {'native', 'status', 'time', 'graphic', 'control'} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_count : integer, optional
Requested number of values. Default is the channel’s native data count.
- timeout : float, optional
Default is 2 seconds.
- priority : integer, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- notify : boolean, optional
Send a ReadNotifyRequest instead of a ReadRequest. True by default.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.
- Returns:
- response : ReadResponse or ReadNotifyResponse
Examples
Get the value of a Channel named ‘simple:A’.
>>> read('simple:A').data
array([1], dtype=int32)
Request a richer Channel Access data type that includes the timestamp, and access the timestamp.
>>> read('cat', data_type='time').metadata.timestamp
1570622339.042392
A convenience method is provided for accessing the timestamp as a Python datetime object.
>>> read('cat', data_type='time').metadata.stamp.as_datetime()
datetime.datetime(2019, 10, 9, 11, 58, 59, 42392)
The requested data type may also be given as a specific Channel Access type
>>> from caproto import ChannelType
>>> read('cat', data_type=ChannelType.CTRL_FLOAT).metadata
DBR_CTRL_FLOAT(
status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>,
upper_disp_limit=0.0, lower_disp_limit=0.0, upper_alarm_limit=0.0,
upper_warning_limit=0.0, lower_warning_limit=0.0, lower_alarm_limit=0.0,
upper_ctrl_limit=0.0, lower_ctrl_limit=0.0, precision=0, units=b'')
or the corresponding integer identifier
>>> read('cat', data_type=30).metadata
DBR_CTRL_FLOAT(
status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>,
upper_disp_limit=0.0, lower_disp_limit=0.0, upper_alarm_limit=0.0,
upper_warning_limit=0.0, lower_warning_limit=0.0, lower_alarm_limit=0.0,
upper_ctrl_limit=0.0, lower_ctrl_limit=0.0, precision=0, units=b'')
- caproto.sync.client.write(pv_name, data, *, notify=False, data_type=None, metadata=None, timeout=2.0, priority=0, repeater=True)[source]¶
Write to a Channel.
- Parameters:
- pv_name : str
The PV name to write to
- data : str, bytes, int, or float or any Iterable of these
Value(s) to write.
- notify : boolean, optional
Request notification of completion and wait for it. False by default.
- data_type : {'native', 'status', 'time', 'graphic', 'control'} or ChannelType or int ID, optional
Write as specific data type. Default is inferred from input.
- metadata : ctypes.BigEndianStructure or tuple
Status and control metadata for the values
- timeout : float, optional
Default is 2 seconds.
- priority : integer, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.
- Returns:
- response : WriteNotifyResponse or None
None unless notify=True.
Examples
Write the value 5 to a Channel named ‘simple:A’.
>>> write('simple:A', 5) # returns None
Request notification of completion (“put completion”) and wait for it.
>>> write('cat', 5, notify=True)  # blocks until complete, then returns:
WriteNotifyResponse(
data_type=<ChannelType.LONG: 5>, data_count=1,
status=CAStatusCode(
name='ECA_NORMAL', code=0, code_with_severity=1,
severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False,
description='Normal successful completion'),
ioid=0)
- caproto.sync.client.subscribe(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]¶
Define a subscription.
- Parameters:
- pv_name : string
The PV name to subscribe to
- priority : integer, optional
Used by the server to triage subscription responses when under high load. 0 is lowest; 99 is highest.
- data_type : {'native', 'status', 'time', 'graphic', 'control'} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_count : integer, optional
Requested number of values. Default is the channel’s native data count, which can be checked in the Channel’s attribute native_data_count.
- low, high, to : float, optional
deprecated by Channel Access, not yet implemented by caproto
- mask : SubscriptionType, optional
Subscribe to selective updates.
Examples
Define a subscription on the random_walk:x PV.
>>> sub = subscribe('random_walk:x')
Add one or more user-defined callbacks to process responses.
>>> def f(sub, response):
...     print(response.data)
...
>>> sub.add_callback(f)
Activate the subscription and process incoming responses.
>>> sub.block()
This is a blocking operation in the sync client. (To do this on a background thread, use the threading client.) Interrupt using Ctrl+C or by calling sub.interrupt() from another thread.
The subscription may be reactivated by calling sub.block() again.
To process multiple subscriptions at once, use the function block(), which takes one or more Subscriptions as arguments.
>>> block(sub1, sub2)
There is also an interrupt() function, which is merely an alias to the method.
- caproto.sync.client.block(*subscriptions, duration=None, timeout=2.0, force_int_enums=False, repeater=True)[source]¶
Activate one or more subscriptions and process incoming responses.
Use Ctrl+C (SIGINT) to escape, or from another thread, call interrupt().
- Parameters:
- *subscriptions : Subscriptions
The list of subscriptions.
- duration : float, optional
How many seconds to run for. Run forever (None) by default.
- timeout : float, optional
Default is 2 seconds. This is not the same as duration; this is the timeout for failure in the event of no connection.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.
Examples
Activate subscription(s) and block while they process updates.
>>> sub1 = subscribe('cat')
>>> sub2 = subscribe('dog')
>>> block(sub1, sub2)
- caproto.sync.client.interrupt()[source]¶
Signal to block() to stop blocking. Idempotent.
This obviously cannot be called interactively while blocked; it is intended to be called from another thread.
- caproto.sync.client.read_write_read(pv_name, data, *, notify=False, read_data_type=None, write_data_type=None, metadata=None, timeout=2.0, priority=0, force_int_enums=False, repeater=True)[source]¶
Write to a Channel, but sandwich the write between two reads.
This is what the command-line utilities caproto-put and caput do. Notice that if you want the second reading to reflect the written value, you should pass the parameter notify=True. (This is also true of caproto-put / caput, which needs the -c argument to behave the way you might expect it to behave.)
This is provided as a separate function in order to support caproto-put efficiently. Making separate calls to read() and write() would re-create a connection redundantly.
- Parameters:
- pv_name : str
The PV name to read/write/read
- data : str, bytes, int, or float or any Iterable of these
Value to write.
- notify : boolean, optional
Request notification of completion and wait for it. False by default.
- read_data_type : {'native', 'status', 'time', 'graphic', 'control'} or ChannelType or int ID, optional
Request specific data type.
- write_data_type : {'native', 'status', 'time', 'graphic', 'control'} or ChannelType or int ID, optional
Write as specific data type. Default is inferred from input.
- metadata : ctypes.BigEndianStructure or tuple
Status and control metadata for the values
- timeout : float, optional
Default is 2 seconds.
- priority : integer, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.
- Returns:
- initial, write_response, final : tuple of responses
The middle response comes from the write, and it will be None unless notify=True.
Examples
Write the value 5 to a Channel named ‘cat’, reading it before and after.
>>> read_write_read('cat', 5) # returns initial, None, final
Request notification of completion (“put completion”) and wait for it.
>>> read_write_read('cat', 5, notify=True) # initial, WriteNotifyResponse, final
- class caproto.sync.client.Subscription(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]¶
This object encapsulates state related to a Subscription.
See the subscribe() function.
- block(duration=None, timeout=1, force_int_enums=False, repeater=True)[source]¶
Activate one or more subscriptions and process incoming responses.
Use Ctrl+C (SIGINT) to escape, or from another thread, call interrupt().
Convenience alias for the top-level function block(), which may be used to process multiple Subscriptions concurrently.
- Parameters:
- duration : float, optional
How many seconds to run for. Run forever (None) by default.
- timeout : float, optional
Default is 1 second. This is not the same as duration; this is the timeout for failure in the event of no connection.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True by default, as the Channel Access spec stipulates that well-behaved clients should do this.
- interrupt()[source]¶
Signal to block() to stop blocking. Idempotent.
This obviously cannot be called interactively while blocked; it is intended to be called from another thread. This method is a convenience alias for the top-level function interrupt().
- add_callback(func)[source]¶
Add a callback to receive responses.
- Parameters:
- func : callable
Expected signature: func(sub, response).
The signature func(response) is also supported for backward-compatibility but will issue warnings. Support will be removed in a future release of caproto.
- Returns:
- token : int
Integer token that can be passed to remove_callback().
Changed in version 0.5.0: Changed the expected signature of func from func(response) to func(sub, response).
- remove_callback(cb_id)[source]¶
Remove callback using token that was returned by add_callback().