Asynchronous Clients¶
Developers interested in exploring the prototype asyncio client can poke around
the module caproto.asyncio.client
. The design is analogous to the
threading client but using asyncio.
While this asyncio client is full-featured as compared to the threading client, it has had minimal testing, usage, or review.
With that in mind, please let us know if it works for you, or you have feedback for us.
For those that are looking for information about curio or trio clients, their prototype implementations have been removed.
Tutorial¶
In a separate shell, start one of caproto’s demo IOCs.
$ python3 -m caproto.ioc_examples.random_walk --list-pvs
[I 10:37:48.827 server: 133] Asyncio server starting up...
[I 10:37:48.827 server: 146] Listening on 0.0.0.0:65442
[I 10:37:48.829 server: 205] Server startup complete.
[I 10:37:48.829 server: 207] PVs available:
random_walk:dt
random_walk:x
Connect¶
Note
To do this interactively, you will need a newer version of IPython >= 7
that supports %autoawait
.
In Python we will talk to it using caproto’s asyncio client. Start by enabling
%autoawait
.
In [1]: %autoawait on
Now, create a Context
. A requirement for the asyncio client is that
the context must be created within the asyncio loop that will be using it, thus
it must be created in a coroutine.
In [2]: from caproto.asyncio.client import Context
In [3]: async def new_context():
...: return Context()
...:
In [4]: ctx = await new_context()
The Context
object caches connections, manages automatic
re-connection, and tracks the state of connections in progress. We can use it
to request new connections. Formulating requests for many PVs in a large batch
is efficient. In this example we’ll just ask for two PVs.
In [5]: x, dt = await ctx.get_pvs('random_walk:x', 'random_walk:dt')
Context.get_pvs()
accepts an arbitrary number of PV names and
immediately returns a collection of PV
objects representing each name.
In a background task, the Context searches for an EPICS server that provides
that PV name and then connects to it. The PV object displays its connection
state:
In [6]: dt
Out[6]: <PV name='random_walk:dt' priority=0 (searching....)>
The Context displays aggregate information about the state of all connections.
In [7]: ctx
Out[7]: <Context circuits=0 pvs=2 idle=0>
Read¶
Now, to read a PV:
In [8]: res = await dt.read()
In [9]: res
Out[9]: ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)
This object is a human-friendly representation of the server’s response. The raw bytes of that response are:
In [10]: bytes(res)
Out[10]: b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00@\x08\x00\x00\x00\x00\x00\x00'
Access particular fields in the response using attribute (“dot”) access on res
.
In [11]: res.data
Out[11]: array([3.])
By default, the client does not request any metadata
In [12]: res.metadata
Use the data_type
parameter to request a richer data type.
In [13]: richer_res = await dt.read(data_type='time')
In [14]: richer_res.metadata
Out[14]: DBR_TIME_DOUBLE(status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>, timestamp=1735004708.021449)
In [15]: richer_res.metadata.timestamp
Out[15]: 1735004708.021449
In [16]: richer_res.metadata.stamp.as_datetime() # a convenience method
Out[16]: datetime.datetime(2024, 12, 24, 1, 45, 8, 21449)
See PV.read()
for more information on the values accepted by the
data_type
parameter.
Note
Performance Note
The underlying metadata and data are stored in efficient, contiguous-memory data structures.
In [17]: res.header # a ctypes.BigEndianStructure
Out[17]: MessageHeader(command=15, payload_size=8, data_type=6, data_count=1, parameter1=1, parameter2=0)
In [18]: res.buffers # a collection of one or more buffers
Out[18]: (b'', <memory at 0x7f6076a8f280>)
They were received directly from the socket into these structure with no
intermediate copies. Accessing the res.data
— which returns a
numpy.ndarray
or array.array
— provides a view onto that same
memory with no copying (if the data was received from the socket all at
once) or one copy (if the data bridged multiple receipts).
Write¶
Let us set the value to 1
.
In [19]: await dt.write([1])
Out[19]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=2)
By default, we send WriteNotifyResponse
, wait for a response, and return
it. There are a couple other ways we can handle writes:
Return immediately, not asking for or waiting for a response.
await dt.write([1], wait=False)
Return immediately, not waiting for a response, but handing the response (when it arrived) to some callback function, processed in the background. Coroutines and regular functions are supported as callbacks.
async def f(response): print('got a response:', response) await dt.write([1], wait=False, callback=f)
See the PV.write()
for more.
Subscribe (“Monitor”)¶
Let us now monitor a channel. The server updates the random_walk:x
channel
periodically at some period set by random_walk:dt
. We can subscribe to
updates and fan them out to one or more user-defined callback functions.
First, we define a Subscription
.
In [20]: sub = x.subscribe()
Next, we define a callback function, a function that will be called whenever the server sends an update. It must accept two positional arguments.
In [21]: responses = []
In [22]: def f(sub, response):
....: print('Received response from', sub.pv.name)
....: responses.append(response)
....:
This user-defined function f
has access to the full response from the
server, which includes data and any metadata. The server’s response does not
include the name of the PV involved (it identifies it indirectly via a
“subscription ID” code) so caproto provides the function with sub
as well,
from which you can obtain the pertinent PV sub.pv
and its name
sub.pv.name
as illustrated above. This is useful for distinguishing
responses when the same function is added to multiple subscriptions.
We register this function with sub
.
In [23]: token = sub.add_callback(f)
The token
is just an integer which we can use to remove f
later. We can
define a second callback:
In [24]: values = []
In [25]: def g(sub, response):
....: values.append(response.data[0])
....:
and add it to the same subscription, putting no additional load on the network.
In [26]: sub.add_callback(g)
Out[26]: 1
After some time has passed, we will have accumulated some responses. The only caveat here is that asyncio tasks will not be running in the background until we start interacting with it. Let’s sleep for a few seconds - letting asyncio take control - and see what we get.
In [27]: import asyncio
In [28]: await asyncio.sleep(5)
Received response from random_walk:x
Received response from random_walk:x
Received response from random_walk:x
Received response from random_walk:x
In [29]: len(responses)
Out[29]: 4
In [30]: values
Out[30]:
[-0.7087522063132399,
-0.28229554747192576,
-1.084921773305793,
-0.22329044201018866]
An alternative syntax is a bit more convenient here:
In [31]: async for value in sub:
....: print('Received value', value)
....: break
....:
Received value EventAddResponse(data=array([-0.22329044]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), subscriptionid=0, metadata=None)
At any point we can remove a specific callback function:
In [32]: await sub.remove_callback(token)
or clear all the callbacks on a subscription:
In [33]: await sub.clear()
In order to minimize load on the network, a Subscription
waits to
request updates from the server until the first user callback is added. Thus,
the first callback added by the user is guaranteed to get the first response
received from the server. If all user callbacks are later removed, either
explicitly (via remove_callback
or clear
) or implicitly via Python
garbage collection, the Subscription automatically cancels future updates from
the server. If a callback is then later added, the Subscription silently
re-initiates updates. All of this is transparent to the user.
Warning
The callback registry in Subscription
only holds weak references
to the user callback functions. If there are no other references to the
function, it will be silently garbage collected and removed. Therefore,
constructions like this do not work:
sub.add_callback(lambda sub, response: print(response.data))
The lambda function will be promptly garbage collected by Python and
removed from sub
by caproto. To avoid that, make a reference before
passing the function to Subscription.add_callback()
.
cb = lambda sub, response: print(response.data)
sub.add_callback(cb)
This can be surprising, but it is a standard approach for avoiding the accidental costly accumulation of abandoned callbacks.
This pitfall does not apply to callbacks passed to PV.read()
and
PV.write()
because those are single-shot callbacks that do not
persist beyond their first use.
Go Idle¶
Once created, PVs are cached for the lifetime of the Context
and
returned again to the user if a PV with the same name and priority is
requested. In order to reduce the load on the network, a PV can be temporarily
made “idle” (disconnected). It will silently, automatically reconnect the next
time it is used.
In [34]: x
Out[34]: <PV name='random_walk:x' priority=0 address=('10.1.0.22', 5064), circuit_state=States.CONNECTED, channel_state=States.CONNECTED>
In [35]: await x.go_idle()
In [36]: x
Out[36]: <PV name='random_walk:x' priority=0 (idle)>
In [37]: await x.read()
Out[37]: ReadNotifyResponse(data=array([-0.22329044]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=3, metadata=None)
In [38]: x
Out[38]: <PV name='random_walk:x' priority=0 address=('10.1.0.22', 5064), circuit_state=States.CONNECTED, channel_state=States.CONNECTED>
Notice that when the PV was read it automatically reconnected, requiring no action from the user.
The go_idle()
method is merely a request and is not guaranteed to have
any effect. If a PV has active subscriptions, it will ignore the request: it
must stay active to continue servicing user callbacks. Therefore, it is safe
call go_idle()
on any PV at any time, knowing that the PV will decline to
disconnect if it is being actively used and that, if it is currently unused, it
will transparently reconnect the next time it is used.
Canceling Searches¶
All unanswered searches are retried repeatedly, with decreasing frequency,
forever. Each new call to get_pvs()
causes all unanswered
searches to be retried at least once immediately. This can generate unwanted
network traffic. To fully cancel a search that is never expected to complete,
access the method SharedBroadcaster.cancel
.
ctx.broadcaster.cancel('some typo-ed PV name, for example')
As the name suggests, it is possible to construct multiple Contexts that share one SharedBroadcaster. In that scenario, notice that canceling the search will affect all contexts using the SharedBroadcaster.
Events Off and On¶
If a given circuit produces updates faster than a client can process them, the client can suspend subscriptions on that circuit. This will causes the server to discard all backlogged updates and all new updates during the period of supsension. When the client reactives subscriptions, it will immediate receive the most recent update and then any future updates.
await x.circuit_manager.events_off()
...
await x.circuit_manager.events_on()
Server Health Check¶
To check how much time has passed (in seconds) since each known server was last heard from, use:
ctx.broadcaster.time_since_last_heard()
As a convenience, check on the server connected to a specific PV using:
x.time_since_last_heard()
See the SharedBroadcaster.time_since_last_heard()
API documentation below
for details.
Logs for Debugging¶
Caproto uses Python’s logging framework, which enables sophisticated log management. For more information and copy/paste-able examples, see Logging.
API Documentation¶
- class caproto.asyncio.client.Context(broadcaster=None, *, timeout=2.0, host_name=None, client_name=None, max_workers=1)[source]¶
Encapsulates the state and connections of a client.
- Parameters:
- broadcasterSharedBroadcaster, optional
If None is specified, a fresh one is instantiated.
- timeoutnumber or None, optional
Number of seconds before a CaprotoTimeoutError is raised. This default can be overridden at the PV level or for any given operation. If unset, the default is 2 seconds. If None, never timeout. A global timeout can be specified via an environment variable
CAPROTO_DEFAULT_TIMEOUT
.- host_namestring, optional
uses value of
socket.gethostname()
by default- client_namestring, optional
uses value of
getpass.getuser()
by default- max_workersinteger, optional
Number of worker threaders per VirtualCircuit for executing user callbacks. Default is 1. For any number of workers, workers will receive updates in the order which they are received from the server. That is, work on each update will begin in sequential order. Work-scheduling internal to the user callback is outside caproto’s control. If the number of workers is set to greater than 1, the work on each update may not finish in a deterministic order. For example, if workers are writing lines into a file, the only way to guarantee that the lines are ordered properly is to use only one worker. If ordering matters for your application, think carefully before increasing this value from 1.
- async get_pvs(*names, priority=0, connection_state_callback=None, access_rights_callback=None, timeout=CONTEXT_DEFAULT_TIMEOUT)[source]¶
Return a list of PV objects.
These objects may not be connected at first. Channel creation occurs on a background thread.
PVs are uniquely defined by their name and priority. If a PV with the same name and priority is requested twice, the same (cached) object is returned. Any callbacks included here are added to added alongside any existing ones.
- Parameters:
- *namesstrings
any number of PV names
- priorityinteger
Used by the server to triage subscription responses when under high load. 0 is lowest; 99 is highest.
- connection_state_callbackcallable
Expected signature:
f(pv, state)
wherepv
is the instance ofPV
whose state has changed andstate
is a string- access_rights_callbackcallable
Expected signature:
f(pv, access_rights)
wherepv
is the instance ofPV
whose state has changed andaccess_rights
is a member of the caprotoAccessRights
enum- timeoutnumber or None, optional
Number of seconds before a CaprotoTimeoutError is raised. This default can be overridden for any specific operation. By default, fall back to the default timeout set by the Context. If None, never timeout.
- class caproto.asyncio.client.PV(name, priority, context, connection_state_callback, access_rights_callback, timeout)[source]¶
These must be instantiated by a Context, never directly.
- property timeout¶
Effective default timeout.
Valid values are: * CONTEXT_DEFAULT_TIMEOUT (fall back to Context.timeout) * a floating-point number * None (never timeout)
- property circuit_manager¶
- property channel¶
- property connected¶
- async wait_for_search(*, timeout=PV_DEFAULT_TIMEOUT)[source]¶
Wait for this PV to be found.
This does not wait for the PV’s Channel to be created; it merely waits for an address (and a VirtualCircuit) to be assigned.
- Parameters:
- timeoutnumber or None, optional
Seconds to wait before a CaprotoTimeoutError is raised. Default is
PV.timeout
, which falls back to Context.timeout if not set. If None, never timeout.
- async wait_for_connection(*, timeout=PV_DEFAULT_TIMEOUT)[source]¶
Wait for this PV to be connected.
- Parameters:
- timeoutnumber or None, optional
Seconds to wait before a CaprotoTimeoutError is raised. Default is
PV.timeout
, which falls back toPV.context.timeout
if not set. If None, never timeout.
- async go_idle()[source]¶
Request to clear this Channel to reduce load on client and server.
A new Channel will be automatically, silently created the next time any method requiring a connection is called. Thus, this saves some memory in exchange for making the next request a bit slower, as it has to redo the handshake with the server first.
If there are any subscriptions with callbacks, this request will be ignored. If the PV is in the process of connecting, this request will be ignored. If there are any actions in progress (read, write) this request will be processed when they are complete.
- async read(*, wait=True, callback=None, timeout=PV_DEFAULT_TIMEOUT, data_type=None, data_count=None, notify=True)[source]¶
Request a fresh reading.
Can do one or both of: - Block while waiting for the response, and return it. - Pass the response to callback, with or without blocking.
- Parameters:
- waitboolean
If True (default) block until a matching response is received from the server. Raises CaprotoTimeoutError if that response is not received within the time specified by the timeout parameter.
- callbackcallable or None
Called with the response as its argument when received.
- timeoutnumber or None, optional
Seconds to wait before a CaprotoTimeoutError is raised. Default is
PV.timeout
, which falls back toPV.context.timeout
if not set. If None, never timeout.- data_type{‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_countinteger, optional
Requested number of values. Default is the channel’s native data count.
- notify: boolean, optional
Send a
ReadNotifyRequest
instead of aReadRequest
. True by default.
- async write(data, *, wait=True, callback=None, timeout=PV_DEFAULT_TIMEOUT, notify=None, data_type=None, data_count=None)[source]¶
Write a new value. Optionally, request confirmation from the server.
Can do one or both of: - Block while waiting for the response, and return it. - Pass the response to callback, with or without blocking.
- Parameters:
- datastr, int, or float or any Iterable of these
Value(s) to write.
- waitboolean
If True (default) block until a matching WriteNotifyResponse is received from the server. Raises CaprotoTimeoutError if that response is not received within the time specified by the timeout parameter.
- callbackcallable or None
Called with the WriteNotifyResponse as its argument when received.
- timeoutnumber or None, optional
Seconds to wait before a CaprotoTimeoutError is raised. Default is
PV.timeout
, which falls back toPV.context.timeout
if not set. If None, never timeout.- notifyboolean or None
If None (default), set to True if wait=True or callback is set. Can be manually set to True or False. Will raise ValueError if set to False while wait=True or callback is set.
- data_type{‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Write specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_countinteger, optional
Requested number of values. Default is the channel’s native data count.
- subscribe(data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]¶
Start a new subscription to which user callback may be added.
- Parameters:
- data_type{‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional
Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.
- data_countinteger, optional
Requested number of values. Default is the channel’s native data count.
- low, high, tofloat, optional
deprecated by Channel Access, not yet implemented by caproto
- maskSubscriptionType, optional
Subscribe to selective updates.
- Returns:
- subscriptionSubscription
Examples
Define a subscription.
>>> sub = pv.subscribe()
Add a user callback. The subscription will be transparently activated (i.e. an
EventAddRequest
will be sent) when the first user callback is added.>>> sub.add_callback(my_func)
Multiple callbacks may be added to the same subscription.
>>> sub.add_callback(another_func)
See the docstring for
Subscription
for more.
- time_since_last_heard(timeout=PV_DEFAULT_TIMEOUT)[source]¶
Seconds since last message from the server that provides this channel.
The time is reset to 0 whenever we receive a TCP message related to user activity or a Beacon. Servers are expected to send Beacons at regular intervals. If we do not receive either a Beacon or TCP message, we initiate an Echo over TCP, to which the server is expected to promptly respond.
Therefore, the time reported here should not much exceed
EPICS_CA_CONN_TMO
(default 30 seconds unless overriden by that environment variable) if the server is healthy.If the server fails to send a Beacon on schedule and fails to reply to an Echo, the server is assumed dead. A warning is issued, and all PVs are disconnected to initiate a reconnection attempt.
- Parameters:
- timeoutnumber or None, optional
Seconds to wait before a CaprotoTimeoutError is raised. Default is
PV.timeout
, which falls back toPV.context.timeout
if not set. If None, never timeout.
- class caproto.asyncio.client.Subscription(pv, data_type, data_count, low, high, to, mask)[source]¶
Represents one subscription, specified by a PV and configurational parameters
It may fan out to zero, one, or multiple user-registered callback functions.
This object should never be instantiated directly by user code; rather it should be made by calling the
subscribe()
method on aPV
object.- add_callback(func)[source]¶
Add a callback to receive responses.
- Parameters:
- funccallable
Expected signature:
func(sub, response)
.The signature
func(response)
is also supported for backward-compatibility but will issue warnings. Support will be removed in a future release of caproto.
- Returns:
- tokenint
Integer token that can be passed to
remove_callback()
.
Changed in version 0.5.0: Changed the expected signature of
func
fromfunc(response)
tofunc(sub, response)
.
- async remove_callback(token)[source]¶
Remove callback using token that was returned by
add_callback()
.- Parameters:
- tokeninteger
Token returned by
add_callback()
.
The following are internal components. There API may change in the future.
- class caproto.asyncio.client.VirtualCircuitManager(context, circuit, timeout=2.0)[source]¶
Encapsulates a VirtualCircuit, a TCP socket, and additional state
# This object should never be instantiated directly by user code. It is used # internally by the Context. Its methods may be touched by user code, but # this is rarely necessary. #
- property server_protocol_version¶
- property connected¶
A broadcaster client which can be shared among multiple Contexts
- Parameters:
- registration_retry_timefloat, optional
The time, in seconds, between attempts made to register with the repeater. Default is 10.
Process a command and tranport it over the UDP socket.
Disconnect the broadcaster and stop listening
Register this client with the CA Repeater.
Generate, process, and transport search request(s)
Map each known server address to seconds since its last message.
The time is reset to 0 whenever we receive a TCP message related to user activity or a Beacon. Servers are expected to send Beacons at regular intervals. If we do not receive either a Beacon or TCP message, we initiate an Echo over TCP, to which the server is expected to promptly respond.
Therefore, the time reported here should not much exceed
EPICS_CA_CONN_TMO
(default 30 seconds unless overriden by that environment variable) if the server is healthy.If the server fails to send a Beacon on schedule and fails to reply to an Echo, the server is assumed dead. A warning is issued, and all PVs are disconnected to initiate a reconnection attempt.