The last of the issues mentioned earlier that you may run into when dealing with problems of this kind is rate limits imposed by external service providers. In the case of the foreign exchange rates API, the service maintainer did not inform us about any rate limits or throttling mechanisms. But many services (even paid ones) do impose rate limits. Also, it isn't fair to abuse a service that is provided to users completely for free.
When using multiple threads, it is very easy to exhaust any rate limit or, if the service does not throttle incoming requests, to saturate the service to the point where it cannot respond to anyone (this is known as a denial of service attack). The problem is even more serious because we have not covered any failure scenario yet, and dealing with exceptions in multithreaded Python code is a bit more complicated than usual.
The response.raise_for_status() method will raise an exception if the response has a status code indicating any type of error (for instance, rate limiting), and this is actually good news. This exception is raised in a separate thread and will not crash the entire program. The worker thread will, of course, exit immediately, but the main thread will wait for all tasks stored on work_queue to finish (with the work_queue.join() call). Without further improvement, we may end up in a situation where some of the worker threads have crashed without marking their items as done, so work_queue.join() never returns and the program never exits. This means that our worker threads should gracefully handle possible exceptions and make sure that all items from the queue are processed.
Let's make some minor changes to our code in order to be prepared for any issues that may occur. In case of an exception in the worker thread, we can put the error instance into the results_queue queue and mark the current task as done, the same as we would do if there was no error. That way, we make sure that the main thread won't lock indefinitely while waiting in work_queue.join(). The main thread can then inspect the results and re-raise any of the exceptions found on the results queue. Here are the improved versions of the worker() and main() functions that can deal with exceptions in a safer way:
def worker(work_queue, results_queue):
    while not work_queue.empty():
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            try:
                result = fetch_rates(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()


def main():
    work_queue = Queue()
    results_queue = Queue()

    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    work_queue.join()

    while threads:
        threads.pop().join()

    while not results_queue.empty():
        result = results_queue.get()
        if isinstance(result, Exception):
            raise result

        present_result(*result)
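The pattern can also be tried without touching the network. The following is only a minimal sketch and not part of the chapter's script: fake_fetch(), run(), and the three-item BASES tuple are hypothetical stand-ins for fetch_rates(), main(), and the real currency list, and the failure for "EUR" is hard-coded so that the outcome is predictable. Unlike main() above, it collects errors in a separate list instead of re-raising the first one, which is just one possible variation.

```python
from queue import Empty, Queue
from threading import Thread

BASES = ("USD", "EUR", "PLN")  # hypothetical sample data


def fake_fetch(base):
    # hypothetical stand-in for fetch_rates(); fails for one item
    if base == "EUR":
        raise ValueError(f"simulated failure for {base}")
    return base, {base: 1.0}


def worker(work_queue, results_queue):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        try:
            result = fake_fetch(item)
        except Exception as err:
            # store the error instead of letting the thread die
            results_queue.put(err)
        else:
            results_queue.put(result)
        finally:
            # always mark the task as done so that join() can return
            work_queue.task_done()


def run(pool_size=2):
    work_queue, results_queue = Queue(), Queue()
    for base in BASES:
        work_queue.put(base)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(pool_size)
    ]
    for thread in threads:
        thread.start()

    work_queue.join()
    for thread in threads:
        thread.join()

    # split outcomes instead of re-raising the first error
    results, errors = [], []
    while not results_queue.empty():
        outcome = results_queue.get()
        if isinstance(outcome, Exception):
            errors.append(outcome)
        else:
            results.append(outcome)
    return results, errors
```

Calling run() should return two successful results and a single ValueError, and it returns at all only because task_done() is called in the finally clause for the failed item as well.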
Now that we are ready to handle exceptions, it is time to break our code. We don't want to abuse our free API and cause a denial of service. So, instead of putting a high load on that API, we will simulate a typical situation that results from many service throttling mechanisms. Many APIs return a 429 Too Many Requests HTTP response when the client exceeds the allowed rate limit. So, we will update the fetch_rates() function to override the status code of every few responses in a way that will cause an exception. The following is the updated version of the function that simulates an HTTP error every few requests:
def fetch_rates(base):
    response = requests.get(
        f"https://api.exchangeratesapi.io/latest?base={base}"
    )

    if random.randint(0, 5) < 1:
        # simulate error by overriding status code
        response.status_code = 429

    response.raise_for_status()

    rates = response.json()["rates"]
    # note: same currency exchanges to itself 1:1
    rates[base] = 1.
    return base, rates
If you use it in your code, you should get an error similar to the following:
$ python3 threads_exceptions_and_throttling.py
1 PLN = 0.263 USD, 0.233 EUR, 1.0 PLN, 2.24 NOK, 5.98 CZK
1 EUR = 1.13 USD, 1.0 EUR, 4.29 PLN, 9.62 NOK, 25.6 CZK
1 USD = 1.0 USD, 0.887 EUR, 3.8 PLN, 8.53 NOK, 22.7 CZK
Traceback (most recent call last):
  File "threads_exceptions_and_throttling.py", line 136, in <module>
    main()
  File "threads_exceptions_and_throttling.py", line 129, in main
    raise result
  File "threads_exceptions_and_throttling.py", line 96, in worker
    result = fetch_rates(item)
  File "threads_exceptions_and_throttling.py", line 70, in fetch_rates
    response.raise_for_status()
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 429 Client Error: OK for url: https://api.exchangeratesapi.io/latest?base=NOK
Let's forget about our simulated failure and pretend that the preceding exception is not a result of faulty code. In such a situation, our program would simply be a bit too fast for this free service. It makes too many concurrent requests, and, in order to work correctly, we need a way to limit the program's pace.
Limiting the pace of work is often called throttling. There are a few easy-to-use packages on PyPI that allow you to limit the rate of any kind of work. But we won't use any external code here. Throttling is a good opportunity to introduce some locking primitives for threading, so we will try to build a throttling solution from scratch.
The algorithm we will use is sometimes called token bucket and is very simple. It includes the following functionality:
- There is a bucket with a predefined number of tokens.
- Each token corresponds to a single permission to process one item of work.
- Each time the worker asks for one or more tokens (permissions), we do the following:
  - We measure how much time has passed since the last time we refilled the bucket.
  - If the time difference allows for it, we refill the bucket with the number of tokens that corresponds to this time difference.
  - If the number of stored tokens is greater than or equal to the number requested, we decrease the number of stored tokens and return the requested number.
  - If the number of stored tokens is less than requested, we return zero.
The two important things are to always initialize the token bucket with zero tokens and never allow it to overfill. If we don't follow these precautions, we can release the tokens in bursts that exceed the rate limit. Because, in our situation, the rate limit is expressed in requests per second, we don't need to deal with arbitrary quanta of time. We assume that the base for our measurement is one second, so we will never store more tokens than the number of requests allowed for that quantum of time. Here is an example implementation of the class that allows for throttling with the token bucket algorithm:
import time
from threading import Lock


class Throttle:
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.time()

            # time measurement is initialized on first
            # token request to avoid initial bursts
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # make sure that quantum of passed time is big
            # enough to add new tokens
            if int(elapsed * self.rate):
                self.tokens += int(elapsed * self.rate)
                self.last = now

            # never over-fill the bucket
            self.tokens = (
                self.rate
                if self.tokens > self.rate
                else self.tokens
            )

            # finally dispatch tokens if available
            if self.tokens >= amount:
                self.tokens -= amount
            else:
                amount = 0

            return amount
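The refill arithmetic is easier to follow when time does not move on its own. The following is a minimal sketch under a few assumptions, not the class used in this chapter: ClockedThrottle is a hypothetical variant that takes its clock as an argument and uses None instead of 0 as the "not started" marker, and FakeClock is a hypothetical helper that advances only when told to.

```python
from threading import Lock


class ClockedThrottle:
    """Token bucket variant whose clock can be replaced (an assumption)."""

    def __init__(self, rate, clock):
        self._lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = None
        self.clock = clock

    def consume(self, amount=1):
        with self._lock:
            now = self.clock()
            if self.last is None:
                # the first request only starts the measurement
                self.last = now

            new_tokens = int((now - self.last) * self.rate)
            if new_tokens:
                # refill, but never above the per-second limit
                self.tokens = min(self.rate, self.tokens + new_tokens)
                self.last = now

            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            return 0


class FakeClock:
    """Hypothetical helper: a clock that moves only when told to."""

    def __init__(self):
        self.now = 100.0

    def __call__(self):
        return self.now


clock = FakeClock()
throttle = ClockedThrottle(rate=2, clock=clock)

trace = [throttle.consume()]      # the bucket starts empty
clock.now += 0.5                  # half a second at rate 2 gives one token
trace.append(throttle.consume())
trace.append(throttle.consume())  # no time has passed, bucket is empty again
clock.now += 10                   # long pause: the refill is capped at rate
trace.extend(throttle.consume() for _ in range(3))
print(trace)  # prints [0, 1, 0, 1, 1, 0]
```

The trace shows both precautions at work: the very first request gets nothing because the bucket starts with zero tokens, and after a ten-second pause only two tokens (the value of rate) are available rather than twenty.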
The usage of this class is very simple. Let's assume that we created only one instance of Throttle (for example, Throttle(10)) in the main thread and passed it to every worker thread as a positional argument. Using the same data structure in different threads is safe because we guarded the manipulation of its internal state with an instance of the Lock class from the threading module. We can now update the worker() function implementation to hold each item until the throttle object releases a new token, as follows:
def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        else:
            while not throttle.consume():
                pass

            try:
                result = fetch_rates(item)
            except Exception as err:
                results_queue.put(err)
            else:
                results_queue.put(result)
            finally:
                work_queue.task_done()
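To see how the single shared instance fits into the rest of the program, here is a minimal, network-free sketch. It rests on several assumptions: fake_fetch() and run() are hypothetical stand-ins for fetch_rates() and main(), the Throttle class is a condensed copy repeated only so that the sketch runs on its own, and the short time.sleep() call replaces the busy loop as one possible way to avoid spinning the CPU while waiting for a token.

```python
import time
from queue import Empty, Queue
from threading import Lock, Thread


class Throttle:
    # condensed copy of the token bucket, repeated for self-containment
    def __init__(self, rate):
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        self.last = 0

    def consume(self, amount=1):
        with self._consume_lock:
            now = time.time()
            if self.last == 0:
                self.last = now
            new_tokens = int((now - self.last) * self.rate)
            if new_tokens:
                self.tokens = min(self.rate, self.tokens + new_tokens)
                self.last = now
            if self.tokens >= amount:
                self.tokens -= amount
                return amount
            return 0


def fake_fetch(base):
    # hypothetical stand-in for fetch_rates(); records when it ran
    return base, time.time()


def worker(work_queue, results_queue, throttle):
    while True:
        try:
            item = work_queue.get(block=False)
        except Empty:
            break
        while not throttle.consume():
            # assumption: a short sleep instead of a busy loop
            time.sleep(0.001)
        try:
            results_queue.put(fake_fetch(item))
        except Exception as err:
            results_queue.put(err)
        finally:
            work_queue.task_done()


def run(items, rate, pool_size=4):
    work_queue, results_queue = Queue(), Queue()
    for item in items:
        work_queue.put(item)

    # one shared instance for the whole pool of threads
    throttle = Throttle(rate)
    threads = [
        Thread(target=worker, args=(work_queue, results_queue, throttle))
        for _ in range(pool_size)
    ]
    started = time.time()
    for thread in threads:
        thread.start()
    work_queue.join()
    for thread in threads:
        thread.join()

    elapsed = time.time() - started
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    return results, elapsed
```

For example, run(list("ABCDE"), rate=50) has to wait for five tokens at 50 tokens per second, so it should not finish in much less than a tenth of a second regardless of how many threads are in the pool.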
Let's take a look at a different concurrency model, which is explained in the next section.