Writing Your Own Channel Access Client¶
Caproto can be used to implement both Channel Access clients and servers. To give a flavor for how the API works, we’ll demonstrate a simple, synchronous client.
Channel Access Basics¶
A Channel Access client reads and writes values to Channels available from servers on its network. It locates these servers using UDP broadcasts. It communicates with an individual server via one or more TCP connections, which it calls Virtual Circuits.
In this example, our client will talk to a example IOC provided by caproto, but this same code could talk to any Channel Access server.
In a separate shell, start one of caproto’s demo IOCs.
$ python3 -m caproto.ioc_examples.random_walk PVs: ['random_walk:dt', 'random_walk:x']
In a second separate shell, start a repeater process. You may see output like this:
$ caproto-repeater [I 18:04:30.686 repeater:84] Repeater is listening on 0.0.0.0:5065
$ caproto-repeater [I 18:04:08.790 repeater:189] Another repeater is already running; exiting.
Either is fine.
Registering with the Repeater¶
To begin, we need a socket configured for UDP broadcasting. Caproto provides a convenient utility for doing this in a way that works on all platforms.
In : import caproto In : udp_sock = caproto.bcast_socket()
A new Channel Access client is required to register itself with a Channel Access Repeater. (What a Repeater is for is not really important to our story here. It’s an independent process that rebroadcasts incoming server heartbeats to all clients on our host. It exists because old systems don’t handle broadcasts properly.) To register, we must send a request to the Repeater and receive a response.
In : bytes_to_send = b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' In : udp_sock.sendto(bytes_to_send, ('127.0.0.1', 5065)) Out: 16
In : data, address = udp_sock.recvfrom(1024) In : data Out: b'\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'
Hurray it worked? Unless you can read Channel Access hex codes the way Neo experiences the Matrix, you may want a better way. Caproto provides a higher level of abstraction, Commands, so that we don’t need to work with raw bytes. Let’s try this again using caproto.
Other sans-I/O libraries use the word Event for what we are calling a Command. “Event” is an overloaded term in Channel Access, so we’re going our own way here.
As above, create a fresh UDP socket configured for broadcasting.
In : import caproto In : udp_sock = caproto.bcast_socket()
Instantiate a caproto
Broadcaster and a command to broadcast — a
In : b = caproto.Broadcaster(our_role=caproto.CLIENT) In : command = caproto.RepeaterRegisterRequest('0.0.0.0')
Pass the command to our broadcaster’s
Broadcaster.send() method, which
does two things. It translates command objects into bytes, and it checks
them against the rules of the Channel Access protocol. The rules are encoded in
the Broadcaster’s internal state machine, which tracks the state of both the
client and the server. (It can serve as either.) If you try to send an illegal
command, it will raise
In : bytes_to_send = b.send(command) In : bytes_to_send Out: b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Transport those bytes over the wire, using the same
udp_sock we configured
above. A quick comparison will show that these bytes are the same bytes we
spelled out manually before.
In : udp_sock.sendto(bytes_to_send, ('127.0.0.1', 5065)) Out: 16
Why do we need two steps here? Why doesn’t caproto just send the bytes for us? Because it’s designed to support any socket API you might want to use — synchronous (like this example), asynchronous, etc. Caproto does not care how or when you send and receive the bytes. Its job is to make it easier to compose outgoing messages, interpret incoming ones, and verify that the rules of the protocol are obeyed by both peers.
Recall that we are in the process of registering our client with a Channel Access Repeater and that we are expecting a response. As with sending, receiving is a two-step process. First we read bytes from the socket and pass them to the broadcaster.
In : bytes_received, address = udp_sock.recvfrom(1024) In : commands = b.recv(bytes_received, address)
The bytes have been parsed into command objects. Next, check them against the Channel Access protocol.
In : b.process_commands(commands)
When we call
Broadcaster.process_commands(), the Broadcaster does the
same thing is did for
Broadcaster.send() in reverse: if one of the
received commands is illegal, it raises
Searching for a Channel¶
Say we’re looking for a channel (“Process Variable”) with a typically lyrical
EPICS name like
"random_walk:dt". Some server on our
network provides this channel.
The range of IP addresses to search is conventionally controlled by the
EPICS_CA_ADDR_LIST (a space-separated list of IP
provides a convenience function
get_address_list() for parsing these
variables, checking the available network interfaces if necessary, and
returning a list.
In : import caproto In : hosts = caproto.get_address_list() # example: ['172.17.255.255']
We need to broadcast a search request to the servers on our network and receive
a response. (In the event that multiple responses arrive, Channel Access
specifies that all but the first response should be ignored.) We follow the
same pattern as above, still using our broadcaster
b, our socket
udp_sock, and some new caproto commands.
We need to announce which version of the protocol we are using and the name of
the channel we are seraching for. These two commands must be sent in the same
broadcast (UDP datagram), so we pass them to
In : name = "random_walk:dt" In : bytes_to_send = b.send(caproto.VersionRequest(priority=0, version=13), ....: caproto.SearchRequest(name=name, cid=0, version=13)) ....: In : bytes_to_send Out: b'\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x10\x00\x05\x00\r\x00\x00\x00\x00\x00\x00\x00\x00random_walk:dt\x00\x00' In : for host in hosts: ....: udp_sock.sendto(bytes_to_send, (host, 5064)) ....:
Our answer will arrive in a single datagram with multiple commands in it.
In : bytes_received, recv_address = udp_sock.recvfrom(1024) In : commands = b.recv(bytes_received, recv_address) In : version_response, search_response = commands In : version_response Out: VersionResponse(version=13) In : search_response Out: SearchResponse(port=5064, ip='255.255.255.255', cid=0, version=13) In : address = caproto.extract_address(search_response) In : address Out: ('10.1.0.27', 5064)
Now we have the address of a server that has the channel we’re interested in. Next, we’ll set aside the broadcaster and initiate TCP communication with this particular server.
Creating a Channel¶
Create a TCP connection with the server at the
address we found above.
In : import socket In : sock = socket.create_connection(address)
VirtualCircuit plays the same role for a TCP connection as
Broadcaster played for UDP: we’ll use it to interpret
received bytes as commands and to ensure that incoming and outgoing bytes abide
by the protocol.
In : circuit = caproto.VirtualCircuit(our_role=caproto.CLIENT, address=address, ....: priority=0) ....:
We’ll use these convenience functions for what follows.
In : def send(command): ....: "Process a command in the VirtualCircuit and then transmit its bytes." ....: buffers_to_send = circuit.send(command) ....: sock.sendmsg(buffers_to_send) ....:
In : def recv(): ....: "Receive bytes; parse commands; process them in the VirtualCircuit." ....: bytes_received = sock.recv(4096) ....: commands, _ = circuit.recv(bytes_received) ....: for command in commands: ....: circuit.process_command(command) ....: return commands ....:
We initialize the circuit by specifying our protocol version.
In : send(caproto.VersionRequest(priority=0, version=13)) In : recv() Out: deque([VersionResponse(version=13)])
Optionally provide the host name and “client” name, which the server may use to determine our read/write permissions on channels. (There is no authentication in Channel Access; security has to be provided at the network level.)
In : send(caproto.HostNameRequest('localhost')) In : send(caproto.ClientNameRequest('user'))
Finally, create the channel and look at the responses.
In : cid = 1 # a client-specified unique ID for this Channel In : send(caproto.CreateChanRequest(name=name, cid=cid, version=13)) In : commands = recv() In : access_response, create_chan_response = commands In : access_response Out: AccessRightsResponse(cid=1, access_rights=<AccessRights.WRITE|READ: 3>) In : create_chan_response Out: CreateChanResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, cid=1, sid=0)
Success! We now have a connection to the
random_walk:dt channel. Next we’ll
read and write values.
Incidentally, we can reuse this same
sock to connect to
other channels on the same server. In the commands that follow, we’ll use the
cid (specified by our client in
sid (specified by the server in its
CreateChanResponse) to specify
which channel we mean.
In : sid = create_chan_response.sid
In the event of high traffic clogging the network, we can open up multiple
TCP connections to the same server, each with its own VirtualCircuit, and
designate them with different priority (specified in our
VersionRequest). This why we need the concept of a VirtualCircuit:
there can be multiple VirtualCircuits between peers.
Reading and Writing Values¶
In : send(caproto.ReadNotifyRequest(data_type=create_chan_response.data_type, ....: data_count=create_chan_response.data_count, ....: sid=sid, ....: ioid=1)) ....: In : recv() Out: deque([ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=1, metadata=None)])
We may request a particular data type and element count; in the case we just
asked for the “native” data type and count that the server reported in its
In : send(caproto.WriteNotifyRequest(data=(4,), ....: data_type=create_chan_response.data_type, ....: data_count=create_chan_response.data_count, ....: sid=sid, ....: ioid=2)) ....: In : recv() Out: deque([WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=2)])
data may be given as one of the following types:
numpy.ndarray(if numpy is installed)
array.array(the somewhat rarely-used builtin array library)
big-endian bytes-like (
The command also accepts a
metadata parameter for data types that include
metadata. See Payload Data Types for details.
Subscribing to “Events” (Updates)¶
Ask the server to send responses every time the value of the Channel changes. As with reading, above, we have the option of requesting a specific data type or element count, but we’ll use the “native” parameters.
In : req = caproto.EventAddRequest(data_type=create_chan_response.data_type, ....: data_count=create_chan_response.data_count, ....: sid=sid, ....: subscriptionid=0, ....: low=0, high=0, to=0, mask=1) ....: In : send(req)
The server always sends at least one response with the current value at subscription time.
In : recv() Out: deque([EventAddResponse(data=array([4.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), subscriptionid=0, metadata=None)])
If the value changes, additional responses will come in. If multiple
subscriptions are in play at once over this circuit, we can use the
subscriptionid to match them to the right channel. We also use it to end
In : send(caproto.EventCancelRequest(data_type=req.data_type, ....: sid=req.sid, ....: subscriptionid=req.subscriptionid)) ....: In : recv() Out: deque([EventAddResponse(data=(repr: tuple index out of range), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=(repr: 0), subscriptionid=0, metadata=(repr: tuple index out of range))])
Closing the Channel¶
To clean up, close the Channel.
In : send(caproto.ClearChannelRequest(sid, cid)) In : recv() Out: deque([ClearChannelResponse(sid=0, cid=1)])
If we are done with the circuit, close the socket too.
In : sock.close()
Simplify Bookkeeping with Channels¶
In the example above, we handled a
VirtualCircuit and several
different commands. The
VirtualCircuit policed our adherence to the
Channel Access protocol by watching incoming and outgoing commands and tracking
the state of the circuit itself and the state(s) of the channel(s) on the
circuit. Internally, to facilitate this, it creates a
object for each channel to encapsulate its state and stash bookkeeping details
Using these objects directly can help us juggle IDs and generate valid commands more succinctly. This API is purely optional, and using it does not affect the state machines.
See how much more succinct our example becomes:
### Create chan = caproto.ClientChannel(name, circuit) send(chan.version()) recv() send(chan.host_name('localhost'), chan.client_name('user'), chan.create()) recv() ### Read and Write send(chan.read()) recv() send(chan.write((4,))) recv() ### Subscribe and Unsubscribe send(chan.subscribe()) recv() send(chan.unsubscribe(0)) recv() ### Clear send(chan.clear()) recv()
Here is the equivalent, a condensed copy of our work from previous sections:
### Create send(caproto.VersionRequest(priority=0, version=13)) recv() send(caproto.HostNameRequest('localhost')) send(caproto.ClientNameRequest('user')) cid = 1 # a client-specified unique ID for this Channel send(caproto.CreateChanRequest(name=name, cid=cid, version=13)) commands = recv() access_response, create_chan_response = commands ### Read and Write send(caproto.ReadNotifyRequest(data_type=2, data_count=1, sid=sid, ioid=1)) recv() send(caproto.WriteNotifyRequest(data=(4,), data_type=2, data_count=1, sid=sid, ioid=2)) recv() ### Subscribe and Unsubscribe req = caproto.EventAddRequest(data_type=create_chan_response.data_type, data_count=create_chan_response.data_count, sid=sid, subscriptionid=0, low=0, high=0, to=0, mask=1) send(req) recv() send(caproto.EventCancelRequest(data_type=req.data_type, sid=req.sid, subscriptionid=req.subscriptionid)) recv() ### Clear send(caproto.ClearChannelRequest(sid, cid)) recv()
Notice that the channel convenience methods like
actually do anything. We still have to
send the command into the
VirtualCircuit and then send it over the socket. These are just easy ways to
generate valid commands — with auto-generated unique IDs filled in — which
you may or may not then choose to send. The state machines are not updated
until (unless) the command is actually sent.