Exploring asyncio and (web)sockets
For a recent project I ended up working with websockets, and then with the async features offered by the websockets library. While reading through various bits of documentation and posts online, I found that much of the advice is either wrong or outdated. So here is what I learned and what worked for me using Python 3.11.
Websockets were the simple part
Going through the hello world example for websockets, you get something like this:
#!/usr/bin/env python
import asyncio
from websockets.server import serve

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def server():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(server())
But this still leaves a lot out. The first thing I wanted was a way for the program to interact with the user via the command line. Sure, I could use click for a command line, but what if the user needs to interact with the running process? So I need some way to interact with the terminal. There is of course input, but it is blocking, so I can't just run it in a task and wait for it to unblock, as this will stall the asyncio loop as well.
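To see the stall for yourself, here is a minimal sketch that uses time.sleep as a stand-in for a blocking input() call. The helper names (blocking_call, count_ticks, ticks_during and the two variants) are made up for this illustration. It counts how often a background task gets to run while the blocking call is in progress, once when the call is made directly inside a coroutine and once when it is handed to a worker thread with asyncio.to_thread, which comes up further down:

```python
import asyncio
import time


def blocking_call(seconds: float) -> None:
    # Stand-in for input(): blocks the calling thread without yielding to asyncio.
    time.sleep(seconds)


async def count_ticks(ticks: list) -> None:
    # Background task: records a timestamp roughly every 10 ms while the loop is free.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def call_directly() -> None:
    # The blocking call runs on the event loop's own thread.
    blocking_call(0.2)


async def call_in_thread() -> None:
    # The blocking call runs on a worker thread; the loop stays free.
    await asyncio.to_thread(blocking_call, 0.2)


async def ticks_during(variant) -> int:
    ticks: list = []
    ticker = asyncio.create_task(count_ticks(ticks))
    await asyncio.sleep(0)  # let the ticker record its first tick
    before = len(ticks)
    await variant()
    ticker.cancel()
    return len(ticks) - before
```

Calling asyncio.run(ticks_during(call_directly)) gives 0, because the ticker never gets a turn while the loop's thread is asleep, whereas asyncio.run(ticks_during(call_in_thread)) gives some positive number.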
Many older posts suggest loop.run_in_executor, but after getting the same blocking result (as long as I was not using a special executor for it) it was clear that this was not the way for a simple 'fire and forget' sort of task.
Thankfully, the running in threads section of the asyncio documentation references a rather simple-sounding to_thread function that does exactly what we want. With the help of an asyncio.Event we can now have something running that will trigger an event inside the other running tasks, allowing them to shut down. The code for this looks something like:
server_close = asyncio.Event()

def cli():
    while True:
        command = input("To stop the server type 'stop': ")
        if command == "stop":
            server_close.set()
            break
after which the cli function can be run with await asyncio.to_thread(cli). When this needs to be scheduled alongside the server, we need another function that wraps both of them into a task:
async def server_with_cli():
    await asyncio.gather(asyncio.to_thread(cli), server())
The last line of the top example then changes to asyncio.run(server_with_cli()).
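One caveat: the cli function above calls server_close.set() from a worker thread, and asyncio's synchronization primitives are documented as not thread-safe. Polling is_set() still sees the flag, but a coroutine parked in await server_close.wait() may not be woken promptly by a set() issued from another thread. A minimal sketch of a more defensive variant, assuming we are free to pass the loop and the event into the CLI, hands the call back to the loop with loop.call_soon_threadsafe. The names make_cli, run_until_stopped and the read_line parameter are hypothetical and only there to keep the example testable:

```python
import asyncio


def make_cli(loop, stop_event, read_line=input):
    """Build a blocking cli() that signals stop_event via the loop's own thread."""

    def cli():
        while True:
            command = read_line("To stop the server type 'stop': ")
            if command.strip() == "stop":
                # Ask the loop to run stop_event.set() itself instead of
                # touching the asyncio.Event from this worker thread.
                loop.call_soon_threadsafe(stop_event.set)
                break

    return cli


async def run_until_stopped(read_line=input):
    stop_event = asyncio.Event()
    cli = make_cli(asyncio.get_running_loop(), stop_event, read_line)
    cli_task = asyncio.create_task(asyncio.to_thread(cli))
    await stop_event.wait()  # stand-in for "keep the server up until stopped"
    await cli_task
    return stop_event.is_set()
```

In a real server the await stop_event.wait() line would sit inside the async with serve(...) block.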
If we want this to work as a command line interface that lets the user terminate the server, we need to modify the server and the echo function so that they react to the stop signal from the command line:
#!/usr/bin/env python
import asyncio
from websockets.server import serve

server_close = asyncio.Event()

def cli():
    while True:
        command = input("To stop the server type 'stop': ")
        if command == "stop":
            server_close.set()
            break

async def echo(websocket):
    async for message in websocket:
        if server_close.is_set():
            break
        await websocket.send(message)

async def server():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()

async def server_with_cli():
    await asyncio.gather(asyncio.to_thread(cli), server())

if __name__ == "__main__":
    asyncio.run(server_with_cli())
Running this with a slightly modified example does not, however, actually stop the server (as the await asyncio.Future() keeps the server alive forever). Other clients can still connect to the server, but as the event is set, the server closes each connection as soon as its first message arrives. There are two ways to remedy this. The first is to have the server shut down entirely when we type stop, by replacing the await asyncio.Future() in the server function with await server_close.wait(), which ends the server task and shuts it down. The second is to have the server_close event only close the connection, and to reset it when a new connection opens. This requires the echo function to first clear the event and then wait for messages from the client (this causes some race conditions with connecting clients, but we will ignore this for now). This would subsequently be:
async def echo(websocket):
    server_close.clear()
    async for message in websocket:
        if server_close.is_set():
            break
        await websocket.send(message)
Besides the race condition, this would also close all open connections indiscriminately, so we'd really need some sort of system to allow for particular connections to be closed (which would need more elaborate user interaction and probably a dictionary of sorts).
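As a rough idea of what such a dictionary could look like, here is a sketch under a few assumptions: ConnectionRegistry is a hypothetical helper (not part of websockets), connections are identified by a simple counter, and the handler only relies on the connection's recv() and send() methods. Each connection gets its own asyncio.Event, and the handler waits for either a message or its own close event, whichever comes first:

```python
import asyncio
import itertools


class ConnectionRegistry:
    """Hypothetical helper: one close event per open connection."""

    def __init__(self):
        self._next_id = itertools.count(1)
        self._events = {}

    def register(self):
        conn_id = next(self._next_id)
        event = self._events[conn_id] = asyncio.Event()
        return conn_id, event

    def unregister(self, conn_id):
        self._events.pop(conn_id, None)

    def close(self, conn_id):
        # Meant to be called from the event loop's thread.
        # Returns False if no connection with that id is registered.
        event = self._events.get(conn_id)
        if event is None:
            return False
        event.set()
        return True


async def echo(websocket, registry):
    conn_id, closed = registry.register()
    try:
        while not closed.is_set():
            # Wait for whichever happens first: a message or the close request.
            recv = asyncio.ensure_future(websocket.recv())
            stop = asyncio.ensure_future(closed.wait())
            done, pending = await asyncio.wait(
                {recv, stop}, return_when=asyncio.FIRST_COMPLETED
            )
            for task in pending:
                task.cancel()
            if recv in done:
                await websocket.send(recv.result())
    finally:
        registry.unregister(conn_id)
```

Since the handler now takes a second argument, it would have to be bound to a registry first (for example with functools.partial) before being passed to serve. Cancelling a pending recv() could in principle drop a message that arrives at that very moment, which is fine for a sketch but worth keeping in mind.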
Looking at input from the command line
Besides putting the interaction with the user into a different thread, it is also possible to use the lower-level API of asyncio to interact with the stream more directly. The event loop allows adding a callback for when a specific file descriptor has input that can be read. By using stdin as this descriptor, we can add a hook to the input stream and interact with the user that way.
import sys

def handle_user_input():
    # readline() keeps the trailing newline, so strip it before comparing
    user_input = sys.stdin.readline().strip()
    # do something with the user input here
    if user_input == "stop":
        server_close.set()

async def server():
    loop = asyncio.get_running_loop()
    # call handle_user_input whenever stdin has something to read
    loop.add_reader(sys.stdin, handle_user_input)
    async with serve(echo, "localhost", 8765):
        await server_close.wait()
This works because of a particularity of stdin on Unix systems: when it is attached to a terminal in its default line-buffered mode, the event loop is only told that something is available once a complete line has been entered, meaning sys.stdin.readline will return immediately with a single line that we can use to set/clear various events.
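A slightly more defensive sketch of the same idea is shown below. It assumes a Unix selector event loop, and wait_for_command with its stream and wanted parameters is a made-up helper, not something asyncio provides. Compared to the snippet above it also handles end of input (for example Ctrl-D) and removes the reader again once it is done:

```python
import asyncio
import sys


async def wait_for_command(stream=sys.stdin, wanted="stop"):
    """Return True once `wanted` is read from stream, False on end of input."""
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def on_readable():
        line = stream.readline()
        if result.done():
            return
        if line == "":
            # An empty string from readline() means end of input.
            result.set_result(False)
        elif line.strip() == wanted:
            result.set_result(True)

    loop.add_reader(stream, on_readable)
    try:
        return await result
    finally:
        # Always unhook the callback, even if we get cancelled.
        loop.remove_reader(stream)
```

One thing to be aware of: if stdin is a pipe rather than a terminal, several lines can arrive in one chunk, and the extra lines may sit in Python's read buffer without triggering the callback again. For interactive use at a terminal this should not come up.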
There are of course more sophisticated systems available online, but I like the simplicity of this one. Cheers