chancy.job module
- class chancy.job.Job(*, func: str, queue: str = 'default', kwargs: dict[str, typing.Any] | None = <factory>, priority: int = 0, max_attempts: int = 1, scheduled_at: datetime.datetime = <factory>, limits: list[chancy.job.Limit] = <factory>, unique_key: str | None = None, meta: dict[str, typing.Any] = <factory>)
Bases: object
A job is an immutable, stateless unit of work that can be pushed onto a Chancy queue and executed elsewhere.
- classmethod from_func(func, **kwargs)
Create a job from a function, attempting to determine the function’s importable name automatically.
    def hello_world():
        pass

    job = Job.from_func(hello_world)
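How the importable name is determined is not spelled out here. As a rough sketch only, one common approach is to join a function's `__module__` and `__qualname__`. The helpers `importable_name` and `resolve` below are hypothetical and are not part of Chancy; the sketch also assumes the function is defined at module top level.

```python
import importlib
import json


def importable_name(func) -> str:
    """Return a dotted 'module.qualname' path for a top-level function."""
    # Assumption: nested functions cannot be re-imported by name, so they
    # are rejected. Their qualified name contains a "<locals>" segment.
    if "<locals>" in func.__qualname__:
        raise ValueError(f"{func.__qualname__} is not importable by name")
    return f"{func.__module__}.{func.__qualname__}"


def resolve(name: str):
    """Import the module part of a dotted path and fetch the attribute."""
    module_name, _, attr = name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


# Round trip: derive a name from a function, then look the function up again.
name = importable_name(json.dumps)
same_function = resolve(name)
```

The round trip matters because the worker that runs the job only receives the string and has to import the function itself.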
- func: str
An importable name for the function that should be executed when this job is run. Example: my_module.my_function
- kwargs: dict[str, Any] | None
The keyword arguments to pass to the job function when it is executed.
- max_attempts: int = 1
The maximum number of times this job can be attempted before it is considered failed.
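To illustrate what an attempt limit means in practice, here is a minimal sketch of a retry loop. It is not Chancy's worker implementation: `run_with_attempts`, the returned tuple, and the "failed" string are assumptions made for the example.

```python
def run_with_attempts(func, max_attempts: int = 1):
    """Call func until it succeeds or max_attempts is exhausted.

    Returns (state, attempts_used, result_or_errors). Hypothetical helper.
    """
    errors = []
    for attempt in range(1, max_attempts + 1):
        try:
            return "succeeded", attempt, func()
        except Exception as exc:
            # Record the error and fall through to the next attempt, if any.
            errors.append(repr(exc))
    return "failed", max_attempts, errors


calls = {"n": 0}


def flaky():
    """Fail on the first two calls, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = run_with_attempts(flaky, max_attempts=3)
```

With the default of `max_attempts=1`, the same function would be reported as failed after its first exception.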
- meta: dict[str, Any]
Arbitrary metadata associated with this job instance. Plugins can use this to store additional information during the execution of a job.
- pack() -> dict
Pack the job into a dictionary that can be serialized and used to recreate the job later.
- priority: int = 0
The priority of this job. Jobs with higher priority values will be executed before jobs with lower priority values.
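The ordering rule can be illustrated with plain tuples. This sketch only demonstrates "higher value runs first"; the job names are made up, and how Chancy orders jobs internally is not shown here.

```python
def execution_order(jobs: list[tuple[str, int]]) -> list[str]:
    """Return job names sorted so that higher priority values come first."""
    ranked = sorted(jobs, key=lambda item: item[1], reverse=True)
    return [name for name, _priority in ranked]


# Hypothetical (name, priority) pairs.
pending = [("send_email", 0), ("rebuild_index", -5), ("charge_card", 10)]
order = execution_order(pending)
```

Here `charge_card` (priority 10) comes before `send_email` (priority 0), which comes before `rebuild_index` (priority -5).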
- class chancy.job.Limit(type_: Type, value: int)
Bases: object
A limit that can be applied to a job.
- class chancy.job.QueuedJob(*, func: str, queue: str = 'default', kwargs: dict[str, typing.Any] | None = <factory>, priority: int = 0, max_attempts: int = 1, scheduled_at: datetime.datetime = <factory>, limits: list[chancy.job.Limit] = <factory>, unique_key: str | None = None, meta: dict[str, typing.Any] = <factory>, id: str, started_at: datetime.datetime | None = None, completed_at: datetime.datetime | None = None, attempts: int = 0, state: chancy.job.QueuedJob.State = State.PENDING, errors: list[chancy.job.ErrorT] = <factory>)
Bases: Job
A job instance is a job that has been pushed onto a queue and now has stateful information associated with it, such as the number of attempts so far.
- class State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Enum
- completed_at: datetime | None = None
The time at which this job was completed, if it has been completed.
- class chancy.job.Reference(identifier: str)
Bases: object
References a Job in the queue.
This object can be used to retrieve the job instance later, or to wait for it to complete. It is returned by the push(), push_many(), and push_many_ex() functions.
Waiting for a job to finish:
    async with Chancy("postgresql://localhost/postgres") as chancy:
        ref = await chancy.push(Job.from_func(my_function))
        job = await chancy.wait_for_job(ref)
        print(job.state)  # "succeeded"
Retrieving a job instance:
    async with Chancy("postgresql://localhost/postgres") as chancy:
        ref = await chancy.push(Job.from_func(my_function))
        job = await chancy.get_job(ref)
        print(job.state)  # "pending"
- chancy.job.job(**options) -> Callable[[Callable[[P], R]], IsAJob[P, R]]
A decorator that wraps a function and turns it into a job.
The wrapped function can still be called as normal, but will have an extra job attribute that contains the job instance.
    >>> @job()
    ... def hello_world():
    ...     return "Hello, world!"
    >>> hello_world.job
    Job(func=<function hello_world at 0x1033041e0>, kwargs={}, ...)
    >>> hello_world()
    'Hello, world!'
Your decorated function can be pushed like any other job, or you can use the job property to modify its properties before pushing it.
    await chancy.push(hello_world)
    await chancy.push(hello_world.job.with_queue("low_priority"))
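The pattern described above (a decorator that leaves the function callable and attaches a job attribute, plus a with_queue call that returns a modified copy) can be sketched in plain Python. `SketchJob` and `sketch_job` are hypothetical stand-ins written for illustration, and the body of `with_queue` is an assumption about how an immutable job could be adjusted; none of this is Chancy's implementation.

```python
import dataclasses
import functools


@dataclasses.dataclass(frozen=True)
class SketchJob:
    """Hypothetical stand-in for Job with only two fields."""

    func: str
    queue: str = "default"

    def with_queue(self, queue: str) -> "SketchJob":
        # The dataclass is frozen, so return a modified copy instead of
        # mutating the existing instance.
        return dataclasses.replace(self, queue=queue)


def sketch_job(**options):
    """Decorator factory: keep the function callable, attach a .job."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        name = f"{func.__module__}.{func.__qualname__}"
        wrapper.job = SketchJob(func=name, **options)
        return wrapper

    return decorator


@sketch_job()
def hello_world():
    return "Hello, world!"


low = hello_world.job.with_queue("low_priority")
```

Returning a copy from `with_queue` means the job attached to the function keeps its original queue, so the same decorated function can be pushed to different queues without side effects.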