So for a recent project I ended up working with websockets and then started to work with the async features offered by the websocket library. While reading through various bits of documentation and posts online I saw that most of the wisdom seems to either be wrong or outdated. So here is what I learned and what worked for me using python 3.11.

Websockets where the simple part

Going through the hello world example for the websockets you get something like this:

#!/usr/bin/env python

import asyncio
from websockets.server import serve

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def server():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(server())

But there still is a lot that this leaves out. The first thing that I wanted was a way for the program to interact with the user via the command line. Sure I could use click for a command line, but what if the user needed to interact with the running process? So I need some way to interact with the terminal. There is of course input, but it is blocking, so I can't just run it in a task and wait for the it to unblock, as this will stall the asyncio loop as well.

Many older posts suggest a loop.run_in_executor but after getting the same blocking result (as long as I was not using a special executor for it) it was clear that this was not the way for a simple 'fire and forget' sort of task. Gladly in the running in threads section of the asyncio documentation there is a reference to a rather simple sounding to_thread method, that does exactly what we want to. With the help of an asyncio.Event we can now have something running, that will trigger an event inside the other running tasks allowing them to shut down. The code for this looks something like

server_close = asyncio.Event()

def cli():
    while True:
        command = input("To stop the server type 'stop': ")
        if command == "stop":
            server_close.set()
            break

after which the cli function can be run with await asyncio.to_thread(cli). When this needs to be scheduled alongside the server we will need another function that wraps both of them into a task:

async def server_with_cli():
    await asyncio.gather(asyncio.to_thread(cli), server())

while changing the last line of the top example to asyncio.run(run_server_with_cli())

If we wan't the to work with a command line interface that lets the user terminate the server, we need to modify the server and the echo function so as to have them react to the stop signal from the command line

#!/usr/bin/env python
import asyncio
from websockets.server import serve

server_close = asyncio.Event()

def cli():
    while True:
        command = input("To stop the server type 'stop': ")
        if command == "stop":
            server_close.set()
            break

async def echo(websocket):
    async for message in websocket:
        if server_close.is_set():
            break
        await websocket.send(message)

async def server():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()

async def server_with_cli():
    await asyncio.gather(asyncio.to_thread(cli), server())

if __name__ == "__main__":
    asyncio.run(server_with_cli())

Running this with a slightly modified example, does however not actually stop the server (as the await asyncio.Future() keeps the server alive forever). This allows other clients to also connect with the server but as the event is set, the server immediately closes the connection. There are two methods to remedy this. First way is to have the server shut down entirely when we type stop by replacing the await asyncio.Future() in the server function by an await server_close.wait() which will end the server task and shut it down. The second way to do it, is to have the server_close event close the connection only and be reset when a new connection opens. This requires the echo function to first clear the event and then wait for messages from the client (this causes some race conditions with connecting clients, but we will ignore this for now). This would subsequently be:

async def echo(websocket):
    server_close.clear()
    async for message in websocket:
        if server_close.is_set():
            break
        await websocket.send(message)

Besides the race condition, this would also close all open connections indiscriminately, so we'd really need some sort of system to allow for particular connections to be closed (which would need more elaborate user interaction and probably a dictionary of sorts).

Looking at input from the command line

Besides putting the interaction with the user into a different thread, it is also possible to use the lower level api of asyncio to interact with the stream more directly. The event loop allows to add a callback for when a specific file descriptor has input that can be read. By using the stdin as this descriptor, we can add a hook to the input stream and interact with the user that way.

import sys

def handle_user_input():
    user_input = sys.stdin.readline()
    # do something with the user input here
    if user_input == "stop":
        server_close.set()

async def server():
    loop = asyncio.get_running_loop()
    loop.add_reader(sys.stdin, handle_user_input)
    async with server(echo, "localhost", 8765):
        await server_close.wait()

This works due to a particularity of stdin for unix systems is that the sys.stdin reader event will only tell the event loop that something is available, when there is exactly one new line available, meaning sys.stdin.readline will return immediately with a single line that we can use to set/clear various events.

There are of course more sophisticated systems available online, but I like the simplicity of this one. Cheers