caproto.ioc_examples.io_interrupt.IOInterruptIOC

Inheritance diagram of IOInterruptIOC
class caproto.ioc_examples.io_interrupt.IOInterruptIOC(prefix: str, *, macros: Dict[str, str] | None = None, parent: PVGroup | None = None, name: str | None = None)

An IOC that updates on keypress events.

IOInterruptIOC pvproperties

Attribute  Suffix    Docs             Type  Notes                           Alarm Group
---------  --------  ---------------  ----  ------------------------------  -----------
keypress   keypress  Latest keypress  str   Read-only, Length(10), Startup

Methods

group_read(instance)          Generic read called for channels without get defined
group_write(instance, value)  Generic write called for channels without put defined

Attributes

default_values

type_map

type_map_read_only

pvdb

attr_pvdb

attr_to_pvname

groups

pvproperty methods

keypress.startup(self, instance, async_lib)
Source code: keypress.startup
    @keypress.startup
    async def keypress(self, instance, async_lib):
        # This method will be called when the server starts up.
        print('* keypress method called at server startup')
        queue = async_lib.ThreadsafeQueue()

        # Start a separate thread that monitors keyboard input, telling it to
        # put new values into our async-friendly queue
        thread = threading.Thread(
            target=start_io_interrupt_monitor,
            daemon=True,
            kwargs=dict(new_value_callback=queue.put)
        )
        thread.start()

        # Loop and grab items from the queue one at a time
        while True:
            value = await queue.async_get()
            print(f'Saw new value on async side: {value!r}')

            # Propagate the keypress to the EPICS PV, triggering any monitors
            # along the way
            await self.keypress.write(str(value))
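
The startup hook above hands values from a blocking monitor thread to the async side through a thread-safe queue. The sketch below illustrates that hand-off pattern using only the standard library, so it can be run without caproto or a terminal. It is not the caproto implementation: `fake_key_monitor`, `consume_keys`, and `run_demo` are hypothetical names, the `None` sentinel is an assumption made so the demo can terminate, and `asyncio.Queue` combined with `loop.call_soon_threadsafe` stands in for `async_lib.ThreadsafeQueue`.

```python
import asyncio
import threading


def fake_key_monitor(new_value_callback, keys="abc"):
    """Hypothetical stand-in for start_io_interrupt_monitor.

    Instead of reading the keyboard, it reports a fixed sequence of keys.
    """
    for key in keys:
        new_value_callback(key)
    # Sentinel so the demo can stop (an assumption of this sketch; the
    # original loop runs forever).
    new_value_callback(None)


async def consume_keys(keys="abc"):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def put_threadsafe(value):
        # Runs in the worker thread: schedule the put on the event loop
        # rather than touching the asyncio queue directly.
        loop.call_soon_threadsafe(queue.put_nowait, value)

    thread = threading.Thread(
        target=fake_key_monitor,
        daemon=True,
        kwargs=dict(new_value_callback=put_threadsafe, keys=keys),
    )
    thread.start()

    seen = []
    # Grab items from the queue one at a time, as the startup hook does.
    while True:
        value = await queue.get()
        if value is None:
            break
        # In the IOC, this is where self.keypress.write(str(value)) is awaited.
        seen.append(str(value))
    thread.join()
    return seen


def run_demo(keys="abc"):
    return asyncio.run(consume_keys(keys))
```

Calling `run_demo("xyz")` collects the three keys in the order the thread produced them. The point of the design is that the blocking work stays in its own thread while the event loop only ever awaits the queue, so other PVs can continue to be served between keypresses.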