import asyncio import datetime from functools import wraps from loguru import logger def retry_async(max_retries=3, initial_delay=1, exceptions=(Exception,)): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): attempt = 0 delay = initial_delay while attempt < max_retries: try: return await func(*args, **kwargs) except exceptions as e: logger.info( f"{e=}, attempt {attempt + 1}/{max_retries}, will retry in {delay} seconds" ) logger.exception(f"{e=}") attempt += 1 if attempt >= max_retries: logger.info("Max retries reached, aborting") raise e await asyncio.sleep(delay) delay *= 2 # 延迟时间加倍 return wrapper return decorator if __name__ == "__main__": @retry_async() async def main(): now = datetime.datetime.now() logger.info(f"{now=}") # Fail twice then succeed if main.counter < 2: main.counter += 1 raise Exception(now) return now main.counter = 0 asyncio.run(main())