Tenacity
Description
Tenacity makes it easy to retry certain logic in the event of an exception or other conditions you specify.
A common scenario is being throttled or rate-limited when you have multiple processes or threads attempting to access the same resource.
If those threads were spawned at roughly the same time and all retry at the same fixed interval, they will likely keep hitting the resource simultaneously and keep getting rate limited.
Ideally, each thread would retry with exponential backoff to reduce the load on the resource server, plus an element of randomness on each retry so that requests from different threads are staggered.
Tenacity makes this painless with its wait_random_exponential function.
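To see why this helps, here is a simplified sketch of the backoff-with-jitter idea (this is an illustration of the documented behavior, not tenacity's actual implementation): the upper bound on the wait doubles with each attempt until it reaches the cap, and the actual wait is drawn uniformly at random from that range.

```python
import random

def random_exponential_wait(attempt: int, multiplier: float = 1, max_wait: float = 60) -> float:
    """Pick a random wait between 0 and min(multiplier * 2**attempt, max_wait)."""
    upper = min(multiplier * 2 ** attempt, max_wait)
    return random.uniform(0, upper)

# The random range doubles each attempt (1s, 2s, 4s, ...) until capped at 60s,
# so two threads that failed together almost never retry at the same moment.
for attempt in range(8):
    wait = random_exponential_wait(attempt)
    print(f"attempt {attempt}: range is 0-{min(2 ** attempt, 60)}s, waiting {wait:.2f}s")
```

The randomness (jitter) is what staggers the threads; the exponential growth is what backs off a persistently overloaded server.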
Example
import concurrent.futures as cf
import logging
import random

import requests
from requests import HTTPError
from tenacity import (
    after_log,
    retry,
    stop_after_delay,
    wait_random_exponential,
)

logger = logging.getLogger(__name__)


@retry(
    # keep retrying for up to 1 hour
    stop=stop_after_delay(3600),
    # randomly wait up to 2**x * 1 seconds between retries until the range
    # reaches 60 seconds, then randomly up to 60 seconds thereafter
    wait=wait_random_exponential(multiplier=1, max=60),
    # log each failed attempt after it happens
    after=after_log(logger, logging.ERROR),
)
def fetch_user(user_id: int) -> tuple[int, dict]:
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    simulate_rate_limit(url)
    return user_id, requests.get(url).json()


def simulate_rate_limit(url: str):
    # randomly fail about one third of the time to mimic a 429 response
    if random.randrange(3) == 0:
        raise HTTPError(f"429 Client Error: TOO MANY REQUESTS for url: {url}")


def main():
    user_ids = list(range(1, 10))
    with cf.ThreadPoolExecutor() as ex:
        tasks = [ex.submit(fetch_user, uid) for uid in user_ids]
        for t in cf.as_completed(tasks):
            user_id, user = t.result()
            user_name = user["name"]
            print(f"user #{user_id} is named {user_name}")


if __name__ == "__main__":
    main()