Synchronous Client¶
The synchronous client optimizes for simplicity of implementation over performance. This has its uses, as described below. But for high-performance applications, use one of caproto’s other clients such as the Threading Client.
Tutorial¶
In a separate shell, start one of caproto’s demo IOCs.
$ python3 -m caproto.ioc_examples.random_walk
PVs: ['random_walk:dt', 'random_walk:x']
Now, in Python we will talk to it using caproto’s synchronous client.
Read¶
In [1]: from caproto.sync.client import read
In [2]: res = read('random_walk:dt')
In [3]: res
Out[3]: ReadNotifyResponse(data=array([1.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)
This object is a human-friendly representation of the server’s response. The raw bytes of that response are:
In [4]: bytes(res)
Out[4]: b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00?\xf0\x00\x00\x00\x00\x00\x00'
Access particular fields in the response using attribute (“dot”) access on
res
.
In [5]: res.data
Out[5]: array([1.])
Note
Performance Note
The underlying metadata and data are stored in efficient, contiguous-memory data structures.
In [6]: res.header # a ctypes.BigEndianStructure Out[6]: MessageHeader(command=15, payload_size=8, data_type=6, data_count=1, parameter1=1, parameter2=0) In [7]: res.buffers # a collection of one or more buffers Out[7]: (b'', <memory at 0x7f80e657ec48>)
They were received directly from the socket into these structure with no
intermediate copies. Accessing the res.data
— which returns a
numpy.ndarray
or array.array
— provides a view onto that same
memory with no copying (if the data was received from the socket all at
once) or one copy (if the data bridged multiple receipts).
Write¶
Let us set the value to 1
.
In [8]: from caproto.sync.client import write
In [9]: write('random_walk:dt', 1)
The function returns None
immediately. To wait for confirmation that the
write has been successfully processed by the server, use the notify
keyword
argument. (This is not guaranteed to be supported by an EPICS server; it may
result in an ErrorResponse
.)
In [10]: from caproto.sync.client import write
In [11]: write('random_walk:dt', 1, notify=True)
Out[11]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0)
Subscribe (“Monitor”)¶
Let us now monitor a channel. The server updates the random_walk:x
channel
periodically at some period set by random_walk:dt
. We can subscribe
to updates and fan them out to one or more user-defined callback functions.
First, we define a Subscription
.
In [12]: from caproto.sync.client import subscribe
In [13]: sub = subscribe('random_walk:x')
Next, we define a callback function, a function that will be called whenever the server sends an update. It must accept one positional argument.
In [14]: responses = []
In [15]: def f(response):
....: "On each update, print the data and cache the full response in a list."
....: responses.append(response)
....: print(response.data)
....:
We register this function with sub
. We can register multiple such functions
is we wish.
In [16]: token = sub.add_callback(f)
The token
is just an integer which we can use to remove f
later.
Because this is a synchronous client, processing subscriptions is a blocking
operation,. (See the Threading Client to process subscriptions on a
separate, background thread.) To activate the subscription, call
sub.block()
.
In [17]: sub.block()
[14.14272394]
[14.94322537]
[15.35695388]
[15.74301991]
^C
This call to sub.block()
blocks indefinitely, sending a message to the
server to request future updates and then passing its responses to f
as
they arrive. The server always sends at least one response immediately. When we
are satisfied, we can interrupt it with Ctrl+C (or by calling
sub.interrupt()
from another thread). The subscription may later be
reactivated by calling sub.block()
again.
Recall that the user-defined function f
printed the data from each response
and accumulated the response objects in a list. Indeed, we have captured four
responses.
In [18]: len(responses)
Out[18]: 4
At any point we can remove a specific callback function:
In [19]: sub.remove_callback(token)
or clear all the callbacks on a subscription:
In [20]: sub.clear()
To activate multiple subscriptions concurrently, use the top-level function
block()
, which accepts any number of Subscription
objects as
arguments. Again, use Ctrl+C to interrupt (or call interrupt()
from
another thread).
In [21]: from caproto.sync.client import block
In [22]: sub_dt = subscribe('random_walk:dt')
In [23]: sub_x = subscribe('random_walk:x')
In [24]: sub_dt.add_callback(f)
Out[24]: 0
In [25]: sub_x.add_callback(f)
Out[25]: 0
In [26]: block(sub_x, sub_dt)
[63.34866867]
[1.]
[63.53448681]
[64.30532391]
^C
Warning
The callback registry in Subscription
only holds weak references
to the user callback functions. If there are no other references to the
function, it will be silently garbage collected and removed. Therefore,
constructions like this do not work:
sub.add_callback(lambda response: print(response.data))
The lambda function will be promptly garbage collected by Python and
removed from sub
by caproto. To avoid that, make a reference before
passing the function to Subscription.add_callback()
.
cb = lambda response: print(response.data)
sub.add_callback(cb)
This can be surprising, but it is a standard approach for avoiding the accidental costly accumulation of abandoned callbacks.
Stateless Connection Handling¶
As noted at the top, the synchronous client is optimized for simplicity of implementation. While the other caproto clients use the notion of a context to cache connections, the synchronous client creates a fresh connection for each function call. This stateless design is sufficient to support the command-line interface and it can be useful for debugging, but it is very inefficient for performing multiple operations on the same channel.
For the common use case “read / write a new value / read again,” the
synchronous client provides read_write_read()
, which uses one connection
for all three operations. For anything more complicated than that, upgrade to
one of the other clients.
API Documentation¶
-
caproto.sync.client.
read
(pv_name, *, data_type=None, timeout=1, priority=0, notify=True, force_int_enums=False, repeater=True)[source]¶ Read a Channel.
Parameters: - pv_name : str
- data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- timeout : float, optional
Default is 1 second.
- priority : 0, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- notify : boolean, optional
Send a ReadNotifyRequest instead of a ReadRequest. True by default.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True default, as the Channel Access spec stipulates that well-behaved clients should do this.
Returns: - response : ReadResponse or ReadNotifyResponse
Examples
Get the value of a Channel named ‘cat’.
>>> read('cat').data
-
caproto.sync.client.
write
(pv_name, data, *, notify=False, data_type=None, metadata=None, timeout=1, priority=0, repeater=True)[source]¶ Write to a Channel.
Parameters: - pv_name : str
- data : str, int, or float or any Iterable of these
Value(s) to write.
- notify : boolean, optional
Request notification of completion and wait for it. False by default.
- data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Write as specific data type. Default is inferred from input.
- metadata :
ctypes.BigEndianStructure
or tuple Status and control metadata for the values
- timeout : float, optional
Default is 1 second.
- priority : 0, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True default, as the Channel Access spec stipulates that well-behaved clients should do this.
Returns: - initial, final : tuple of ReadNotifyResponse objects
Examples
Write the value 5 to a Channel named ‘cat’.
>>> write('cat', 5) # returns None
Request notification of completion (“put completion”) and wait for it. >>> write(‘cat’, 5, notify=True) # returns a WriteNotifyResponse
-
caproto.sync.client.
subscribe
(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]¶ Define a subscription.
Parameters: - pv_name : string
- priority : integer, optional
Used by the server to triage subscription responses when under high load. 0 is lowest; 99 is highest.
- data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_count : integer, optional
Requested number of values. Default is the channel’s native data count, which can be checked in the Channel’s attribute
native_data_count
.- low, high, to : float, optional
deprecated by Channel Access, not yet implemented by caproto
- mask : SubscriptionType, optional
Subscribe to selective updates.
Examples
Define a subscription on the
cat
PV.>>> sub = subscribe('cat')
Add one or more user-defined callbacks to process responses.
>>> def f(response): ... print(repsonse.data) ... >>> sub.add_callback(f)
Activate the subscription and process incoming responses.
>>> sub.block()
This is a blocking operation in the sync client. (To do this on a background thread, use the threading client.) Interrupt using Ctrl+C or by calling
sub.interrupt()
from another thread.The subscription may be reactivated by calling
sub.block()
again.To process multiple subscriptions at once, use the function
block()
, which takes one or more Subscriptions as arguments.>>> block(sub1, sub2)
There is also an
interrupt()
function, which is merely an alias to the method.
-
caproto.sync.client.
block
(*subscriptions, duration=None, timeout=1, force_int_enums=False, repeater=True)[source]¶ Activate one or more subscriptions and process incoming responses.
Use Ctrl+C (SIGINT) to escape, or from another thread, call
interrupt()
.Parameters: - *subscriptions : Subscriptions
- duration : float, optional
How many seconds to run for. Run forever (None) by default.
- timeout : float, optional
Default is 1 second. This is not the same as for; this is the timeout for failure in the event of no connection.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True default, as the Channel Access spec stipulates that well-behaved clients should do this.
Examples
Activate subscription(s) and block while they process updates.
>>> sub1 = subscribe('cat') >>> sub1 = subscribe('dog') >>> block(sub1, sub2)
-
caproto.sync.client.
interrupt
()[source]¶ Signal to
block()
to stop blocking. Idempotent.This obviously cannot be called interactively while blocked; it is intended to be called from another thread.
-
caproto.sync.client.
read_write_read
(pv_name, data, *, notify=False, read_data_type=None, write_data_type=None, metadata=None, timeout=1, priority=0, force_int_enums=False, repeater=True)[source]¶ Write to a Channel, but sandwich the write between to reads.
This is what the command-line utilities
caproto-put
andcaput
do. Notice that if you want the second reading to reflect the written value, you should pass the parameternotify=True
. (This is also true ofcaproto-put
/caput
, which needs the-c
argument to behave the way you might expect it to behave.)This is provided as a separate function in order to support
caproto-put
efficiently. Making separate calls toread()
andwrite()
would re-create a connection redundantly.Parameters: - pv_name : str
- data : str, int, or float or a list of these
Value to write.
- notify : boolean, optional
Request notification of completion and wait for it. False by default.
- read_data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Request specific data type.
- write_data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Write as specific data type. Default is inferred from input.
- metadata :
ctypes.BigEndianStructure
or tuple Status and control metadata for the values
- timeout : float, optional
Default is 1 second.
- priority : 0, optional
Virtual Circuit priority. Default is 0, lowest. Highest is 99.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True default, as the Channel Access spec stipulates that well-behaved clients should do this.
Returns: - initial, write_response, final : tuple of response
- The middle response comes from the write, and it will be
None
unless notify=True
.
Examples
Write the value 5 to a Channel named ‘cat’.
>>> read_write_read('cat', 5) # returns initial, None, final
Request notification of completion (“put completion”) and wait for it.
>>> read_write_read('cat', 5, notify=True) # initial, WriteNotifyResponse, final
-
class
caproto.sync.client.
Subscription
(pv_name, priority=0, data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]¶ This object encapsulates state related to a Subscription.
See the
subscribe()
function.-
add_callback
(func)[source]¶ Add a callback to receive responses.
Parameters: - func : callable
Expected signature:
func(response)
Returns: - token : int
Integer token that can be passed to
remove_callback()
.
-
block
(duration=None, timeout=1, force_int_enums=False, repeater=True)[source]¶ Activate one or more subscriptions and process incoming responses.
Use Ctrl+C (SIGINT) to escape, or from another thread, call
interrupt()
.Convenience alias for the top-level function
block()
, which may be used to process multiple Subscriptions concurrently.Parameters: - duration : float, optional
How many seconds to run for. Run forever (None) by default.
- timeout : float, optional
Default is 1 second. This is not the same as for; this is the timeout for failure in the event of no connection.
- force_int_enums : boolean, optional
Retrieve enums as integers. (Default is strings.)
- repeater : boolean, optional
Spawn a Channel Access Repeater process if the port is available. True default, as the Channel Access spec stipulates that well-behaved clients should do this.
-
interrupt
()[source]¶ Signal to block() to stop blocking. Idempotent.
This obviously cannot be called interactively while blocked; it is intended to be called from another thread. This method is a convenience alias for the top-level function
interrupt()
.
-
process
(response)[source]¶ Run the callbacks on a response.
This is used internally by
block()
, generally not called by the user.
-
remove_callback
(cb_id)[source]¶ Remove callback using token that was returned by
add_callback()
.
-