Make Jobs

Jobs are the core of Chancy. They are the functions that are run by your workers.

Creating a Job

Use the job() decorator to create a job:

from chancy import job

@job()
def greet():
    print(f"Hello world!")

You can still call this function normally:

>>> greet()
Hello world!

You can also specify the defaults for a job:

from chancy import job

@job(queue="default", priority=1, max_attempts=3, kwargs={"name": "World"})
def greet(*, name: str):
    print(f"Hello, {name}!")

Jobs are immutable once created - use the with_ methods on a Job to create a new job with modified properties:

@job(queue="default", priority=1, max_attempts=3, kwargs={"name": "World"})
def greet(*, name: str):
    print(f"Hello, {name}!")

 async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.push(greet.job.with_kwargs(name="Alice"))

Queue a Job

Once you’ve created a job, push it to the queue:

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.push(greet)

Queue multiple jobs at once:

await chancy.push_many([job1, job2, job3])

Push returns a Reference object that can be used to retrieve the job instance later, or wait for it to complete:

reference = await chancy.push(greet)
finished_job = await chancy.wait_for_job(reference)
assert finished_job.state == finished_job.State.SUCCEEDED

Priority

Priority determines the order of execution. The higher the priority, the sooner the job will be executed:

higher_priority_job = greet.job.with_priority(10)
lower_priority_job = greet.job.with_priority(-10)

Retry Attempts

Specify how many times a job should be retried if it fails:

greet.job.with_max_attempts(3)

Scheduled Execution

Schedule a job to run some time in the future:

from datetime import datetime, timedelta, timezone

future_job = greet.job.with_scheduled_at(
    datetime.now(timezone.utc) + timedelta(hours=1)
)

Note

Scheduled jobs are guaranteed to run at or after the scheduled time, but not exactly at that time.

Tip

If you need recurring jobs, take a look at the Cron plugin.

Resource Limits

Set memory and time limits for job execution:

from chancy import Limit, job

@job(limits=[
    Limit(Limit.Type.MEMORY, 1024 * 1024 * 1024),
    Limit(Limit.Type.TIME, 60),
])
def greet(*, name: str):
    print(f"Hello, {name}!")

Not all executors will support all types of limits. For example only the default ProcessExecutor supports memory limits.

Unique Jobs

Prevent duplicate job execution by assigning a unique key:

from chancy import job

@job()
def greet(*, name: str):
    print(f"Hello, {name}!")

async with Chancy("postgresql://localhost/postgres") as chancy:
    await chancy.push(greet.job.with_unique_key("greet_alice").with_kwargs(name="Alice"))

Note

Unique jobs ensure only one job with the same unique_key is queued or running at a time, but any number can be completed or failed.