Writing Your Own Channel Access Client
Caproto can be used to implement both Channel Access clients and servers. To give a flavor for how the API works, we’ll demonstrate a simple, synchronous client.
Channel Access Basics
A Channel Access client reads and writes values to Channels available from servers on its network. It locates these servers using UDP broadcasts. It communicates with an individual server via one or more TCP connections, which it calls Virtual Circuits.
In this example, our client will talk to an example IOC provided by caproto, but this same code could talk to any Channel Access server.
In a separate shell, start one of caproto’s demo IOCs.
$ python3 -m caproto.ioc_examples.random_walk
PVs: ['random_walk:dt', 'random_walk:x']
In a second separate shell, start a repeater process. You may see output like this:
$ caproto-repeater
[I 18:04:30.686 repeater:84] Repeater is listening on 0.0.0.0:5065
or this:
$ caproto-repeater
[I 18:04:08.790 repeater:189] Another repeater is already running; exiting.
Either is fine.
Registering with the Repeater
To begin, we need a socket configured for UDP broadcasting. Caproto provides a convenient utility for doing this in a way that works on all platforms.
In [1]: import caproto
In [2]: udp_sock = caproto.bcast_socket()
A new Channel Access client is required to register itself with a Channel Access Repeater. (What a Repeater is for is not really important to our story here. It’s an independent process that rebroadcasts incoming server heartbeats to all clients on our host. It exists because old systems don’t handle broadcasts properly.) To register, we must send a request to the Repeater and receive a response.
In [3]: bytes_to_send = b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
In [4]: udp_sock.sendto(bytes_to_send, ('127.0.0.1', 5065))
Out[4]: 16
In [5]: data, address = udp_sock.recvfrom(1024)
In [6]: data
Out[6]: b'\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x7f\x00\x00\x01'
Hurray it worked? Unless you can read Channel Access hex codes the way Neo experiences the Matrix, you may want a better way. Caproto provides a higher level of abstraction, Commands, so that we don’t need to work with raw bytes. Let’s try this again using caproto.
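As an aside, those raw bytes are less opaque than they look. Every Channel Access message begins with a fixed 16-byte big-endian header, and the sketch below unpacks the two messages above using only the standard library. The helper name parse_header and the dictionary keys are illustrative choices, not part of caproto.

```python
import socket
import struct

# Standard 16-byte Channel Access header, big-endian:
# command, payload size, data type, data count (uint16 each),
# then two uint32 parameters whose meaning depends on the command.
HEADER = struct.Struct(">HHHHII")


def parse_header(data):
    """Unpack the first 16 bytes of a message into a dict (hypothetical helper)."""
    command, payload_size, data_type, data_count, param1, param2 = (
        HEADER.unpack_from(data))
    return {"command": command, "payload_size": payload_size,
            "data_type": data_type, "data_count": data_count,
            "parameter1": param1, "parameter2": param2}


# The same bytes we sent and received above.
request = b'\x00\x18' + b'\x00' * 14
response = b'\x00\x11' + b'\x00' * 10 + b'\x7f\x00\x00\x01'

request_header = parse_header(request)
response_header = parse_header(response)

# Command 24 (0x18) is the register request; command 17 (0x11) is the
# Repeater's confirmation, which carries an IPv4 address in parameter2.
repeater_ip = socket.inet_ntoa(struct.pack(">I", response_header["parameter2"]))
print(request_header["command"], response_header["command"], repeater_ip)
```

Reading the header by hand is useful for debugging, but it does nothing to check that a message is legal at a given moment, which is where the higher-level Commands come in.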
Note
Other sans-I/O libraries use the word Event for what we are calling a Command. “Event” is an overloaded term in Channel Access, so we’re going our own way here.
As above, create a fresh UDP socket configured for broadcasting.
In [7]: import caproto
In [8]: udp_sock = caproto.bcast_socket()
Instantiate a caproto Broadcaster and a command to broadcast: a RepeaterRegisterRequest.
In [9]: b = caproto.Broadcaster(our_role=caproto.CLIENT)
In [10]: command = caproto.RepeaterRegisterRequest('0.0.0.0')
Pass the command to our broadcaster’s Broadcaster.send() method, which does two things. It translates command objects into bytes, and it checks them against the rules of the Channel Access protocol. The rules are encoded in the Broadcaster’s internal state machine, which tracks the state of both the client and the server. (It can serve as either.) If you try to send an illegal command, it will raise LocalProtocolError.
In [11]: bytes_to_send = b.send(command)
In [12]: bytes_to_send
Out[12]: b'\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Transport those bytes over the wire, using the same udp_sock we configured above. A quick comparison will show that these bytes are the same bytes we spelled out manually before.
In [13]: udp_sock.sendto(bytes_to_send, ('127.0.0.1', 5065))
Out[13]: 16
Why do we need two steps here? Why doesn’t caproto just send the bytes for us? Because it’s designed to support any socket API you might want to use — synchronous (like this example), asynchronous, etc. Caproto does not care how or when you send and receive the bytes. Its job is to make it easier to compose outgoing messages, interpret incoming ones, and verify that the rules of the protocol are obeyed by both peers.
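The separation between composing bytes and moving them can be illustrated with a toy. The sketch below is not caproto's implementation: ToyBroadcaster, ToyProtocolError, and the single rule it enforces (register before searching) are invented for illustration. It only shows the shape of the idea, in which the protocol object turns commands into bytes and checks rules while the caller decides how those bytes travel.

```python
import socket
import struct

HEADER = struct.Struct(">HHHHII")  # 16-byte big-endian header
REGISTER, SEARCH = 24, 6           # command codes seen in this tutorial


class ToyProtocolError(Exception):
    """Raised when the caller tries to send a command out of order."""


class ToyBroadcaster:
    """Hypothetical sans-I/O object: no sockets, just bytes and state."""

    def __init__(self):
        self.registered = False

    def send(self, command_code):
        # Invented rule for this toy: registration must come first.
        if command_code == SEARCH and not self.registered:
            raise ToyProtocolError("register before searching")
        if command_code == REGISTER:
            self.registered = True
        return HEADER.pack(command_code, 0, 0, 0, 0, 0)


toy = ToyBroadcaster()
try:
    toy.send(SEARCH)
except ToyProtocolError:
    rejected = True
else:
    rejected = False

bytes_to_send = toy.send(REGISTER)

# The caller owns the transport. Here it is a local socket pair, but it
# could equally be an asyncio transport or a test double.
left, right = socket.socketpair()
left.sendall(bytes_to_send)
received = right.recv(16)
left.close()
right.close()
print(rejected, received == bytes_to_send)
```

Because the toy never touches a socket itself, the same object could be driven from threaded, asynchronous, or test code without modification.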
Recall that we are in the process of registering our client with a Channel Access Repeater and that we are expecting a response. As with sending, receiving is a two-step process. First we read bytes from the socket and pass them to the broadcaster.
In [14]: bytes_received, address = udp_sock.recvfrom(1024)
In [15]: commands = b.recv(bytes_received, address)
The bytes have been parsed into command objects. Next, check them against the Channel Access protocol.
In [16]: b.process_commands(commands)
When we call Broadcaster.process_commands(), the Broadcaster does the same thing it did for Broadcaster.send(), in reverse: if one of the received commands is illegal, it raises RemoteProtocolError.
Searching for a Channel
Say we’re looking for a channel (“Process Variable”) with a typically lyrical EPICS name like "random_walk:dt". Some server on our network provides this channel.
The range of IP addresses to search is conventionally controlled by the environment variables EPICS_CA_ADDR_LIST (a space-separated list of IP addresses) and EPICS_CA_AUTO_ADDR_LIST (yes or no). Caproto provides a convenience function get_address_list() for parsing these variables, checking the available network interfaces if necessary, and returning a list.
In [17]: import caproto
In [18]: hosts = caproto.get_address_list() # example: ['172.17.255.255']
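For intuition, here is a simplified sketch of that kind of parsing. It is not caproto's get_address_list(): the function name parse_address_list is hypothetical, and where the real function would inspect network interfaces, this sketch assumes a fallback to the generic broadcast address 255.255.255.255. It also assumes that an unset EPICS_CA_AUTO_ADDR_LIST behaves like YES, which is the usual EPICS default.

```python
GENERIC_BROADCAST = "255.255.255.255"


def parse_address_list(environ):
    """Build a search list from EPICS-style variables (hypothetical helper).

    environ is any mapping, such as os.environ or a plain dict.
    """
    # EPICS_CA_ADDR_LIST is a space-separated list of addresses.
    addresses = environ.get("EPICS_CA_ADDR_LIST", "").split()
    auto = environ.get("EPICS_CA_AUTO_ADDR_LIST", "YES").strip().upper()
    # Assumption: stand in for interface discovery with one generic
    # broadcast address when automatic addressing is enabled.
    if auto == "YES" and GENERIC_BROADCAST not in addresses:
        addresses.append(GENERIC_BROADCAST)
    return addresses


explicit_only = parse_address_list({
    "EPICS_CA_ADDR_LIST": "10.0.0.255 172.17.255.255",
    "EPICS_CA_AUTO_ADDR_LIST": "no",
})
defaults = parse_address_list({})
print(explicit_only, defaults)
```

Taking the environment as a parameter rather than reading os.environ directly keeps the helper easy to test.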
We need to broadcast a search request to the servers on our network and receive a response. (In the event that multiple responses arrive, Channel Access specifies that all but the first response should be ignored.) We follow the same pattern as above, still using our broadcaster b, our socket udp_sock, and some new caproto commands.
We need to announce which version of the protocol we are using and the name of the channel we are searching for. These two commands must be sent in the same broadcast (UDP datagram), so we pass them to Broadcaster.send() together.
In [19]: name = "random_walk:dt"
In [20]: bytes_to_send = b.send(caproto.VersionRequest(priority=0, version=13),
....: caproto.SearchRequest(name=name, cid=0, version=13))
....:
In [21]: bytes_to_send
Out[21]: b'\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x10\x00\x05\x00\r\x00\x00\x00\x00\x00\x00\x00\x00random_walk:dt\x00\x00'
In [22]: for host in hosts:
....:     udp_sock.sendto(bytes_to_send, (host, 5064))
....:
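The datagram built in In [20] packs two messages back to back. As a rough illustration of how a receiver can tell them apart, the sketch below walks the bytes using the 16-byte header and its payload-size field. The function split_messages is a hypothetical helper written with the standard library, and it ignores the extended header form used for very large payloads.

```python
import struct

# command, payload size, data type, data count, parameter 1, parameter 2
HEADER = struct.Struct(">HHHHII")


def split_messages(datagram):
    """Yield (header_fields, payload) for each message in a datagram."""
    offset = 0
    while offset < len(datagram):
        fields = HEADER.unpack_from(datagram, offset)
        payload_size = fields[1]
        start = offset + HEADER.size
        payload = datagram[start:start + payload_size]
        yield fields, payload
        offset = start + payload_size


# The bytes shown in Out[21] above.
datagram = (b'\x00\x00\x00\x00\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00'
            b'\x00\x06\x00\x10\x00\x05\x00\r\x00\x00\x00\x00\x00\x00\x00\x00'
            b'random_walk:dt\x00\x00')

messages = list(split_messages(datagram))
for fields, payload in messages:
    # Payloads are padded with null bytes to a multiple of 8 bytes.
    print(fields, payload.rstrip(b'\x00'))
```

The first message has command 0 and no payload; the second has command 6 and a 16-byte payload holding the padded channel name. The value 13 in the data-count slot of both headers matches the version=13 we passed in.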
Our answer will arrive in a single datagram with multiple commands in it.
In [23]: bytes_received, recv_address = udp_sock.recvfrom(1024)
In [24]: commands = b.recv(bytes_received, recv_address)
In [25]: version_response, search_response = commands
In [26]: version_response
Out[26]: VersionResponse(version=13)
In [27]: search_response
Out[27]: SearchResponse(port=5064, ip='255.255.255.255', cid=0, version=13)
In [28]: address = caproto.extract_address(search_response)
In [29]: address
Out[29]: ('10.1.0.22', 5064)
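The jump from ip='255.255.255.255' in the SearchResponse to '10.1.0.22' deserves a word. By Channel Access convention, that placeholder means the server did not specify an address, and the client should use the source address of the datagram instead. The sketch below expresses that rule. It assumes extract_address does something along these lines internally; the helper resolve_server_address is hypothetical and written from the convention, not copied from caproto.

```python
UNSPECIFIED = "255.255.255.255"


def resolve_server_address(response_ip, response_port, sender_address):
    """Pick the TCP address to connect to (hypothetical helper).

    sender_address is the (host, port) pair returned by recvfrom().
    """
    if response_ip == UNSPECIFIED:
        # The server left the address blank, so fall back to the host
        # the datagram actually came from.
        return (sender_address[0], response_port)
    return (response_ip, response_port)


from_sender = resolve_server_address(UNSPECIFIED, 5064, ("10.1.0.22", 5064))
from_response = resolve_server_address("10.1.0.30", 5064, ("10.1.0.22", 40000))
print(from_sender, from_response)
```

Note that the port always comes from the response, because it names the server's TCP port, not the UDP port the reply was sent from.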
Now we have the address of a server that has the channel we’re interested in. Next, we’ll set aside the broadcaster and initiate TCP communication with this particular server.
Creating a Channel
Create a TCP connection with the server at the address we found above.
In [30]: import socket
In [31]: sock = socket.create_connection(address)
A VirtualCircuit plays the same role for a TCP connection as the Broadcaster played for UDP: we’ll use it to interpret received bytes as commands and to ensure that incoming and outgoing bytes abide by the protocol.
In [32]: circuit = caproto.VirtualCircuit(our_role=caproto.CLIENT, address=address,
....: priority=0)
....:
We’ll use these convenience functions for what follows.
In [33]: def send(command):
....:     "Process a command in the VirtualCircuit and then transmit its bytes."
....:     buffers_to_send = circuit.send(command)
....:     sock.sendmsg(buffers_to_send)
....:
In [34]: def recv():
....:     "Receive bytes; parse commands; process them in the VirtualCircuit."
....:     bytes_received = sock.recv(4096)
....:     commands, _ = circuit.recv(bytes_received)
....:     for command in commands:
....:         circuit.process_command(command)
....:     return commands
....:
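One detail worth keeping in mind with TCP: it is a byte stream, so a single sock.recv call may return part of a message or several messages at once. The sketch below shows, with the standard library only, the kind of buffering that makes this harmless. MessageBuffer is a hypothetical class for illustration, not caproto's parser, and it does not handle the extended header form used for very large payloads.

```python
import struct

HEADER = struct.Struct(">HHHHII")  # 16-byte big-endian header


class MessageBuffer:
    """Accumulate stream bytes and hand back only complete messages."""

    def __init__(self):
        self._buffer = bytearray()

    def feed(self, chunk):
        self._buffer += chunk
        messages = []
        while len(self._buffer) >= HEADER.size:
            payload_size = HEADER.unpack_from(self._buffer)[1]
            total = HEADER.size + payload_size
            if len(self._buffer) < total:
                break  # wait for the rest of the payload
            messages.append(bytes(self._buffer[:total]))
            del self._buffer[:total]
        return messages


# Two messages: a version message (command 0) with no payload, and a
# host-name message (command 21) with a padded 16-byte payload.
version = HEADER.pack(0, 0, 0, 13, 0, 0)
host_name = HEADER.pack(21, 16, 0, 0, 0, 0) + b'localhost'.ljust(16, b'\x00')
stream = version + host_name

buf = MessageBuffer()
first = buf.feed(stream[:20])   # one whole message plus 4 stray bytes
second = buf.feed(stream[20:])  # the remainder completes the second one
print(len(first), len(second))
```

With a parser that buffers like this, a call to a helper such as recv() can legitimately return no commands at all when only a fragment has arrived, so robust clients call it in a loop.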
We initialize the circuit by specifying our protocol version.
In [35]: send(caproto.VersionRequest(priority=0, version=13))
In [36]: recv()
Out[36]: deque([VersionResponse(version=13)])
Optionally provide the host name and “client” name, which the server may use to determine our read/write permissions on channels. (There is no authentication in Channel Access; security has to be provided at the network level.)
In [37]: send(caproto.HostNameRequest('localhost'))
In [38]: send(caproto.ClientNameRequest('user'))
Finally, create the channel and look at the responses.
In [39]: cid = 1 # a client-specified unique ID for this Channel
In [40]: send(caproto.CreateChanRequest(name=name, cid=cid, version=13))
In [41]: commands = recv()
In [42]: access_response, create_chan_response = commands
In [43]: access_response
Out[43]: AccessRightsResponse(cid=1, access_rights=<AccessRights.WRITE|READ: 3>)
In [44]: create_chan_response
Out[44]: CreateChanResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, cid=1, sid=0)
Success! We now have a connection to the random_walk:dt channel. Next we’ll read and write values.
Incidentally, we can reuse this same circuit and sock to connect to other channels on the same server. In the commands that follow, we’ll use the integer IDs cid (specified by our client in CreateChanRequest) and sid (specified by the server in its CreateChanResponse) to specify which channel we mean.
In [45]: sid = create_chan_response.sid
In the event of high traffic clogging the network, we can open up multiple TCP connections to the same server, each with its own VirtualCircuit, and designate them with different priorities (specified in our VersionRequest). This is why we need the concept of a VirtualCircuit: there can be multiple VirtualCircuits between peers.
Reading and Writing Values
Read:
In [46]: send(caproto.ReadNotifyRequest(data_type=create_chan_response.data_type,
....: data_count=create_chan_response.data_count,
....: sid=sid,
....: ioid=1))
....:
In [47]: recv()
Out[47]: deque([ReadNotifyResponse(data=array([3.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=1, metadata=None)])
We may request a particular data type and element count; in this case we just asked for the “native” data type and count that the server reported in its CreateChanResponse above.
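The status in the response above shows both code=0 and code_with_severity=1. Assuming the conventional Channel Access encoding, in which the low three bits of the raw status carry the severity and the remaining bits carry the message code, the two views are related as in this sketch. The helper split_status is hypothetical.

```python
def split_status(code_with_severity):
    """Split a raw status integer into (code, severity)."""
    severity = code_with_severity & 0x7  # low three bits
    code = code_with_severity >> 3       # remaining bits
    return code, severity


# The raw value reported for ECA_NORMAL in the response above.
code, severity = split_status(1)
print(code, severity)
```

For the value 1 this gives code 0 and severity 1, which agrees with the code=0 and CASeverity.SUCCESS: 1 fields in the printed response.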
Write:
In [48]: send(caproto.WriteNotifyRequest(data=(4,),
....: data_type=create_chan_response.data_type,
....: data_count=create_chan_response.data_count,
....: sid=sid,
....: ioid=2))
....:
In [49]: recv()
Out[49]: deque([WriteNotifyResponse(data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), ioid=2)])
The data may be given as one of the following types:
tuple
numpy.ndarray (if numpy is installed)
big-endian array.array (the somewhat rarely-used builtin array library)
big-endian bytes-like (bytes, bytearray, memoryview)
The command also accepts a metadata parameter for data types that include metadata. See Payload Data Types for details.
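The "big-endian" requirement on array.array and bytes-like inputs matters on most desktop machines, which are little-endian. As a small illustration that uses only the standard library, here are two ways to produce big-endian bytes for the same double value we wrote above. Whether you need this at all depends on which input type you choose; a plain tuple avoids the question.

```python
import array
import struct
import sys

values = (4.0,)

# bytes-like: pack each value as a big-endian IEEE 754 double.
as_bytes = struct.pack(">%dd" % len(values), *values)

# array.array stores values in native byte order, so swap on
# little-endian machines to obtain big-endian contents.
as_array = array.array("d", values)
if sys.byteorder == "little":
    as_array.byteswap()

print(as_bytes, as_array.tobytes() == as_bytes)
```

Both routes end with the same eight bytes, so either could serve as the payload for a single DOUBLE element.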
Subscribing to “Events” (Updates)
Ask the server to send responses every time the value of the Channel changes. As with reading, above, we have the option of requesting a specific data type or element count, but we’ll use the “native” parameters.
In [50]: req = caproto.EventAddRequest(data_type=create_chan_response.data_type,
....: data_count=create_chan_response.data_count,
....: sid=sid,
....: subscriptionid=0,
....: low=0, high=0, to=0, mask=1)
....:
In [51]: send(req)
The server always sends at least one response with the current value at subscription time.
In [52]: recv()
Out[52]: deque([EventAddResponse(data=array([4.]), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=CAStatusCode(name='ECA_NORMAL', code=0, code_with_severity=1, severity=<CASeverity.SUCCESS: 1>, success=1, defunct=False, description='Normal successful completion'), subscriptionid=0, metadata=None)])
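The mask=1 argument in the request above is a bit field that selects which kinds of change trigger an update. The sketch below names the four conventional bits with an IntFlag. The class SubscriptionMask and its member names are illustrative, chosen to mirror the usual EPICS DBE_VALUE, DBE_LOG, DBE_ALARM, and DBE_PROPERTY constants; this is an assumption about the convention and not a caproto class.

```python
import enum


class SubscriptionMask(enum.IntFlag):
    """Hypothetical names for the conventional subscription mask bits."""
    VALUE = 1     # value changes beyond the monitor deadband
    LOG = 2       # value changes beyond the archive deadband
    ALARM = 4     # alarm state changes
    PROPERTY = 8  # property (metadata) changes


used_above = SubscriptionMask(1)
value_and_alarm = SubscriptionMask.VALUE | SubscriptionMask.ALARM
print(used_above == SubscriptionMask.VALUE, int(value_and_alarm))
```

Bits can be combined with the | operator, so a client interested in both value and alarm changes would pass the integer 5.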
If the value changes, additional responses will come in. If multiple subscriptions are in play at once over this circuit, we can use the subscriptionid to match them to the right channel. We also use it to end the subscription:
In [53]: send(caproto.EventCancelRequest(data_type=req.data_type,
....: sid=req.sid,
....: subscriptionid=req.subscriptionid))
....:
In [54]: recv()
Out[54]: deque([EventAddResponse(data=(repr: tuple index out of range), data_type=<ChannelType.DOUBLE: 6>, data_count=1, status=(repr: 0), subscriptionid=0, metadata=(repr: tuple index out of range))])
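When several subscriptions share one circuit, a client needs some way to route each update by its subscriptionid. One simple approach is a dictionary of callbacks, sketched here. Everything in the sketch is hypothetical: Update is a stand-in for a parsed response that carries only the two fields used, and the callback names are invented.

```python
from collections import namedtuple

# Stand-in for a parsed response; only the fields used here.
Update = namedtuple("Update", ["subscriptionid", "data"])

callbacks = {}  # subscriptionid -> function
seen = []       # record of what each callback received


def on_dt(update):
    seen.append(("dt", update.data))


def on_x(update):
    seen.append(("x", update.data))


callbacks[0] = on_dt
callbacks[1] = on_x


def dispatch(update):
    """Route an update to its callback; ignore unknown subscriptions."""
    callback = callbacks.get(update.subscriptionid)
    if callback is not None:
        callback(update)


for update in [Update(0, 4.0), Update(1, 0.25), Update(7, 1.0)]:
    dispatch(update)
print(seen)
```

Ignoring unknown IDs is a deliberate choice in this sketch, since updates that were already in flight can still arrive shortly after a subscription is cancelled.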
Closing the Channel
To clean up, close the Channel.
In [55]: send(caproto.ClearChannelRequest(sid, cid))
In [56]: recv()
Out[56]: deque([ClearChannelResponse(sid=0, cid=1)])
If we are done with the circuit, close the socket too.
In [57]: sock.close()
Simplify Bookkeeping with Channels
In the example above, we handled a VirtualCircuit and several different commands. The VirtualCircuit policed our adherence to the Channel Access protocol by watching incoming and outgoing commands and tracking the state of the circuit itself and the state(s) of the channel(s) on the circuit. Internally, to facilitate this, it creates a ClientChannel object for each channel to encapsulate its state and stash bookkeeping details like cid and sid.
Using these objects directly can help us juggle IDs and generate valid commands more succinctly. This API is purely optional, and using it does not affect the state machines.
See how much more succinct our example becomes:
### Create
chan = caproto.ClientChannel(name, circuit)
send(chan.version())
recv()
send(chan.host_name('localhost'))
send(chan.client_name('user'))
send(chan.create())
recv()
### Read and Write
send(chan.read())
recv()
send(chan.write((4,)))
recv()
### Subscribe and Unsubscribe
send(chan.subscribe())
recv()
send(chan.unsubscribe(0))
recv()
### Clear
send(chan.clear())
recv()
Here is the equivalent, a condensed copy of our work from previous sections:
### Create
send(caproto.VersionRequest(priority=0, version=13))
recv()
send(caproto.HostNameRequest('localhost'))
send(caproto.ClientNameRequest('user'))
cid = 1 # a client-specified unique ID for this Channel
send(caproto.CreateChanRequest(name=name, cid=cid, version=13))
commands = recv()
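access_response, create_chan_response = commands
sid = create_chan_response.sid
### Read and Write
send(caproto.ReadNotifyRequest(data_type=create_chan_response.data_type,
                               data_count=create_chan_response.data_count,
                               sid=sid, ioid=1))
recv()
send(caproto.WriteNotifyRequest(data=(4,),
                                data_type=create_chan_response.data_type,
                                data_count=create_chan_response.data_count,
                                sid=sid, ioid=2))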
recv()
### Subscribe and Unsubscribe
req = caproto.EventAddRequest(data_type=create_chan_response.data_type,
data_count=create_chan_response.data_count,
sid=sid,
subscriptionid=0,
low=0, high=0, to=0, mask=1)
send(req)
recv()
send(caproto.EventCancelRequest(data_type=req.data_type,
sid=req.sid,
subscriptionid=req.subscriptionid))
recv()
### Clear
send(caproto.ClearChannelRequest(sid, cid))
recv()
Notice that the channel convenience methods like chan.create() don’t transmit anything or change any state; they only build a command object. We still have to send the command into the VirtualCircuit and then send it over the socket. These are just easy ways to generate valid commands, with auto-generated unique IDs filled in, which you may or may not then choose to send. The state machines are not updated until (unless) the command is actually sent.
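The distinction between building a command and sending it can be shown with a toy. The sketch below is not ClientChannel: ToyChannel and its dictionary "commands" are invented for illustration, and the use of itertools.count for IDs is only one plausible way to auto-generate them.

```python
import itertools


class ToyChannel:
    """Hypothetical command factory with auto-generated request IDs."""

    def __init__(self, sid):
        self.sid = sid
        self._ioids = itertools.count(1)

    def read(self):
        # Building a command consumes an ID but has no other effect.
        return {"command": "ReadNotifyRequest",
                "sid": self.sid,
                "ioid": next(self._ioids)}


sent = []  # stands in for "passed to the circuit and the socket"

chan = ToyChannel(sid=0)
first = chan.read()   # built, then simply dropped
second = chan.read()  # built and sent
sent.append(second)
print(first["ioid"], second["ioid"], len(sent))
```

The first command was never sent, so nothing downstream knows it existed; its only trace is that its ID is not reused.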