Using executors to wrap blocking code

AsyncIO provides its own version of the futures library to allow us to run code in a separate thread or process when there isn't an appropriate non-blocking call to be made. This allows us to combine threads and processes with the asynchronous model. One of the more useful applications of this feature is to get the best of both worlds when an application has bursts of I/O-bound and CPU-bound activity. The I/O-bound portions can happen in the event loop, while the CPU-intensive work can be spun off to a different process. To illustrate this, let's implement sorting as a service using AsyncIO:

import asyncio
import json
from concurrent.futures import ProcessPoolExecutor


def sort_in_process(data):
    # Runs in a worker process: decode the JSON payload, gnome sort
    # it, and encode the sorted list back to bytes
    nums = json.loads(data.decode())
    curr = 1
    while curr < len(nums):
        if nums[curr] >= nums[curr - 1]:
            curr += 1
        else:
            nums[curr], nums[curr - 1] = nums[curr - 1], nums[curr]
            if curr > 1:
                curr -= 1

    return json.dumps(nums).encode()


async def sort_request(reader, writer):
    print("Received connection")
    # Wire format: an 8-byte big-endian length prefix, then that many
    # bytes of JSON
    length = await reader.read(8)
    data = await reader.readexactly(int.from_bytes(length, "big"))
    # Hand the CPU-bound sort to the loop's default executor (set to a
    # process pool below) so the event loop stays free for other clients
    result = await asyncio.get_event_loop().run_in_executor(
        None, sort_in_process, data
    )
    print("Sorted list")
    writer.write(result)
    writer.close()
    print("Connection closed")


loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
server = loop.run_until_complete(
    asyncio.start_server(sort_request, "127.0.0.1", 2015)
)
print("Sort Service running")

loop.run_forever()
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
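The server's custom wire format can be exercised with a small client. The listing above defines only the server, so the `sort_client` helper below (and its default host and port, which mirror the listing) is our own invention, a minimal sketch of what a caller might look like:

```python
import asyncio
import json


async def sort_client(nums, host="127.0.0.1", port=2015):
    # Hypothetical client for the sort service above. It frames the
    # request the way sort_request expects: an 8-byte big-endian
    # length prefix followed by the JSON-encoded list.
    reader, writer = await asyncio.open_connection(host, port)
    payload = json.dumps(nums).encode()
    writer.write(len(payload).to_bytes(8, "big") + payload)
    await writer.drain()
    # The server closes the connection after writing, so reading to
    # EOF collects the whole JSON response.
    result = json.loads((await reader.read()).decode())
    writer.close()
    return result
```

With the service running, `await sort_client([3, 1, 2])` would return `[1, 2, 3]`.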

This is an example of good code implementing some really stupid ideas. The whole idea of sorting as a service is pretty ridiculous. Using our own sorting algorithm instead of calling Python's sorted is even worse. The algorithm we used is called gnome sort, or in some cases, stupid sort. It is a slow sort algorithm implemented in pure Python. We defined our own protocol instead of using one of the many perfectly suitable application protocols that exist in the wild. Even the idea of using multiprocessing for parallelism might be suspect here; we still end up passing all the data into and out of the subprocesses. Sometimes, it's important to take a step back from the program you are writing and ask yourself whether you are trying to meet the right goals.

But ignoring the workload, let's look at some of the smart features of this design. First, we are passing bytes into and out of the subprocess. This is a lot smarter than decoding the JSON in the main process. It means the (relatively expensive) decoding can happen on a different CPU. Also, pickled JSON strings are generally smaller than pickled lists, so less data is passed between processes.

Second, the two functions read very linearly; it looks like code is being executed one line after another. Of course, in AsyncIO, this is an illusion, but we don't have to worry about shared memory or concurrency primitives.
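That linear look holds for any coroutine that awaits run_in_executor. The following self-contained sketch (using the loop's default thread-pool executor rather than a process pool, so it runs anywhere without pickling concerns) shows two blocking calls overlapping even though each coroutine reads top to bottom:

```python
import asyncio
import time


def blocking(n):
    # Stand-in for CPU- or I/O-bound work that would block the loop
    time.sleep(0.2)
    return n * n


async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Passing None uses the loop's default executor (a thread pool,
    # unless one was replaced as the server above replaces it with a
    # ProcessPoolExecutor). The two calls run concurrently.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking, 3),
        loop.run_in_executor(None, blocking, 4),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
```

Because the two 0.2-second calls overlap, `elapsed` lands near 0.2 seconds rather than 0.4, while the calling code still reads as a straight sequence of awaits.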