Using a thread pool

The first issue we will try to solve is the unbounded number of threads that our program can start. A good solution would be to build a pool of threaded workers with a strictly defined size that handles all the parallel work and communicates with the workers through some thread-safe data structure. By using this thread pool approach, we will also make it easier to solve the two other problems that we mentioned in the previous section.

So, the general idea is to start a predefined number of threads that will consume work items from a queue until it becomes empty. When there is no more work to do, the threads will return and we will be able to exit the program. A good candidate for the structure used to communicate with the workers is the Queue class from the built-in queue module. It is a First In First Out (FIFO) queue implementation very similar to the deque collection from the collections module, but it was specifically designed to handle inter-thread communication. Here is a modified version of the main() function that starts only a limited number of worker threads, with a new worker() function as a target, and communicates with them using a thread-safe queue:

import time
from queue import Queue, Empty
from threading import Thread

import requests

THREAD_POOL_SIZE = 4

SYMBOLS = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')
BASES = ('USD', 'EUR', 'PLN', 'NOK', 'CZK')

def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )
    response.raise_for_status()
    rates = response.json()["rates"]

    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    rates_line = ", ".join(
        [f"{rates[symbol]:7.03} {symbol}" for symbol in SYMBOLS]
    )
    print(f"1 {base} = {rates_line}")

def worker(work_queue):
    while not work_queue.empty():
        try:
            # non-blocking get: raises Empty if another worker
            # drained the queue in the meantime
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            fetch_rates(item)
            work_queue.task_done()

def main():
    work_queue = Queue()

    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    # block until task_done() has been called for every item put on the queue
    work_queue.join()

    while threads:
        threads.pop().join()

if __name__ == "__main__":
    started = time.time()
    main()
    elapsed = time.time() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

The result of running this modified version of our program is similar to the previous one:

$ python3 threads_thread_pool.py
1 EUR = 1.13 USD, 1.0 EUR, 4.29 PLN, 9.62 NOK, 25.6 CZK
1 NOK = 0.117 USD, 0.104 EUR, 0.446 PLN, 1.0 NOK, 2.66 CZK
1 USD = 1.0 USD, 0.887 EUR, 3.8 PLN, 8.53 NOK, 22.7 CZK
1 PLN = 0.263 USD, 0.233 EUR, 1.0 PLN, 2.24 NOK, 5.98 CZK
1 CZK = 0.044 USD, 0.039 EUR, 0.167 PLN, 0.375 NOK, 1.0 CZK

time elapsed: 0.17s

The overall runtime may be slower than with one thread per argument, but at least it is now impossible to exhaust all the computing resources with an arbitrarily long list of inputs. We can also tweak the THREAD_POOL_SIZE parameter to find a better balance between resource usage and processing time.
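
One convenient way to experiment with that balance is to avoid hardcoding the pool size. The following is just one possible approach, assuming a POOL_SIZE environment variable that is not part of the original example:

import os

# fall back to 4 worker threads if the POOL_SIZE environment variable is not set
THREAD_POOL_SIZE = int(os.environ.get("POOL_SIZE", "4"))

With such a change, different pool sizes can be compared without editing the code, for example:

$ POOL_SIZE=8 python3 threads_thread_pool.py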

We will look at how to use two-way queues in the next section.