caproto.pva.ioc_examples.group.MyIOC

Inheritance diagram of MyIOC
class caproto.pva.ioc_examples.group.MyIOC(prefix, *, macros=None, parent=None, name=None)
MyIOC pvproperties

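=========== ========== ==== ====== ==================== ===========
Attribute   Suffix     Docs Type   Notes                Alarm Group
=========== ========== ==== ====== ==================== ===========
long_write  long_write      MyData Put
rpc         rpc             MyData
test        test            MyData Startup Shutdown Put
test2       test2           MyData Startup
test3       test3           type
=========== ========== ==== ====== ==================== ===========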

Methods

group_read(instance, request)

Generic read handler, called for channels that have no get defined.

group_write(instance, update)

Generic write handler, called for channels that have no put defined.
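The fallback behaviour described for group_read and group_write can be illustrated with a minimal sketch. The `Channel` and `Group` classes and the `doubled` getter below are hypothetical stand-ins written for this illustration; they are not caproto classes, and the real dispatch logic may differ.

```python
import asyncio


class Channel:
    """Hypothetical stand-in for a channel; not a caproto class."""

    def __init__(self, name, value, getter=None):
        self.name = name
        self.value = value
        self.getter = getter  # optional channel-specific read handler


class Group:
    """Hypothetical group that supplies a generic read fallback."""

    async def group_read(self, instance, request):
        # Generic handler: used only when the channel has no getter.
        return instance.value

    async def read(self, instance, request=None):
        # Prefer the channel's own getter; otherwise fall back.
        if instance.getter is not None:
            return await instance.getter(self, instance, request)
        return await self.group_read(instance, request)


async def doubled(group, instance, request):
    # A channel-specific getter that takes precedence over group_read.
    return instance.value * 2


async def demo():
    group = Group()
    plain = Channel("plain", 21)
    custom = Channel("custom", 21, getter=doubled)
    return await group.read(plain), await group.read(custom)


result = asyncio.run(demo())
```

Here `plain` is served by the generic handler, while `custom` is served by its own getter.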

Attributes

array_type_map

type_map

pvproperty methods

rpc.call(self, instance, value)
Source code: rpc.rpc
@rpc.call
async def rpc(self, instance, data):
    # Some awf... nice normative type stuff comes through here (NTURI):
    print('RPC call data is', data)
    print('Scheme:', data.scheme)
    print('Query:', data.query)
    print('Path:', data.path)

    # Echo back the query value, if available:
    query = data.query
    value = int(getattr(query, 'value', '1234'))
    return MyData(value=value)
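The fallback in the handler above can be tried in isolation. This is a minimal sketch that uses `types.SimpleNamespace` objects as stand-ins for the NTURI-like request data; the `echo_value` helper and the field values are assumptions made for illustration and are not part of caproto.

```python
from types import SimpleNamespace


def echo_value(data, default='1234'):
    """Return the integer to echo back, mirroring the handler's fallback."""
    query = data.query
    # getattr returns the default string when the query has no 'value'.
    return int(getattr(query, 'value', default))


# Stand-ins for request data carrying scheme, path, and query fields.
with_value = SimpleNamespace(scheme='pva', path='rpc',
                             query=SimpleNamespace(value='42'))
without_value = SimpleNamespace(scheme='pva', path='rpc',
                                query=SimpleNamespace())

echoed = echo_value(with_value)       # uses the supplied value
fallback = echo_value(without_value)  # uses the default
```

Because the result is passed through `int()`, a query value that is not a valid integer string raises `ValueError` rather than falling back to the default.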

test.startup(self, instance, async_lib)
@test.startup
async def test(self, instance, async_lib):
    self.async_lib = async_lib

    while True:
        async with self.test as test:
            test.value = test.value + 1
            test.info = f'testing {test.value}'

        await async_lib.library.sleep(0.5)
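The update loop above relies on `async with self.test as test`. The sketch below assumes that entering the block hands the handler a data object and that leaving it commits the changes; that is an assumption about the behaviour, not something this page confirms. `Property` and `Data` are hypothetical stand-ins, and the loop is bounded so that the example terminates.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Data:
    value: int = 0
    info: str = ''


class Property:
    """Hypothetical stand-in for a property usable with ``async with``."""

    def __init__(self):
        self.data = Data()
        self.published = []  # snapshots recorded on each clean exit
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self._lock.acquire()
        return self.data

    async def __aexit__(self, exc_type, exc, tb):
        try:
            # Record the update only if the block finished without error.
            if exc_type is None:
                self.published.append((self.data.value, self.data.info))
        finally:
            self._lock.release()
        return False


async def update_loop(prop, iterations, period):
    # Bounded version of the 'while True' loop in the listing above.
    for _ in range(iterations):
        async with prop as data:
            data.value = data.value + 1
            data.info = f'testing {data.value}'
        await asyncio.sleep(period)


async def demo():
    prop = Property()
    await update_loop(prop, iterations=3, period=0.01)
    return prop


prop = asyncio.run(demo())
```

Sleeping outside the `async with` block keeps the lock held only for the duration of the update itself.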
test.shutdown(self, instance, async_lib)
@test.shutdown
async def test(self, instance, async_lib):
    print('shutdown')
test2.startup(self, instance, async_lib)
@test2.startup
async def test2(self, instance, async_lib):
    while True:
        async with self.test2 as test2, self.test3 as test3:
            # Swap values
            test2.value, test3.value = int(test3.value), float(test2.value)

        await async_lib.library.sleep(2.0)
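The swap in the listing above depends on Python evaluating the whole right-hand side before assigning. A minimal sketch with plain variables follows; the starting values are made up for illustration.

```python
# Hypothetical starting values: an integer and a float.
test2_value = 1
test3_value = 2.5

# The right-hand side is evaluated in full before either name is rebound,
# so this swaps the two values while converting their types.
test2_value, test3_value = int(test3_value), float(test2_value)
```

Note that `int()` truncates toward zero, so the fractional part of the float is lost on each swap: 2.5 becomes 2 here.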