caproto.ioc_examples.io_interrupt.IOInterruptIOC¶
-
class
caproto.ioc_examples.io_interrupt.
IOInterruptIOC
(prefix: str, *, macros: Optional[Dict[str, str]] = None, parent: Optional[caproto.server.server.PVGroup] = None, name: Optional[str] = None)[source]¶ An IOC that updates on keypress events.
¶ Attribute
Suffix
Docs
Type
Notes
Alarm Group
keypress
keypress
Latest keypress
str
Read-only Length(10) Startup
Methods
group_read
(instance)Generic read called for channels without get defined
group_write
(instance, value)Generic write called for channels without put defined
Attributes
default_values
type_map
type_map_read_only
pvproperty methods
-
keypress.
startup
(self, instance, async_lib)¶
Source code: keypress.startup
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
@keypress.startup async def keypress(self, instance, async_lib): # This method will be called when the server starts up. print('* keypress method called at server startup') queue = async_lib.ThreadsafeQueue() # Start a separate thread that monitors keyboard input, telling it to # put new values into our async-friendly queue thread = threading.Thread( target=start_io_interrupt_monitor, daemon=True, kwargs=dict(new_value_callback=queue.put) ) thread.start() # Loop and grab items from the queue one at a time while True: value = await queue.async_get() print(f'Saw new value on async side: {value!r}') # Propagate the keypress to the EPICS PV, triggering any monitors # along the way await self.keypress.write(str(value))
-