#!/usr/bin/env python3
import atexit
import fcntl
import os
import sys
import termios
import textwrap
import threading
from caproto.server import PVGroup, ioc_arg_parser, pvproperty, run
[docs]def start_io_interrupt_monitor(new_value_callback):
    '''
    This function monitors the terminal it was run in for keystrokes.
    On each keystroke, it calls new_value_callback with the given keystroke.
    This is used to simulate the concept of an I/O Interrupt-style signal from
    the EPICS world. Those signals depend on hardware to tell EPICS when new
    values are available to be read by way of interrupts - whereas we use
    callbacks here.
    '''
    # Thanks stackoverflow and Python 2 FAQ!
    if not sys.__stdin__.isatty():
        print('[IO Interrupt] stdin is not a TTY, exiting')
        return
    fd = sys.stdin.fileno()
    oldterm = termios.tcgetattr(fd)
    newattr = termios.tcgetattr(fd)
    newattr[3] &= ~termios.ICANON & ~termios.ECHO
    oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
    print('Started monitoring the keyboard outside of the async library')
    # When the process exits, be sure to reset the terminal settings
    @atexit.register
    def reset_terminal():
        print('Resetting the terminal settings')
        termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
        fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
        print('Done')
    termios.tcsetattr(fd, termios.TCSANOW, newattr)
    fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
    # Loop forever, sending back keypresses to the callback
    while True:
        try:
            char = sys.stdin.read(1)
        except IOError:
            ...
        else:
            if char:
                print(f'New keypress: {char!r}')
                new_value_callback(char) 
[docs]class IOInterruptIOC(PVGroup):
    """
    An IOC that updates on keypress events.
    PVs
    ---
    keypress (str)
        The latest key pressed in the IOC terminal.
    """
    keypress = pvproperty(
        value='',
        max_length=10,
        doc="Latest keypress",
        read_only=True,
    )
    # NOTE the decorator used here:
    @keypress.startup
    async def keypress(self, instance, async_lib):
        # This method will be called when the server starts up.
        print('* keypress method called at server startup')
        queue = async_lib.ThreadsafeQueue()
        # Start a separate thread that monitors keyboard input, telling it to
        # put new values into our async-friendly queue
        thread = threading.Thread(
            target=start_io_interrupt_monitor,
            daemon=True,
            kwargs=dict(new_value_callback=queue.put)
        )
        thread.start()
        # Loop and grab items from the queue one at a time
        while True:
            value = await queue.async_get()
            print(f'Saw new value on async side: {value!r}')
            # Propagate the keypress to the EPICS PV, triggering any monitors
            # along the way
            await self.keypress.write(str(value)) 
if __name__ == '__main__':
    ioc_options, run_options = ioc_arg_parser(
        default_prefix='io:',
        desc=textwrap.dedent(IOInterruptIOC.__doc__),
    )
    ioc = IOInterruptIOC(**ioc_options)
    run(ioc.pvdb, **run_options)