chancy.job module

class chancy.job.ErrorT

Bases: TypedDict

attempt: int

The attempt number of the job when the error occurred.

traceback: str

The error message, typically the traceback of an exception.
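To make the shape of an error entry concrete, here is a minimal standard-library sketch. ErrorRecord and record_error are hypothetical stand-ins that only mirror the two documented keys; they are not Chancy's own definitions.

```python
import traceback
from typing import TypedDict


class ErrorRecord(TypedDict):
    """Hypothetical stand-in mirroring the two documented ErrorT keys."""

    attempt: int
    traceback: str


def record_error(attempt: int, exc: BaseException) -> ErrorRecord:
    # Render the exception and its traceback as a single string.
    text = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    return {"attempt": attempt, "traceback": text}


try:
    1 / 0
except ZeroDivisionError as exc:
    error = record_error(attempt=1, exc=exc)
```

The resulting dictionary is plain data, so a list of such entries can be stored alongside a job and inspected later.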

class chancy.job.IsAJob(*args, **kwargs)

Bases: Protocol[P, R]

job: Job

class chancy.job.Job(*, func: str, queue: str = 'default', kwargs: dict[str, Any] | None = <factory>, priority: int = 0, max_attempts: int = 1, scheduled_at: datetime = <factory>, limits: list[Limit] = <factory>, unique_key: str | None = None, meta: dict[str, Any] = <factory>)

Bases: object

A job is an immutable, stateless unit of work that can be pushed onto a Chancy queue and executed elsewhere.

classmethod from_func(func, **kwargs)

Create a job from a function, attempting to determine the function’s importable name automatically.

def hello_world():
    pass

job = Job.from_func(hello_world)

func: str

An importable name for the function that should be executed when this job is run. Ex: my_module.my_function
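The idea of an importable name can be sketched with the standard library alone. The helpers importable_name and resolve below are hypothetical, and the "module.qualname" rule is an assumption made for illustration; Chancy's own rules for deriving and resolving names may differ.

```python
import importlib
import json
from typing import Any, Callable


def importable_name(func: Callable[..., Any]) -> str:
    # Assumption: "module.qualname" identifies a module-level function.
    return f"{func.__module__}.{func.__qualname__}"


def resolve(name: str) -> Callable[..., Any]:
    # Split "package.module.function" into a module path and an attribute.
    module_name, _, attr = name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


# json.dumps stands in for a user-defined module-level function.
name = importable_name(json.dumps)
func = resolve(name)
```

A name of this form only resolves for functions defined at module level; lambdas and nested functions cannot be looked up this way.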

kwargs: dict[str, Any] | None

The keyword arguments to pass to the job function when it is executed.

limits: list[Limit]

A list of resource limits that should be applied to this job.

max_attempts: int = 1

The maximum number of times this job can be attempted before it is considered failed.
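As an illustration of what an attempt limit means in practice, the sketch below retries a callable until it succeeds or the limit is reached. run_with_attempts is a hypothetical helper, not Chancy's worker loop, and counting attempts from 1 is an assumption of this sketch.

```python
import traceback
from typing import Any, Callable


def run_with_attempts(func: Callable[[], Any], max_attempts: int = 1) -> dict:
    errors: list[dict] = []
    # Assumption: attempts are numbered starting from 1.
    for attempt in range(1, max_attempts + 1):
        try:
            result = func()
        except Exception:
            # Keep one error entry per failed attempt.
            errors.append(
                {"attempt": attempt, "traceback": traceback.format_exc()}
            )
        else:
            return {
                "state": "succeeded",
                "attempts": attempt,
                "result": result,
                "errors": errors,
            }
    # Every allowed attempt raised, so the job is considered failed.
    return {"state": "failed", "attempts": max_attempts, "errors": errors}


calls = {"n": 0}


def flaky() -> str:
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


outcome = run_with_attempts(flaky, max_attempts=3)
```

With max_attempts left at its default of 1, the same function would be reported as failed after its first exception.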

meta: dict[str, Any]

Arbitrary metadata associated with this job instance. Plugins can use this to store additional information during the execution of a job.

pack() → dict

Pack the job into a dictionary that can be serialized and used to recreate the job later.

priority: int = 0

The priority of this job. Jobs with higher priority values will be executed before jobs with lower priority values.
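The ordering rule can be illustrated with a plain sort. PendingJob and execution_order are hypothetical names, and the tie-breaking behaviour (equal priorities keep their original order) is an assumption of this sketch rather than documented Chancy behaviour.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingJob:
    """Hypothetical stand-in with only the fields needed for ordering."""

    func: str
    priority: int = 0


def execution_order(jobs: list[PendingJob]) -> list[PendingJob]:
    # Higher priority values come first. sorted() is stable, so jobs with
    # equal priority keep their original relative order.
    return sorted(jobs, key=lambda job: job.priority, reverse=True)


jobs = [
    PendingJob("tasks.cleanup"),
    PendingJob("tasks.charge_card", priority=10),
    PendingJob("tasks.send_email", priority=5),
    PendingJob("tasks.report"),
]
ordered = [job.func for job in execution_order(jobs)]
```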

queue: str = 'default'

The queue to which this job should be pushed.

scheduled_at: datetime

The time at which this job should be scheduled to run.

unique_key: str | None = None

An optional, globally unique identifier for this job. If provided, only one job with this key is allowed to be running or scheduled at any given time.
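A toy in-memory model may help clarify the uniqueness rule. InMemoryQueue is entirely hypothetical, and returning False for a duplicate push is an assumption of this sketch; how Chancy itself reports or handles a duplicate key is not described here.

```python
from typing import Optional


class InMemoryQueue:
    """Hypothetical queue used only to illustrate unique_key semantics."""

    def __init__(self) -> None:
        self._active_keys: set[str] = set()
        self.jobs: list[str] = []

    def push(self, func: str, unique_key: Optional[str] = None) -> bool:
        # Refuse the push while a job with the same key is still active.
        if unique_key is not None:
            if unique_key in self._active_keys:
                return False
            self._active_keys.add(unique_key)
        self.jobs.append(func)
        return True

    def finish(self, unique_key: str) -> None:
        # Once the job is done, the key becomes available again.
        self._active_keys.discard(unique_key)


queue = InMemoryQueue()
first = queue.push("reports.nightly", unique_key="nightly-report")
second = queue.push("reports.nightly", unique_key="nightly-report")
queue.finish("nightly-report")
third = queue.push("reports.nightly", unique_key="nightly-report")
```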

classmethod unpack(data: dict) → Job

Unpack a serialized job into a Job instance.
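The pack/unpack pair describes a round trip through a plain dictionary. The sketch below shows that pattern with a hypothetical PackableJob dataclass; the exact keys and value formats that Chancy's pack() produces are not specified here, so the flat layout is an assumption.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass(frozen=True)
class PackableJob:
    """Hypothetical stand-in carrying a few of the documented fields."""

    func: str
    queue: str = "default"
    kwargs: dict[str, Any] = field(default_factory=dict)
    priority: int = 0

    def pack(self) -> dict:
        # Assumption: a flat dictionary of JSON-friendly values.
        return asdict(self)

    @classmethod
    def unpack(cls, data: dict) -> "PackableJob":
        return cls(**data)


original = PackableJob(func="my_module.my_function", kwargs={"n": 3})
wire = json.dumps(original.pack())  # what would be stored or transmitted
restored = PackableJob.unpack(json.loads(wire))
```

Fields such as datetimes or nested objects would need an explicit conversion step before they could pass through JSON in this way.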

with_kwargs(**kwargs) → Job
with_limits(limits: list[Limit]) → Job
with_max_attempts(max_attempts: int) → Job
with_meta(meta: dict[str, Any]) → Job
with_priority(priority: int) → Job
with_queue(queue: str) → Job
with_scheduled_at(scheduled_at: datetime) → Job
with_unique_key(unique_key: str) → Job
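Since a job is described as immutable and each with_* method returns a Job, a natural reading is that these methods return a modified copy. The sketch below shows that pattern with a hypothetical ImmutableJob built on a frozen dataclass; it is an illustration of the idea, not Chancy's implementation.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ImmutableJob:
    """Hypothetical stand-in showing copy-on-modify builder methods."""

    func: str
    queue: str = "default"
    priority: int = 0
    max_attempts: int = 1

    def with_queue(self, queue: str) -> "ImmutableJob":
        # replace() builds a new instance; the original stays untouched.
        return replace(self, queue=queue)

    def with_priority(self, priority: int) -> "ImmutableJob":
        return replace(self, priority=priority)

    def with_max_attempts(self, max_attempts: int) -> "ImmutableJob":
        return replace(self, max_attempts=max_attempts)


base = ImmutableJob(func="my_module.my_function")
urgent = base.with_queue("high").with_priority(10).with_max_attempts(3)
```

Because every call returns a new object, a single base job can safely be reused as a template for several variants.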
class chancy.job.Limit(type_: Type, value: int)

Bases: object

A limit that can be applied to a job.

class Type(value)

Bases: Enum

MEMORY = 'memory'
TIME = 'time'

classmethod deserialize(data: dict) → Limit

serialize() → dict

type_: Type

value: int
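The serialize/deserialize pair can be pictured as a dictionary round trip. LimitType and SketchLimit below are hypothetical stand-ins, and the "type" and "value" keys are assumptions; the real serialized layout and the units of value are not documented here.

```python
from dataclasses import dataclass
from enum import Enum


class LimitType(Enum):
    MEMORY = "memory"
    TIME = "time"


@dataclass(frozen=True)
class SketchLimit:
    """Hypothetical stand-in mirroring the documented Limit fields."""

    type_: LimitType
    value: int

    def serialize(self) -> dict:
        # Assumption: the enum member is stored by its string value.
        return {"type": self.type_.value, "value": self.value}

    @classmethod
    def deserialize(cls, data: dict) -> "SketchLimit":
        # LimitType("time") looks the member up by its value.
        return cls(type_=LimitType(data["type"]), value=data["value"])


limit = SketchLimit(LimitType.TIME, 30)
data = limit.serialize()
restored = SketchLimit.deserialize(data)
```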
class chancy.job.QueuedJob(*, func: str, queue: str = 'default', kwargs: dict[str, Any] | None = <factory>, priority: int = 0, max_attempts: int = 1, scheduled_at: datetime = <factory>, limits: list[Limit] = <factory>, unique_key: str | None = None, meta: dict[str, Any] = <factory>, id: str, started_at: datetime | None = None, completed_at: datetime | None = None, attempts: int = 0, state: State = State.PENDING, errors: list[ErrorT] = <factory>)

Bases: Job

A job instance is a job that has been pushed onto a queue and now has stateful information associated with it, such as the number of attempts so far.

class State(value)

Bases: Enum

FAILED = 'failed'
PENDING = 'pending'
RETRYING = 'retrying'
RUNNING = 'running'
SUCCEEDED = 'succeeded'

attempts: int = 0

The number of times this job has been attempted.

completed_at: datetime | None = None

The time at which this job was completed, if it has been completed.

errors: list[ErrorT]

A list of errors that occurred during the execution of this job.

id: str

The unique identifier for this job instance.

started_at: datetime | None = None

The time at which this job was started, if it has been started.

state: State = State.PENDING

The current state of this job instance.
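The five documented states can be modelled with a small enum. The sketch below redefines State locally so that it runs on its own; treating only SUCCEEDED and FAILED as final is an assumption made for illustration, and is_finished is a hypothetical helper.

```python
from enum import Enum


class State(Enum):
    # Local copy of the documented members, so the sketch is self-contained.
    PENDING = "pending"
    RUNNING = "running"
    RETRYING = "retrying"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Assumption: a job in one of these states will not run again.
TERMINAL_STATES = frozenset({State.SUCCEEDED, State.FAILED})


def is_finished(state: State) -> bool:
    return state in TERMINAL_STATES


# A stored string value maps back to the enum member.
loaded = State("retrying")
```

A check like this is the kind of test a polling loop would make before it stops waiting on a job.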

classmethod unpack(data: dict) → QueuedJob

Unpack a serialized job into a QueuedJob instance.

class chancy.job.Reference(identifier: str)

Bases: object

References a Job in the queue.

This object can be used to retrieve the job instance later, or wait for it to complete. It is returned by the push(), push_many(), and push_many_ex() functions.

Waiting for a job to finish:

async with Chancy("postgresql://localhost/postgres") as chancy:
    ref = await chancy.push(Job.from_func(my_function))
    job = await chancy.wait_for_job(ref)
    print(job.state)  # "succeeded"

Retrieving a job instance:

async with Chancy("postgresql://localhost/postgres") as chancy:
    ref = await chancy.push(Job.from_func(my_function))
    job = await chancy.get_job(ref)
    print(job.state)  # "pending"

identifier

chancy.job.job(**options) → Callable[[Callable[[P], R]], IsAJob[P, R]]

A decorator that wraps a function and turns it into a job.

The wrapped function can still be called as normal, but will have an extra job attribute that contains the job instance.

>>> @job()
... def hello_world():
...     return "Hello, world!"
>>> hello_world.job
Job(func=<function hello_world at 0x1033041e0>, kwargs={}, ...)
>>> hello_world()
'Hello, world!'

Your decorated function can be pushed like any other job, or you can use its job attribute to modify the job's properties before pushing it.

await chancy.push(hello_world)
await chancy.push(hello_world.job.with_queue("low_priority"))