我们经常在编写程序时,碰到网络问题、或者其他问题导致异常抛出,使得我们当前的任务中断。 此时,我们可能并不想直接中断任务,而是想重新尝试去执行异常抛出的部门代码。Tenacity 就是一个可以解决此问题的用 Python 编写的重试库。
Document:https://tenacity.readthedocs.io/en/latest/
安装命令:
pip install tenacity
1. 无限重试执行函数
import tenacity @tenacity.retry def never_gonna_give_you_up(): print("Retry forever ignoring Exceptions, don't wait between retries") raise Exception if __name__ == '__main__': never_gonna_give_you_up()
当函数 never_gonna_give_you_up 抛出异常时,将会重新执行该函数,上面的代码将会永远执行,直到程序进程被强制结束。
2. 设置重试执行次数
import tenacity @tenacity.retry(stop=tenacity.stop_after_attempt(5)) def never_gonna_give_you_up(): print("Retry forever ignoring Exceptions, don't wait between retries") raise Exception if __name__ == '__main__': never_gonna_give_you_up()
我们通过对 retry 的 stop 参数设置,当尝试执行 never_gonna_give_you_up 超过 5 次仍然抛出异常时,tenacity 将会抛出异常,并终止应用程序。异常如下:
tenacity.RetryError: RetryError[<Future at 0x7fe1300c9050 state=finished raised Exception>]
3. 设置重试执行时间
import tenacity @tenacity.retry(stop=tenacity.stop_after_delay(1)) def never_gonna_give_you_up(): print("Retry forever ignoring Exceptions, don't wait between retries") raise Exception if __name__ == '__main__': never_gonna_give_you_up()
通过使用 tenacity.stop_after_delay 可以设置尝试调用 never_gonna_give_you_up 函数多久,超过该时间仍然抛出异常,则终止程序。时间单位为秒。抛出异常如下:
tenacity.RetryError: RetryError[<Future at 0x7f7f100b3890 state=finished raised Exception>]
4. 同时设置重试执行次数和时间
import tenacity @tenacity.retry(stop=(tenacity.stop_after_delay(5) | tenacity.stop_after_attempt(5))) def stop_after_10_s_or_5_retries(): print("Stopping after 10 seconds or 5 retries") raise Exception if __name__ == '__main__': stop_after_10_s_or_5_retries()
当重试达到次数或者时间中任何一个限制,程序将会抛出异常或终止。
5. 间隔指定秒重试执行
import tenacity @tenacity.retry(wait=tenacity.wait_fixed(2)) def wait_2_s(): print("Wait 2 second between retries") raise Exception if __name__ == '__main__': wait_2_s()
上述代码将会每 2 秒执行一次,直到程序被强制终止。
6. 间隔随机时间重试执行
import tenacity @tenacity.retry(wait=tenacity.wait_random(min=1, max=2)) def wait_random_1_to_2_s(): print("Randomly wait 1 to 2 seconds between retries") raise Exception if __name__ == '__main__': wait_random_1_to_2_s()
每次等待间隔将会在 min 和 max 之间随机产生。
7. 抛出指定异常时重试
import tenacity @tenacity.retry(retry=tenacity.retry_if_exception_type(IOError)) def might_io_error(): print("Retry forever with no wait if an IOError occurs, raise any other errors") raise IOError if __name__ == '__main__': might_io_error()
上述代码,我们指定抛出 IOError 时尝试重试执行该函数,如果抛出其他异常则不进行重试执行。
其他更多种的用法参考文档:https://tenacity.readthedocs.io/en/latest/