Retry

class chancy.plugins.retry.RetryPlugin[source]

Bases: Plugin

Plugin that handles job retries based on settings stored in the Job’s metadata.

This plugin can be used as an example for implementing your own retry policies.

Usage:

from chancy import job
from chancy.plugins.retry import RetryPlugin

@job()
def job_that_fails():
    raise ValueError("This job should fail.")

async with Chancy(..., plugins=[RetryPlugin()]) as chancy:
    await chancy.declare(Queue("default"))
    await chancy.push(
        job_that_fails.job.with_max_attempts(3).with_meta({
            "retry_settings": {
                "backoff": 2,
                "backoff_factor": 3,
                "backoff_limit": 300,
                "backoff_jitter": [1, 5],
            }
        })
    )

The above example will retry the job 3 times, with a starting backoff of 2 seconds, a backoff factor of 3, a backoff limit of 300 seconds, and a random jitter of between 1 and 5 seconds.

static calculate_next_run(job: QueuedJob, retry_settings: dict) datetime[source]
classmethod get_scope() PluginScope[source]

Get the scope of this plugin. Scopes control when and where the plugin will be run.

By default, plugins are scoped to the worker.

async on_job_completed(*, job: QueuedJob, worker: Worker, exc: Exception | None = None, result: Any = None) QueuedJob[source]

Called after a job is completed (successfully or otherwise) and before the QueuedJob is updated in the database.

If an exception occurred during the job, exc will be the exception instance instead of None.

The passed job is immutable - to modify it, return a new QueuedJob object with the desired changes.

Parameters:
  • worker – The worker that is running the job.

  • job – The job that was completed.

  • exc – The exception that was raised, if any.

  • result – The result of the job, if any.

Returns:

The job to update in the database.