caproto.pva.ioc_examples.group.MyIOC¶
- class caproto.pva.ioc_examples.group.MyIOC(prefix, *, macros=None, parent=None, name=None)[source]¶
¶ Attribute
Suffix
Docs
Type
Notes
Alarm Group
long_write
long_write
MyData
Put
rpc
rpc
MyData
test
test
MyData
Startup Shutdown Put
test2
test2
MyData
Startup
test3
test3
type
Methods
group_read
(instance, request)Generic read called for channels without get defined
group_write
(instance, update)Generic write called for channels without put defined
Attributes
array_type_map
type_map
pvproperty methods
- rpc.call(self, instance, value)¶
Source code: rpc.rpc
69@rpc.call 70async def rpc(self, instance, data): 71 # Some awf... nice normative type stuff comes through here (NTURI): 72 print('RPC call data is', data) 73 print('Scheme:', data.scheme) 74 print('Query:', data.query) 75 print('Path:', data.path) 76 77 # Echo back the query value, if available: 78 query = data.query 79 value = int(getattr(query, 'value', '1234')) 80 return MyData(value=value)
- test.startup(self, instance, async_lib)¶
45@test.startup 46async def test(self, instance, async_lib): 47 self.async_lib = async_lib 48 49 while True: 50 async with self.test as test: 51 test.value = test.value + 1 52 test.info = f'testing {test.value}' 53 54 await async_lib.library.sleep(0.5)
- test.shutdown(self, instance, async_lib)¶
56@test.shutdown 57async def test(self, instance, async_lib): 58 print('shutdown')
- test2.startup(self, instance, async_lib)¶
60@test2.startup 61async def test2(self, instance, async_lib): 62 while True: 63 async with self.test2 as test2, self.test3 as test3: 64 # Swap values 65 test2.value, test3.value = int(test3.value), float(test2.value) 66 67 await async_lib.library.sleep(2.0)