caproto.ioc_examples.io_interrupt.IOInterruptIOC

Inheritance diagram of IOInterruptIOC
class caproto.ioc_examples.io_interrupt.IOInterruptIOC(prefix: str, *, macros: Optional[Dict[str, str]] = None, parent: Optional[caproto.server.server.PVGroup] = None, name: Optional[str] = None)

An IOC that updates on keypress events.

IOInterruptIOC pvproperties

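=========  ========  ===============  ====  ==============================  ===========
Attribute  Suffix    Docs             Type  Notes                           Alarm Group
=========  ========  ===============  ====  ==============================  ===========
keypress   keypress  Latest keypress  str   Read-only, Length(10), Startup
=========  ========  ===============  ====  ==============================  ===========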

Methods

group_read(instance)

Generic read called for channels without get defined

group_write(instance, value)

Generic write called for channels without put defined

Attributes

default_values

type_map

type_map_read_only

pvproperty methods

keypress.startup(self, instance, async_lib)
Source code: keypress.startup
    @keypress.startup
    async def keypress(self, instance, async_lib):
        # This method will be called when the server starts up.
        print('* keypress method called at server startup')
        queue = async_lib.ThreadsafeQueue()

        # Start a separate thread that monitors keyboard input, telling it to
        # put new values into our async-friendly queue
        thread = threading.Thread(
            target=start_io_interrupt_monitor,
            daemon=True,
            kwargs=dict(new_value_callback=queue.put)
        )
        thread.start()

        # Loop and grab items from the queue one at a time
        while True:
            value = await queue.async_get()
            print(f'Saw new value on async side: {value!r}')

            # Propagate the keypress to the EPICS PV, triggering any monitors
            # along the way
            await self.keypress.write(str(value))
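
The startup hook above hands values from a blocking thread to an async loop through a thread-safe queue. The same hand-off can be sketched with the standard library alone, which may help when reading the method without caproto installed. This is a minimal sketch, not caproto's implementation: `monitor`, `consume`, and `run_demo` are hypothetical names, `monitor` stands in for `start_io_interrupt_monitor`, and an `asyncio.Queue` fed through `loop.call_soon_threadsafe` is assumed as a substitute for `async_lib.ThreadsafeQueue()`.

```python
import asyncio
import threading


def monitor(new_value_callback, values):
    """Hypothetical stand-in for a blocking input monitor run in a thread."""
    for value in values:
        new_value_callback(value)


async def consume(values):
    """Collect values produced by a worker thread on the async side."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()

    def put_threadsafe(value):
        # asyncio.Queue is not thread-safe, so schedule the put on the loop.
        loop.call_soon_threadsafe(queue.put_nowait, value)

    # Start the producer thread, passing it the thread-safe callback.
    thread = threading.Thread(
        target=monitor,
        daemon=True,
        kwargs=dict(new_value_callback=put_threadsafe, values=values),
    )
    thread.start()

    # Grab items from the queue one at a time. Unlike the IOC, which loops
    # forever, this sketch stops once every value has been received.
    seen = []
    while len(seen) < len(values):
        value = await queue.get()
        seen.append(str(value))

    thread.join()
    return seen


def run_demo():
    """Run the sketch with three fake keypresses."""
    return asyncio.run(consume(['a', 'b', 'c']))
```

In the IOC, the step that appends to `seen` corresponds to `await self.keypress.write(str(value))`, which is what publishes each new value to clients monitoring the PV.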