Tenacity 重试执行库

我们经常在编写程序时,碰到网络问题、或者其他问题导致异常抛出,使得我们当前的任务中断。 此时,我们可能并不想直接中断任务,而是想重新尝试去执行异常抛出的部门代码。Tenacity 就是一个可以解决此问题的用 Python 编写的重试库。

Document:https://tenacity.readthedocs.io/en/latest/

安装命令:

pip install tenacity

1. 无限重试执行函数

import tenacity


@tenacity.retry
def never_gonna_give_you_up():
    print("Retry forever ignoring Exceptions, don't wait between retries")
    raise Exception


if __name__ == '__main__':
    never_gonna_give_you_up()

当函数 never_gonna_give_you_up 抛出异常时,将会重新执行该函数,上面的代码将会永远执行,直到程序进程被强制结束。

2. 设置重试执行次数

import tenacity


@tenacity.retry(stop=tenacity.stop_after_attempt(5))
def never_gonna_give_you_up():
    print("Retry forever ignoring Exceptions, don't wait between retries")
    raise Exception


if __name__ == '__main__':
    never_gonna_give_you_up()

我们通过对 retry 的 stop 参数设置,当尝试执行 never_gonna_give_you_up 超过 5 次仍然抛出异常时,tenacity 将会抛出异常,并终止应用程序。异常如下:

tenacity.RetryError: RetryError[<Future at 0x7fe1300c9050 state=finished raised Exception>]

3. 设置重试执行时间

import tenacity


@tenacity.retry(stop=tenacity.stop_after_delay(1))
def never_gonna_give_you_up():
    print("Retry forever ignoring Exceptions, don't wait between retries")
    raise Exception


if __name__ == '__main__':
    never_gonna_give_you_up()

通过使用 tenacity.stop_after_delay 可以设置尝试调用 never_gonna_give_you_up 函数多久,超过该时间仍然抛出异常,则终止程序。时间单位为秒。抛出异常如下:

tenacity.RetryError: RetryError[<Future at 0x7f7f100b3890 state=finished raised Exception>]

4. 同时设置重试执行次数和时间

import tenacity


@tenacity.retry(stop=(tenacity.stop_after_delay(5) | tenacity.stop_after_attempt(5)))
def stop_after_10_s_or_5_retries():
    print("Stopping after 10 seconds or 5 retries")
    raise Exception


if __name__ == '__main__':
    stop_after_10_s_or_5_retries()

当重试达到次数或者时间中任何一个限制,程序将会抛出异常或终止。

5. 间隔指定秒重试执行

import tenacity


@tenacity.retry(wait=tenacity.wait_fixed(2))
def wait_2_s():
    print("Wait 2 second between retries")
    raise Exception


if __name__ == '__main__':
    wait_2_s()

上述代码将会每 2 秒执行一次,直到程序被强制终止。

6. 间隔随机时间重试执行

import tenacity


@tenacity.retry(wait=tenacity.wait_random(min=1, max=2))
def wait_random_1_to_2_s():
    print("Randomly wait 1 to 2 seconds between retries")
    raise Exception


if __name__ == '__main__':
    wait_random_1_to_2_s()

每次等待间隔将会在 min 和 max 之间随机产生。

7. 抛出指定异常时重试

import tenacity


@tenacity.retry(retry=tenacity.retry_if_exception_type(IOError))
def might_io_error():
    print("Retry forever with no wait if an IOError occurs, raise any other errors")
    raise IOError


if __name__ == '__main__':
    might_io_error()

上述代码,我们指定抛出 IOError 时尝试重试执行该函数,如果抛出其他异常则不进行重试执行。

其他更多种的用法参考文档:https://tenacity.readthedocs.io/en/latest/

未经允许不得转载:一亩三分地 » Tenacity 重试执行库
评论 (0)

8 + 7 =