caproto.ioc_examples.io_interrupt.IOInterruptIOC

Inheritance diagram of IOInterruptIOC
class caproto.ioc_examples.io_interrupt.IOInterruptIOC(prefix, *, macros=None, parent=None, name=None)[source]

An IOC that updates on keypress events.

IOInterruptIOC pvproperties

Attribute

Suffix

Docs

Type

Notes

Alarm Group

keypress

keypress

Latest keypress

str

Read-only Length(10) Startup

Methods

group_read(instance)

Generic read called for channels without get defined

group_write(instance, value)

Generic write called for channels without put defined

Attributes

default_values

type_map

type_map_read_only

pvproperty methods

keypress.startup(self, instance, async_lib)
Source code: keypress.startup
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@keypress.startup
async def keypress(self, instance, async_lib):
    # This method will be called when the server starts up.
    print('* keypress method called at server startup')
    queue = async_lib.ThreadsafeQueue()

    # Start a separate thread that monitors keyboard input, telling it to
    # put new values into our async-friendly queue
    thread = threading.Thread(
        target=start_io_interrupt_monitor,
        daemon=True,
        kwargs=dict(new_value_callback=queue.put)
    )
    thread.start()

    # Loop and grab items from the queue one at a time
    while True:
        value = await queue.async_get()
        print(f'Saw new value on async side: {value!r}')

        # Propagate the keypress to the EPICS PV, triggering any monitors
        # along the way
        await self.keypress.write(str(value))