caproto.ioc_examples.io_interrupt.IOInterruptIOC

class caproto.ioc_examples.io_interrupt.IOInterruptIOC(prefix, *, macros=None, parent=None, name=None)[source]
IOInterruptIOC pvproperties

Attribute

Suffix

Docs

Type

Notes

Alarm Group

keypress

keypress

str

Length(10) Startup

Methods

group_read(instance)

Generic read, called for channels that have no get defined

group_write(instance, value)

Generic write, called for channels that have no put defined

Attributes

default_values

type_map

type_map_read_only

pvproperty methods

keypress.startup(self, instance, async_lib)
Source code: keypress.startup
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
    @keypress.startup
    async def keypress(self, instance, async_lib):
        # Startup hook for the ``keypress`` pvproperty: runs when the server
        # starts and then loops for as long as this coroutine is alive.
        print('* keypress method called at server startup')

        # Bridge between the monitor thread and this coroutine: the thread
        # calls ``put`` on it, while this side awaits ``async_get``.
        key_queue = async_lib.ThreadsafeQueue()

        # Keyboard input is watched from a separate daemon thread, which is
        # handed the queue's ``put`` as its new-value callback.
        monitor = threading.Thread(
            target=start_io_interrupt_monitor,
            kwargs={'new_value_callback': key_queue.put},
            daemon=True,
        )
        monitor.start()

        # Drain the queue one item at a time, forever.
        while True:
            value = await key_queue.async_get()
            print(f'Saw new value on async side: {value!r}')

            # Write the keypress to the EPICS PV, which triggers any monitors
            # along the way.
            await self.keypress.write(str(value))