Threading Client

The threading client is a high-performance client that uses Python’s built-in threading module to manage concurrency.

Tutorial

In a separate shell, start one of caproto’s demo IOCs.

$ python3 -m caproto.ioc_examples.random_walk
PVs: ['random_walk:dt', 'random_walk:x']

Connect

Now, in Python we will talk to it using caproto’s threading client. Start by creating a Context.

In [1]: from caproto.threading.client import Context

In [2]: ctx = Context()

The Context object caches connections, manages automatic re-connection, and tracks the state of connections in progress. We can use it to request new connections. Formulating requests for many PVs in a large batch is efficient. In this example we’ll just ask for two PVs.

In [3]: x, dt = ctx.get_pvs('random_walk:x', 'random_walk:dt')

Context.get_pvs() accepts an arbitrary number of PV names and immediately returns a collection of PV objects representing each name. In a background thread, the Context searches for an EPICS server that provides that PV name and then connects to it. The PV object displays its connection state:

In [4]: dt
Out[4]: <PV name='random_walk:dt' priority=0 (searching....)>

The Context displays aggregate information about the state of all connections.

In [5]: ctx
Out[5]: <Context searches_pending=0 circuits=0 pvs=2 idle=0>

Read

Now, to read a PV:

In [6]: res = dt.read()

In [7]: res
Out[7]: ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=0, metadata=None)

This object is a human-friendly representation of the server’s response. The raw bytes of that response are:

In [8]: bytes(res)
Out[8]: b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00@\x08\x00\x00\x00\x00\x00\x00'

Access particular fields in the response using attribute (“dot”) access on res.

In [9]: res.data
Out[9]: array([3.])

By default, the client does not request any metadata:

In [10]: res.metadata

Use the data_type parameter to request a richer data type.

In [11]: richer_res = dt.read(data_type='time')

In [12]: richer_res.metadata
Out[12]: DBR_TIME_DOUBLE(status=<AlarmStatus.NO_ALARM: 0>, severity=<AlarmSeverity.NO_ALARM: 0>, timestamp=1699987532.745315)

In [13]: richer_res.metadata.timestamp
Out[13]: 1699987532.745315

In [14]: richer_res.metadata.stamp.as_datetime()  # a convenience method
Out[14]: datetime.datetime(2023, 11, 14, 18, 45, 32, 745315)

See PV.read() for more information on the values accepted by the data_type parameter.

Note

Performance Note

The underlying metadata and data are stored in efficient, contiguous-memory data structures.

In [15]: res.header  # a ctypes.BigEndianStructure
Out[15]: MessageHeader(command=15, payload_size=8, data_type=6, data_count=1, parameter1=1, parameter2=0)

In [16]: res.buffers  # a collection of one or more buffers
Out[16]: (b'', <memory at 0x7f00a878aa00>)

They were received directly from the socket into these structures with no intermediate copies. Accessing res.data, which returns a numpy.ndarray or array.array, provides a view onto that same memory with no copying (if the data was received from the socket all at once) or one copy (if the data bridged multiple receipts).
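As a cross-check of the header fields shown above, the raw bytes from In [8] can be decoded by hand with the standard library. This is only a sketch: the format string assumes the usual 16-byte Channel Access header layout (four unsigned 16-bit fields followed by two unsigned 32-bit fields, all big-endian), and it is not how caproto parses messages internally.

```python
import struct

# Raw bytes copied from the bytes(res) output in the tutorial above.
raw = (b'\x00\x0f\x00\x08\x00\x06\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00'
       b'@\x08\x00\x00\x00\x00\x00\x00')

# Assumed header layout: 4 x uint16 then 2 x uint32, big-endian (16 bytes).
HEADER = struct.Struct('>HHHHII')

(command, payload_size, data_type,
 data_count, parameter1, parameter2) = HEADER.unpack_from(raw, 0)

# The payload follows the header. data_type 6 is DOUBLE in the output above,
# so each value is decoded as a big-endian 8-byte float.
payload = raw[HEADER.size:HEADER.size + payload_size]
values = struct.unpack(f'>{data_count}d', payload)

print(command, payload_size, data_type, data_count, parameter1, parameter2)
print(values)
```

The decoded fields agree with the MessageHeader shown in Out[15], and the payload decodes to the same value as res.data.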

Write

Let us set the value to 1.

In [17]: dt.write([1])
Out[17]: WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=2)

By default, we send a WriteNotifyRequest, wait for the WriteNotifyResponse, and return it. There are a couple of other ways we can handle writes:

  • Return immediately, not asking for or waiting for a response.

    dt.write([1], wait=False)
    
  • Return immediately, not waiting for a response, but handing the response (when it arrives) to some callback function, processed on a background thread.

    def f(response):
        print('got a response:', response)
    
    dt.write([1], wait=False, callback=f)
    

See PV.write() for more.
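A non-blocking write with a callback can be combined with a threading.Event when we want to start the write, do other work, and collect the response later. The sketch below uses only the wait=False and callback= arguments described above. The helper write_then_collect and the FakePV class are hypothetical names invented for this illustration; FakePV merely stands in for a real PV so that the example runs without an IOC.

```python
import threading
from types import SimpleNamespace


def write_then_collect(pv, data, timeout=2.0):
    """Start a non-blocking write; return a function that waits for the reply."""
    done = threading.Event()
    box = {}

    def on_response(response):
        # Runs on a background thread when the response arrives.
        box['response'] = response
        done.set()

    pv.write(data, wait=False, callback=on_response)

    def collect():
        if not done.wait(timeout):
            raise TimeoutError(f'no write response within {timeout} s')
        return box['response']

    return collect


class FakePV:
    """Hypothetical stand-in for a PV; it answers from a background thread."""

    def write(self, data, wait=True, callback=None):
        reply = SimpleNamespace(status='ok', data=list(data))
        threading.Thread(target=callback, args=(reply,)).start()


collect = write_then_collect(FakePV(), [1])
# ... other work could happen here while the write is in flight ...
response = collect()
print(response.status)
```

With a real PV object from Context.get_pvs(), the same helper would be called as write_then_collect(dt, [1]).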

Subscribe (“Monitor”)

Let us now monitor a channel. The server updates the random_walk:x channel at a period set by random_walk:dt. We can subscribe to updates and fan them out to one or more user-defined callback functions. First, we define a Subscription.

In [18]: sub = x.subscribe()

Next, we define a callback function, a function that will be called whenever the server sends an update. It must accept two positional arguments.

In [19]: responses = []

In [20]: def f(sub, response):
   ....:     print('Received response from', sub.pv.name)
   ....:     responses.append(response)
   ....: 

This user-defined function f has access to the full response from the server, which includes data and any metadata. The server’s response does not include the name of the PV involved (it identifies it indirectly via a “subscription ID” code) so caproto provides the function with sub as well, from which you can obtain the pertinent PV sub.pv and its name sub.pv.name as illustrated above. This is useful for distinguishing responses when the same function is added to multiple subscriptions.

Changed in version 0.5.0: The expected signature of the callback function was changed from f(response) to f(sub, response). For backward compatibility, functions with signature f(response) are still accepted, but caproto will issue a warning that a future release may require the new signature, f(sub, response).

We register this function with sub.

In [21]: token = sub.add_callback(f)

The token is just an integer which we can use to remove f later. We can define a second callback:

In [22]: values = []

In [23]: def g(sub, response):
   ....:     values.append(response.data[0])
   ....: 

and add it to the same subscription, putting no additional load on the network.

In [24]: sub.add_callback(g)
Out[24]: 1

After some time has passed, we will have accumulated some responses.

In [25]: len(responses)
Out[25]: 4

In [26]: values
Out[26]: 
[-0.3034241175449923,
 -0.17230503027382338,
 0.5527813185006676,
 1.0863895161795538]

At any point we can remove a specific callback function:

In [27]: sub.remove_callback(token)

or clear all the callbacks on a subscription:

In [28]: sub.clear()

In order to minimize load on the network, a Subscription waits to request updates from the server until the first user callback is added. Thus, the first callback added by the user is guaranteed to get the first response received from the server. If all user callbacks are later removed, either explicitly (via remove_callback or clear) or implicitly via Python garbage collection, the Subscription automatically cancels future updates from the server. If a callback is then later added, the Subscription silently re-initiates updates. All of this is transparent to the user.

Warning

The callback registry in Subscription only holds weak references to the user callback functions. If there are no other references to the function, it will be silently garbage collected and removed. Therefore, constructions like this do not work:

sub.add_callback(lambda sub, response: print(response.data))

The lambda function will be promptly garbage collected by Python and removed from sub by caproto. To avoid that, make a reference before passing the function to Subscription.add_callback().

cb = lambda sub, response: print(response.data)
sub.add_callback(cb)

This can be surprising, but it is a standard approach for avoiding the accidental costly accumulation of abandoned callbacks.

This pitfall does not apply to callbacks passed to PV.read() and PV.write() (or Batch.read() and Batch.write()) because those are single-shot callbacks that do not persist beyond their first use.
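The weak-reference behavior described in this warning can be reproduced with the standard library alone. The class below is a toy registry written for this illustration (WeakCallbackRegistry is a hypothetical name, not caproto's implementation). It only handles plain functions; bound methods would need weakref.WeakMethod instead.

```python
import gc
import weakref


class WeakCallbackRegistry:
    """Toy registry that keeps only weak references to its callbacks."""

    def __init__(self):
        self._callbacks = {}
        self._next_token = 0

    def add_callback(self, func):
        token = self._next_token
        self._next_token += 1

        def discard(ref, token=token):
            # Invoked by Python once func has been garbage collected.
            self._callbacks.pop(token, None)

        self._callbacks[token] = weakref.ref(func, discard)
        return token

    def process(self, *args):
        for ref in list(self._callbacks.values()):
            func = ref()
            if func is not None:
                func(*args)

    def __len__(self):
        return len(self._callbacks)


registry = WeakCallbackRegistry()

# Nothing else refers to this lambda, so it is collected right away.
registry.add_callback(lambda value: print('never printed', value))
gc.collect()  # not needed on CPython, but harmless on other interpreters
count_after_lambda = len(registry)

received = []


def keep(value):
    received.append(value)


# The module-level name 'keep' holds a strong reference, so this one survives.
registry.add_callback(keep)
registry.process(42)
print(count_after_lambda, received)
```

The lambda vanishes from the registry before it is ever called, while the named function keeps receiving values for as long as something else holds a reference to it.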

Batched Requests

Batching requests is efficient, and it sets up the server to perform these operations as closely-spaced in time as possible, within the limits of the protocol. Python’s with syntax provides a natural way to specify a batch of requests — reads, writes, or a mixture of both — and execute them upon exit from the with block.

Suppose we had a list of PV objects, pvs, and we want to request readings in bulk. We can use a callback function to stash readings in a dictionary as they arrive.

For convenience we’ll demonstrate this using the two PVs we have handy — x and dt — but an unlimited number may be used.

In [29]: pvs = [x, dt]

In [30]: from functools import partial

In [31]: results = {}

In [32]: def stash_result(name, response):
   ....:     results[name] = response.data
   ....: 

Now we’ll use the Batch context:

In [33]: from caproto.threading.client import Batch

In [34]: with Batch() as b:
   ....:     for pv in pvs:
   ....:         b.read(pv, partial(stash_result, pv.name))
   ....: 

The requests will be sent in large batches (over the PVs’ respective circuits) upon exiting the with block. The responses will be processed on a background thread. As the responses come in, the results dictionary will be updated.

In [35]: results
Out[35]: {'random_walk:x': array([1.08638952]), 'random_walk:dt': array([1.])}

See Batch for more.
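Because batched responses are processed on a background thread, the results dictionary may still be filling in when the with block exits. One way to wait for all of them is to count responses and signal a threading.Event, as sketched below. The helper read_all is a hypothetical name; it assumes the PV names are unique. FakeBatch and the fake PV objects are stand-ins invented so the sketch runs without an IOC; with caproto, we would pass the real Batch class and real PV objects.

```python
import threading
from functools import partial
from types import SimpleNamespace


def read_all(batch_cls, pvs, timeout=2.0):
    """Read every PV in one batch and wait until all responses arrive."""
    results = {}
    expected = len(pvs)
    if expected == 0:
        return results
    lock = threading.Lock()
    all_in = threading.Event()

    def stash(name, response):
        # Called on a background thread, once per PV.
        with lock:
            results[name] = response.data
            if len(results) == expected:
                all_in.set()

    with batch_cls() as b:
        for pv in pvs:
            b.read(pv, partial(stash, pv.name))
    all_in.wait(timeout)  # returns early once every PV has reported
    with lock:
        return dict(results)


class FakeBatch:
    """Hypothetical stand-in for Batch: delivers replies from a thread."""

    def __init__(self):
        self._requests = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        def deliver():
            for pv, callback in self._requests:
                callback(SimpleNamespace(data=[pv.value]))
        threading.Thread(target=deliver).start()

    def read(self, pv, callback):
        self._requests.append((pv, callback))


fake_pvs = [SimpleNamespace(name='demo:a', value=1.0),
            SimpleNamespace(name='demo:b', value=2.0)]
readings = read_all(FakeBatch, fake_pvs)
print(readings)
```

If the timeout elapses first, read_all returns whatever subset of readings has arrived, which mirrors the "ignore late responses" behavior of the Batch timeout.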

Go Idle

Once created, PVs are cached for the lifetime of the Context and returned again to the user if a PV with the same name and priority is requested. In order to reduce the load on the network, a PV can be temporarily made “idle” (disconnected). It will silently, automatically reconnect the next time it is used.

In [36]: x
Out[36]: <PV name='random_walk:x' priority=0 address=('10.1.0.30', 5064), circuit_state=States.CONNECTED, channel_state=States.CONNECTED>

In [37]: x.go_idle()

In [38]: x
Out[38]: <PV name='random_walk:x' priority=0 (idle)>

In [39]: x.read()
Out[39]: ReadNotifyResponse(data=array([1.21712944]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=5, metadata=None)

In [40]: x
Out[40]: <PV name='random_walk:x' priority=0 address=('10.1.0.30', 5064), circuit_state=States.CONNECTED, channel_state=States.CONNECTED>

Notice that when the PV was read it automatically reconnected, requiring no action from the user.

The go_idle() method is merely a request and is not guaranteed to have any effect. If a PV has active subscriptions, it will ignore the request: it must stay active to continue servicing user callbacks. Therefore, it is safe to call go_idle() on any PV at any time, knowing that the PV will decline to disconnect if it is being actively used and that, if it is currently unused, it will transparently reconnect the next time it is used.

Canceling Searches

All unanswered searches are retried repeatedly, with decreasing frequency, forever. Each new call to get_pvs() causes all unanswered searches to be retried at least once immediately. This can generate unwanted network traffic. To fully cancel a search that is never expected to complete, access the method SharedBroadcaster.cancel.

ctx.broadcaster.cancel('some typo-ed PV name, for example')

As the name suggests, it is possible to construct multiple Contexts that share one SharedBroadcaster. In that scenario, notice that canceling the search will affect all contexts using the SharedBroadcaster.

Events Off and On

If a given circuit produces updates faster than a client can process them, the client can suspend subscriptions on that circuit. This will cause the server to discard all backlogged updates and all new updates during the period of suspension. When the client reactivates subscriptions, it will immediately receive the most recent update and then any future updates.

x.circuit_manager.events_off()
...
x.circuit_manager.events_on()
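To make sure events are switched back on even if the intervening work raises an exception, the pair of calls can be wrapped in a context manager. This is a small sketch: events_suspended is a hypothetical helper, not part of caproto, and RecordingManager is a stand-in used only so the example runs without a server.

```python
from contextlib import contextmanager


@contextmanager
def events_suspended(circuit_manager):
    """Suspend subscription updates on a circuit for the duration of a block."""
    circuit_manager.events_off()
    try:
        yield circuit_manager
    finally:
        # Always re-enable updates, even if the block raised.
        circuit_manager.events_on()


class RecordingManager:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self):
        self.calls = []

    def events_off(self):
        self.calls.append('off')

    def events_on(self):
        self.calls.append('on')


manager = RecordingManager()
try:
    with events_suspended(manager):
        raise RuntimeError('slow processing failed')
except RuntimeError:
    pass
print(manager.calls)
```

With a real PV, the equivalent usage would be: with events_suspended(x.circuit_manager): followed by the slow work.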

Server Health Check

To check how much time has passed (in seconds) since each known server was last heard from, use:

ctx.broadcaster.time_since_last_heard()

As a convenience, check on the server connected to a specific PV using:

x.time_since_last_heard()

See the SharedBroadcaster.time_since_last_heard() API documentation below for details.
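The mapping returned by the broadcaster can be filtered to flag servers that have been quiet for too long. The sketch below assumes the mapping is keyed by (host, port) address tuples like the one shown in the PV repr above. The helper name stale_servers, the 30-second threshold (chosen to match the EPICS_CA_CONN_TMO default mentioned in the API documentation), and the sample data are all illustrative.

```python
def stale_servers(seconds_by_address, threshold=30.0):
    """Return only the servers not heard from within threshold seconds."""
    return {address: seconds
            for address, seconds in seconds_by_address.items()
            if seconds > threshold}


# Made-up sample in the assumed shape of ctx.broadcaster.time_since_last_heard().
sample = {('10.1.0.30', 5064): 4.2, ('10.1.0.31', 5064): 95.0}
stale = stale_servers(sample)
print(stale)
```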

Logs for Debugging

Caproto uses Python’s logging framework, which enables sophisticated log management. For more information and copy/paste-able examples, see Logging.

API Documentation

class caproto.threading.client.Context(broadcaster=None, *, timeout=2.0, host_name=None, client_name=None, max_workers=1)[source]

Encapsulates the state and connections of a client

Parameters:
broadcaster : SharedBroadcaster, optional

If None is specified, a fresh one is instantiated.

timeout : number or None, optional

Number of seconds before a CaprotoTimeoutError is raised. This default can be overridden at the PV level or for any given operation. If unset, the default is 2 seconds. If None, never timeout. A global timeout can be specified via an environment variable CAPROTO_DEFAULT_TIMEOUT.

host_name : string, optional

uses value of socket.gethostname() by default

client_name : string, optional

uses value of getpass.getuser() by default

max_workers : integer, optional

Number of worker threads per VirtualCircuit for executing user callbacks. Default is 1. For any number of workers, workers will receive updates in the order in which they are received from the server. That is, work on each update will begin in sequential order. Work-scheduling internal to the user callback is outside caproto’s control. If the number of workers is set to greater than 1, the work on each update may not finish in a deterministic order. For example, if workers are writing lines into a file, the only way to guarantee that the lines are ordered properly is to use only one worker. If ordering matters for your application, think carefully before increasing this value from 1.
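The ordering caveat for max_workers can be demonstrated with a plain thread pool from the standard library. This sketch does not use caproto at all, and it is only an assumption that caproto's callback executor behaves like concurrent.futures.ThreadPoolExecutor. It shows the general effect: work starts in submission order, but with more than one worker it may finish in a different order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_updates(max_workers, delays):
    """Process one simulated update per delay; return the completion order."""
    finished = []

    def callback(index, delay):
        time.sleep(delay)       # simulate a slow user callback
        finished.append(index)  # list.append is thread-safe in CPython

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for index, delay in enumerate(delays):
            executor.submit(callback, index, delay)
    # Leaving the with block waits for all submitted work to complete.
    return finished


delays = [0.03, 0.02, 0.01]  # earlier updates take longer to process
ordered = run_updates(1, delays)
unordered = run_updates(3, delays)
print('one worker:   ', ordered)
print('three workers:', unordered)
```

With one worker the completion order always matches the submission order. With three workers every update is still processed, but the shorter tasks will typically finish first.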

get_pvs(*names, priority=0, connection_state_callback=None, access_rights_callback=None, timeout=CONTEXT_DEFAULT_TIMEOUT)[source]

Return a list of PV objects.

These objects may not be connected at first. Channel creation occurs on a background thread.

PVs are uniquely defined by their name and priority. If a PV with the same name and priority is requested twice, the same (cached) object is returned. Any callbacks included here are added alongside any existing ones.

Parameters:
*names : strings

any number of PV names

priority : integer

Used by the server to triage subscription responses when under high load. 0 is lowest; 99 is highest.

connection_state_callback : callable

Expected signature: f(pv, state) where pv is the instance of PV whose state has changed and state is a string

access_rights_callback : callable

Expected signature: f(pv, access_rights) where pv is the instance of PV whose state has changed and access_rights is a member of the caproto AccessRights enum

timeout : number or None, optional

Number of seconds before a CaprotoTimeoutError is raised. This default can be overridden for any specific operation. By default, fall back to the default timeout set by the Context. If None, never timeout.

class caproto.threading.client.PV(name, priority, context, timeout)[source]

These must be instantiated by a Context, never directly.

name
priority
context
access_rights
log
component_lock
circuit_ready
channel_ready
connection_state_callback
access_rights_callback
subscriptions
property timeout

Effective default timeout.

Valid values are:

  • CONTEXT_DEFAULT_TIMEOUT (fall back to Context.timeout)
  • a floating-point number
  • None (never timeout)
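Since None is itself a meaningful value (never time out), "not set" has to be expressed some other way. One common approach, sketched here, is to use unique sentinel objects and resolve them in order: the per-operation value, then the PV default, then the Context default. The names PV_DEFAULT_TIMEOUT and CONTEXT_DEFAULT_TIMEOUT appear in the signatures in this document, but treating them as bare object() sentinels and the helper effective_timeout are assumptions made for this illustration.

```python
# Assumed to be unique sentinels; redefined here so the sketch is standalone.
PV_DEFAULT_TIMEOUT = object()
CONTEXT_DEFAULT_TIMEOUT = object()


def effective_timeout(requested, pv_timeout, context_timeout):
    """Resolve the timeout for one operation by walking the fallback chain."""
    timeout = requested
    if timeout is PV_DEFAULT_TIMEOUT:
        timeout = pv_timeout          # fall back to the PV-level setting
    if timeout is CONTEXT_DEFAULT_TIMEOUT:
        timeout = context_timeout     # fall back to the Context-level setting
    return timeout                    # a number, or None for "never time out"


# Nothing overridden anywhere: the Context's value wins.
print(effective_timeout(PV_DEFAULT_TIMEOUT, CONTEXT_DEFAULT_TIMEOUT, 2.0))
# The PV has its own default.
print(effective_timeout(PV_DEFAULT_TIMEOUT, 5.0, 2.0))
# An explicit None on the operation means "wait forever".
print(effective_timeout(None, 5.0, 2.0))
```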

property circuit_manager
property channel
access_rights_changed(rights)[source]
connection_state_changed(state, channel)[source]
property connected
wait_for_search(*, timeout=PV_DEFAULT_TIMEOUT)[source]

Wait for this PV to be found.

This does not wait for the PV’s Channel to be created; it merely waits for an address (and a VirtualCircuit) to be assigned.

Parameters:
timeout : number or None, optional

Seconds to wait before a CaprotoTimeoutError is raised. Default is PV.timeout, which falls back to Context.timeout if not set. If None, never timeout.

wait_for_connection(*, timeout=PV_DEFAULT_TIMEOUT)[source]

Wait for this PV to be connected.

Parameters:
timeout : number or None, optional

Seconds to wait before a CaprotoTimeoutError is raised. Default is PV.timeout, which falls back to PV.context.timeout if not set. If None, never timeout.

go_idle()[source]

Request to clear this Channel to reduce load on client and server.

A new Channel will be automatically, silently created the next time any method requiring a connection is called. Thus, this saves some memory in exchange for making the next request a bit slower, as it has to redo the handshake with the server first.

If there are any subscriptions with callbacks, this request will be ignored. If the PV is in the process of connecting, this request will be ignored. If there are any actions in progress (read, write) this request will be processed when they are complete.

read(*, wait=True, callback=None, timeout=PV_DEFAULT_TIMEOUT, data_type=None, data_count=None, notify=True)[source]

Request a fresh reading.

Can do one or both of:

  • Block while waiting for the response, and return it.
  • Pass the response to callback, with or without blocking.

Parameters:
wait : boolean

If True (default) block until a matching response is received from the server. Raises CaprotoTimeoutError if that response is not received within the time specified by the timeout parameter.

callback : callable or None

Called with the response as its argument when received.

timeout : number or None, optional

Seconds to wait before a CaprotoTimeoutError is raised. Default is PV.timeout, which falls back to PV.context.timeout if not set. If None, never timeout.

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.

notify : boolean, optional

Send a ReadNotifyRequest instead of a ReadRequest. True by default.

write(data, *, wait=True, callback=None, timeout=PV_DEFAULT_TIMEOUT, notify=None, data_type=None, data_count=None)[source]

Write a new value. Optionally, request confirmation from the server.

Can do one or both of:

  • Block while waiting for the response, and return it.
  • Pass the response to callback, with or without blocking.

Parameters:
data : str, int, or float or any Iterable of these

Value(s) to write.

wait : boolean

If True (default) block until a matching WriteNotifyResponse is received from the server. Raises CaprotoTimeoutError if that response is not received within the time specified by the timeout parameter.

callback : callable or None

Called with the WriteNotifyResponse as its argument when received.

timeout : number or None, optional

Seconds to wait before a CaprotoTimeoutError is raised. Default is PV.timeout, which falls back to PV.context.timeout if not set. If None, never timeout.

notify : boolean or None

If None (default), set to True if wait=True or callback is set. Can be manually set to True or False. Will raise ValueError if set to False while wait=True or callback is set.

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Write specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.
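The rule for the notify parameter of write() can be written out as a small decision function. This sketch only restates the documented behavior (default to True when a response is needed, raise ValueError on a contradictory combination); resolve_notify is a hypothetical helper name and is not claimed to match caproto's internal code.

```python
def resolve_notify(notify, wait, callback):
    """Decide whether a write should ask the server for confirmation."""
    response_needed = bool(wait) or callback is not None
    if notify is None:
        # Default: ask for a response only if someone will consume it.
        return response_needed
    if not notify and response_needed:
        raise ValueError(
            'notify=False is incompatible with wait=True or a callback')
    return bool(notify)


print(resolve_notify(None, wait=True, callback=None))    # True
print(resolve_notify(None, wait=False, callback=None))   # False
print(resolve_notify(True, wait=False, callback=None))   # True
try:
    resolve_notify(False, wait=True, callback=None)
except ValueError as err:
    print('rejected:', err)
```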

subscribe(data_type=None, data_count=None, low=0.0, high=0.0, to=0.0, mask=None)[source]

Start a new subscription to which user callbacks may be added.

Parameters:
data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.

low, high, to : float, optional

deprecated by Channel Access, not yet implemented by caproto

mask : SubscriptionType, optional

Subscribe to selective updates.

Returns:
subscription : Subscription

Examples

Define a subscription.

>>> sub = pv.subscribe()

Add a user callback. The subscription will be transparently activated (i.e. an EventAddRequest will be sent) when the first user callback is added.

>>> sub.add_callback(my_func)

Multiple callbacks may be added to the same subscription.

>>> sub.add_callback(another_func)

See the docstring for Subscription for more.

unsubscribe_all()[source]

Clear all subscriptions. (Remove all user callbacks from them.)

time_since_last_heard(timeout=PV_DEFAULT_TIMEOUT)[source]

Seconds since last message from the server that provides this channel.

The time is reset to 0 whenever we receive a TCP message related to user activity or a Beacon. Servers are expected to send Beacons at regular intervals. If we do not receive either a Beacon or TCP message, we initiate an Echo over TCP, to which the server is expected to promptly respond.

Therefore, the time reported here should not much exceed EPICS_CA_CONN_TMO (default 30 seconds unless overridden by that environment variable) if the server is healthy.

If the server fails to send a Beacon on schedule and fails to reply to an Echo, the server is assumed dead. A warning is issued, and all PVs are disconnected to initiate a reconnection attempt.

Parameters:
timeout : number or None, optional

Seconds to wait before a CaprotoTimeoutError is raised. Default is PV.timeout, which falls back to PV.context.timeout if not set. If None, never timeout.

command_bundle_queue
class caproto.threading.client.Subscription(pv, data_type, data_count, low, high, to, mask)[source]

Represents one subscription, specified by a PV and configurational parameters

It may fan out to zero, one, or multiple user-registered callback functions.

This object should never be instantiated directly by user code; rather it should be made by calling the subscribe() method on a PV object.

add_callback(func)[source]

Add a callback to receive responses.

Parameters:
func : callable

Expected signature: func(sub, response).

The signature func(response) is also supported for backward-compatibility but will issue warnings. Support will be removed in a future release of caproto.

Returns:
token : int

Integer token that can be passed to remove_callback().

Changed in version 0.5.0: Changed the expected signature of func from func(response) to func(sub, response).

clear()[source]

Remove all callbacks.

remove_callback(token)[source]

Remove callback using token that was returned by add_callback().

Parameters:
token : integer

Token returned by add_callback().

class caproto.threading.client.Batch(timeout=2)[source]

Accumulate requests and then issue them all in batch.

Parameters:
timeout : number or None

Number of seconds to wait before ignoring late responses. Default is 2.

Examples

Read some PVs in batch and stash the readings in a dictionary as they come in.

>>> results = {}
>>> def stash_result(name, response):
...     results[name] = response.data
...
>>> with Batch() as b:
...     for pv in pvs:
...         b.read(pv, functools.partial(stash_result, pv.name))
...     # The requests are sent upon exiting this 'with' block.
...

The results dictionary will be populated as responses come in.

read(pv, callback, data_type=None, data_count=None)[source]

Request a fresh reading as part of a batched request.

Notice that, unlike PV.read(), the callback is required. (There is no other way to get the result back from a batched read.)

Parameters:
pv : PV
callback : callable

Expected signature: f(response)

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.

write(pv, data, callback=None, data_type=None, data_count=None)[source]

Write a new value as part of a batched request.

Parameters:
pv : PV
data : str, int, or float or any Iterable of these

Value(s) to write.

callback : callable

Expected signature: f(response)

data_type : {‘native’, ‘status’, ‘time’, ‘graphic’, ‘control’} or ChannelType or int ID, optional

Request specific data type or a class of data types, matched to the channel’s native data type. Default is Channel’s native data type.

data_count : integer, optional

Requested number of values. Default is the channel’s native data count.

The following are internal components. Their API may change in the future.

class caproto.threading.client.VirtualCircuitManager(context, circuit, selector, timeout=2.0)[source]

Encapsulates a VirtualCircuit, a TCP socket, and additional state

This object should never be instantiated directly by user code. It is used internally by the Context. Its methods may be touched by user code, but this is rarely necessary.

context
circuit
log
channels
pvs
ioids
subscriptions
selector
user_callback_executor
last_tcp_receipt
all_created_pvnames
dead
socket
property connected
send(*commands, extra=None)[source]
received(bytes_recv, address)[source]

Receive and process the next command from the virtual circuit.

This will be run on the recv thread

events_off()[source]

Suspend updates to all subscriptions on this circuit.

This may be useful if the server produces updates faster than the client can process them.

events_on()[source]

Reactivate updates to all subscriptions on this circuit.

disconnect()[source]
process_queue
processing
class caproto.threading.client.SharedBroadcaster(*, registration_retry_time=10.0)[source]

A broadcaster client which can be shared among multiple Contexts

Parameters:
registration_retry_time : float, optional

The time, in seconds, between attempts made to register with the repeater. Default is 10.

new_id()[source]
add_listener(listener)[source]
remove_listener(listener)[source]
disconnect(*, wait=True)[source]
send(*commands)[source]

Process a command and transport it over the UDP socket.

get_cached_search_result(name, *, threshold=10.0)[source]

Returns address if found, raises KeyError if missing or stale.
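The "missing or stale" contract of get_cached_search_result() can be illustrated with a small time-stamped cache. This is a sketch of the general technique only: SearchCache, its method names, and the injectable clock are inventions for this example and say nothing about how SharedBroadcaster stores its results. The default threshold of 10 seconds mirrors the signature above.

```python
import time


class SearchCache:
    """Toy cache mapping PV names to addresses, with a staleness check."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock    # injectable so the example can control time
        self._results = {}     # name -> (address, time recorded)

    def record(self, name, address):
        self._results[name] = (address, self._clock())

    def get(self, name, *, threshold=10.0):
        address, recorded = self._results[name]  # KeyError if missing
        if self._clock() - recorded > threshold:
            raise KeyError(f'{name!r}: cached search result is stale')
        return address


now = [0.0]  # fake clock, advanced by hand below
cache = SearchCache(clock=lambda: now[0])
cache.record('random_walk:x', ('10.1.0.30', 5064))

fresh = cache.get('random_walk:x')
now[0] = 11.0  # pretend 11 seconds have passed
try:
    cache.get('random_walk:x')
    stale_raised = False
except KeyError:
    stale_raised = True
print(fresh, stale_raised)
```

Raising KeyError for both cases lets the caller treat "never seen" and "seen too long ago" the same way: by issuing a fresh search.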

search(results_queue, names, *, timeout=2)[source]

Search for PV names.

The results_queue will receive (address, names) (the address of a server and a list of name(s) that it has) when results are received.

If a cached result is already known, it will be put immediately into results_queue from this thread during this method’s execution.

If not, a SearchRequest will be sent from another thread. If necessary, the request will be re-sent periodically. When a matching response is received (by yet another thread) (address, names) will be put into the results_queue.

cancel(*names)[source]

Cancel searches for these names.

Parameters:
*names : strings

any number of PV names

Any PV instances that were awaiting these results will be stuck until get_pvs() is called again.

search_now()[source]

Force the Broadcaster to reissue all unanswered search requests now.

Left to its own devices, the Broadcaster will do this at regular intervals automatically. This method is intended primarily for debugging and should not be needed in normal use.

received(bytes_recv, address)[source]

Receive and process the next command broadcast over UDP.

command_loop()[source]
time_since_last_heard()[source]

Map each known server address to seconds since its last message.

The time is reset to 0 whenever we receive a TCP message related to user activity or a Beacon. Servers are expected to send Beacons at regular intervals. If we do not receive either a Beacon or TCP message, we initiate an Echo over TCP, to which the server is expected to promptly respond.

Therefore, the time reported here should not much exceed EPICS_CA_CONN_TMO (default 30 seconds unless overridden by that environment variable) if the server is healthy.

If the server fails to send a Beacon on schedule and fails to reply to an Echo, the server is assumed dead. A warning is issued, and all PVs are disconnected to initiate a reconnection attempt.