Workflows

class chancy.plugins.workflow.WorkflowPlugin(*, polling_interval: int = 1, max_workflows_per_run: int = 1000, pruning_rule: chancy.rule.Rule = <chancy.rule.Condition object>)[source]

Bases: Plugin

Support for dependency-based workflows.

Workflows allow you to easily model complex processes that involve multiple jobs, each of which may depend on the completion of one or more other jobs. Workflows are modeled as a directed acyclic graph (DAG) where each node represents a job and each edge represents a dependency between two jobs.
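
To make the DAG model concrete, here is a minimal standard-library sketch that groups the steps of a diamond-shaped graph into batches that could run in parallel. It does not use Chancy and is not how the plugin is implemented; the step names and the `waves` helper are illustrative only.

```python
from graphlib import TopologicalSorter


def waves(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into batches whose dependencies are all satisfied."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps with no unmet dependencies
        batches.append(ready)
        sorter.done(*ready)  # mark the batch finished to unlock successors
    return batches


# Each key is a step; each value lists the steps it depends on.
diamond = {
    "top": [],
    "left": ["top"],
    "right": ["top"],
    "bottom": ["left", "right"],
}

print(waves(diamond))
```

Steps that land in the same batch have no dependency on each other, which is what allows them to run concurrently.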

Workflows are implemented on top of the existing Chancy job system, meaning you can use all the existing job features, such as job retries, timeouts, scheduling, and so on.

Enable the plugin by adding it to the list of plugins in the Chancy constructor:

from chancy.plugins.leadership import Leadership
from chancy.plugins.workflow import WorkflowPlugin

async with Chancy(
    "postgresql://localhost/postgres",
    plugins=[Leadership(), WorkflowPlugin()]
) as chancy:
    ...

Example

We’ll create a simple workflow that runs the “top” job first, then the “left” and “right” jobs in parallel, and finally the “bottom” job:

example_workflow.py
 import asyncio
 from chancy import Chancy, job
 from chancy.plugins.leadership import Leadership
 from chancy.plugins.workflow import Workflow, WorkflowPlugin

 @job()
 async def top():
     print("Top")

 @job()
 async def left():
     print("Left")

 @job()
 async def right():
     print("Right")

 @job()
 async def bottom():
     print("Bottom")

 async def main():
     async with Chancy(
         "postgresql://localhost/postgres",
         plugins=[Leadership(), WorkflowPlugin()]
     ) as chancy:
         workflow = (
             Workflow("example")
             .add("top", top)
             .add("left", left, ["top"])
             .add("right", right, ["top"])
             .add("bottom", bottom, ["left", "right"])
         )
         await WorkflowPlugin.push(chancy, workflow)

 if __name__ == "__main__":
     asyncio.run(main())

If we visualize our newly created workflow using generate_dot(), we get:

digraph "example" {
  rankdir=TB;
  node [shape=box, style="rounded,filled", fontname="Arial"];
  "top" [label="top\n(pending)", fillcolor=lightblue];
  "left" [label="left\n(pending)", fillcolor=lightblue];
  "right" [label="right\n(pending)", fillcolor=lightblue];
  "bottom" [label="bottom\n(pending)", fillcolor=lightblue];
  "top" -> "left";
  "top" -> "right";
  "left" -> "bottom";
  "right" -> "bottom";
  labelloc="t";
  label="Workflow: example\nState: pending";
}

The full Workflow API is a little verbose if you just want to run a series of jobs in a specific order. In that case, you can use the Sequence class to create a workflow from a list of jobs.

Parameters:
  • polling_interval – The interval at which to poll for new workflows.

  • max_workflows_per_run – The maximum number of workflows to process in a single run of the plugin.

class Rules[source]

Bases: object

class Age[source]

Bases: Rule

to_sql() Composable[source]
api_plugin() str | None[source]

If this plugin has an associated API component, returns the import string for the plugin.

async cleanup(chancy: Chancy) int | None[source]

Clean up any resources used by the plugin.

Should return either None, if no work was done, or the number of rows cleaned up.

Note

Normally, you don’t need to call this yourself. The Pruner plugin will call the cleanup method of all other registered plugins.

async classmethod fetch_workflow(chancy: Chancy, id_: str) Workflow | None[source]

Fetch a single workflow from the database.

Parameters:
  • chancy – The Chancy application.

  • id_ – The ID of the workflow to fetch.

Returns:

The workflow, or None if it does not exist.

async static fetch_workflows(chancy: Chancy, *, states: list[str] | None = None, ids: list[str] | None = None, limit: int = 100) List[Workflow][source]

Fetch workflows from the database, optionally matching the given conditions.

Parameters:
  • chancy – The Chancy application.

  • states – A list of states to match.

  • ids – A list of IDs to match.

  • limit – The maximum number of workflows to fetch.

Returns:

A list of workflows.

static generate_dot(workflow: Workflow, output: TextIO)[source]

Generate a DOT file representation of the workflow.

Parameters:
  • workflow – The Workflow object to visualize.

  • output – A file-like object to write the DOT content to.
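
Because `output` is any file-like object, an in-memory buffer works as well as an open file. The stand-alone sketch below imitates that calling convention with a hypothetical `write_dot` helper. It is not the plugin's implementation and emits none of the styling seen in the real output shown earlier.

```python
import io
from typing import TextIO


def write_dot(name: str, dependencies: dict[str, list[str]], output: TextIO) -> None:
    """Write a minimal DOT digraph: one node per step, one edge per dependency."""
    output.write(f'digraph "{name}" {{\n')
    for step in dependencies:
        output.write(f'  "{step}";\n')
    for step, needs in dependencies.items():
        for dependency in needs:
            # Edges point from the prerequisite to the dependent step.
            output.write(f'  "{dependency}" -> "{step}";\n')
    output.write("}\n")


buffer = io.StringIO()
write_dot("example", {"top": [], "left": ["top"], "right": ["top"]}, buffer)
dot_source = buffer.getvalue()
print(dot_source)
```

Writing to a real file instead of a buffer produces a `.dot` file that Graphviz can render.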

classmethod get_scope() PluginScope[source]

Get the scope of this plugin. Scopes control when and where the plugin will be run.

By default, plugins are scoped to the worker.

migrate_key() str[source]

Get the migration key for this plugin, if it has any.

migrate_package() str[source]

Get the package name that contains the migration scripts for this plugin, if it has any.

async process_workflow(chancy: Chancy, workflow: Workflow)[source]

Process a single iteration of the given workflow, progressing the state of each step and the overall workflow as necessary.

Parameters:
  • chancy – The Chancy application.

  • workflow – The workflow to process.
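
As a rough mental model for a single iteration, the sketch below picks the pending steps whose dependencies have all finished successfully and derives an overall state from the per-step states. It is an illustration under assumptions, not the plugin's code: the `next_steps` and `overall_state` helpers and the `"succeeded"` step state are hypothetical names. Only the workflow states `pending`, `running`, `completed` and `failed` come from the `Workflow.State` enum documented on this page.

```python
def next_steps(dependencies: dict[str, list[str]], states: dict[str, str]) -> list[str]:
    """Return pending steps whose dependencies have all succeeded."""
    return [
        step
        for step, needs in dependencies.items()
        if states[step] == "pending"
        and all(states[dep] == "succeeded" for dep in needs)
    ]


def overall_state(states: dict[str, str]) -> str:
    """Derive a workflow-level state from the per-step states."""
    values = list(states.values())
    if any(value == "failed" for value in values):
        return "failed"
    if all(value == "succeeded" for value in values):
        return "completed"
    if all(value == "pending" for value in values):
        return "pending"
    return "running"


dependencies = {"top": [], "left": ["top"], "right": ["top"], "bottom": ["left", "right"]}
states = {"top": "succeeded", "left": "pending", "right": "pending", "bottom": "pending"}

print(next_steps(dependencies, states))
print(overall_state(states))
```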

async classmethod push(chancy: Chancy, workflow: Workflow) str[source]

Push a new workflow to the database.

If the workflow already exists in the database, it will be updated instead.

Parameters:
  • chancy – The Chancy application.

  • workflow – The workflow to push.

Returns:

The UUID of the newly created workflow.

async run(worker: Worker, chancy: Chancy)[source]

Runs the plugin.

This function can and should run indefinitely, as it will be cancelled when the worker is stopped.

async classmethod wait_for_workflow(chancy: Chancy, workflow_id: str, *, interval: int = 1, timeout: float | int | None = None) Workflow[source]

Wait for a workflow to complete.

This method will loop until the workflow referenced by the provided ID has completed. The interval parameter controls how often the workflow status is checked. This will not block the event loop, so other tasks can run while waiting for the workflow to complete.

Example

workflow = Workflow("example")
workflow.add("step1", job1)
workflow.add("step2", job2, ["step1"])

workflow_id = await WorkflowPlugin.push(chancy, workflow)
completed_workflow = await WorkflowPlugin.wait_for_workflow(
    chancy,
    workflow_id,
    timeout=300  # 5 minute timeout
)

Parameters:
  • chancy – The Chancy application.

  • workflow_id – The ID of the workflow to wait for.

  • interval – The number of seconds to wait between checks.

  • timeout – The maximum number of seconds to wait for the workflow to complete. If not provided, the method will wait indefinitely.

Raises:
  • asyncio.TimeoutError – If the timeout is reached before the workflow completes.

  • KeyError – If the workflow does not exist.

Returns:

The completed Workflow object.
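
The polling behaviour described above can be approximated with plain asyncio. This is a minimal sketch, not the method's actual implementation: `wait_until_done`, `fetch_state` and `demo` are hypothetical names, and treating both `completed` and `failed` as finished is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def wait_until_done(
    fetch_state: Callable[[], Awaitable[str]],
    *,
    interval: float = 1,
    timeout: Optional[float] = None,
) -> str:
    """Poll fetch_state until it reports a finished state."""

    async def poll() -> str:
        while True:
            state = await fetch_state()
            if state in ("completed", "failed"):  # assumed terminal states
                return state
            # Sleeping yields to the event loop so other tasks keep running.
            await asyncio.sleep(interval)

    # With timeout=None, wait_for waits indefinitely; otherwise it raises
    # asyncio.TimeoutError once the deadline passes.
    return await asyncio.wait_for(poll(), timeout)


async def demo() -> str:
    # Hypothetical stand-in for a database lookup.
    observed = iter(["pending", "running", "completed"])

    async def fetch_state() -> str:
        return next(observed)

    return await wait_until_done(fetch_state, interval=0.01, timeout=1)


final_state = asyncio.run(demo())
print(final_state)
```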

class chancy.plugins.workflow.Workflow(name: str, steps: Dict[str, chancy.plugins.workflow.WorkflowStep] = <factory>, state: chancy.plugins.workflow.Workflow.State = <State.PENDING: 'pending'>, id: str = <factory>, created_at: datetime.datetime | None = None, updated_at: datetime.datetime | None = None)[source]

Bases: object

class State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

COMPLETED = 'completed'[source]
FAILED = 'failed'[source]
PENDING = 'pending'[source]
RUNNING = 'running'[source]
add(step_id: str, job: Job | IsAJob, dependencies: List[str] = None) Workflow[source]

Add a step to the workflow.

workflow.add("step_1", job)
workflow.add("step_2", job, ["step_1"])
Parameters:
  • step_id – The ID of the step.

  • job – The job to execute.

  • dependencies – A list of step IDs that this step depends on.

add_group(jobs: List[tuple[str, Job | IsAJob]], dependencies: List[str] = None) Workflow[source]

Add a group of steps to the workflow.

This is a convenience method for adding multiple steps to the workflow at once that are all dependent on the same set of dependencies.

workflow = Workflow("my_workflow")
workflow.add("setup", setup_job)
workflow.add_group([
    ("step_1", job_1),
    ("step_2", job_2),
    ("step_3", job_3),
], ["setup"])
Parameters:
  • jobs – A list of tuples of (step_id, job).

  • dependencies – A list of step IDs that every step in the group depends on.

created_at: datetime | None = None[source]

The time the workflow was created.

id: str[source]

The unique ID of a specific run of the workflow.

property is_complete: bool[source]
property is_running: bool[source]
name: str[source]
state: State = 'pending'[source]

The current state of the workflow.

steps: Dict[str, WorkflowStep][source]

A dictionary of steps in the workflow, keyed by step ID.

property steps_by_state: Dict[State, List[WorkflowStep]][source]
updated_at: datetime | None = None[source]

The time the workflow was last updated.

class chancy.plugins.workflow.WorkflowStep(job: chancy.job.Job | chancy.job.IsAJob, step_id: str, dependencies: List[str] = <factory>, state: chancy.job.QueuedJob.State | None = <State.PENDING: 'pending'>, job_id: str | None = None)[source]

Bases: object

dependencies: List[str][source]

A list of step IDs that this step depends on.

job: Job | IsAJob[source]

The job to execute when this step is ready.

job_id: str | None = None[source]

The unique ID of a running Job which is associated with this step.

state: State | None = 'pending'[source]

The current state of the step.

step_id: str[source]

The unique ID of the step.

class chancy.plugins.workflow.Sequence(name: str, jobs: List[Job | IsAJob] = None)[source]

Bases: object

A sequential workflow.

Sequences are a special case of workflows, where each step depends on the previous step. This forms a linear chain of jobs that are executed in order.

Use a sequence when jobs must simply run one after another and you don’t need the flexibility of a full workflow.
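
The linear chain can be pictured as an ordinary dependency mapping in which every step lists only its predecessor. The sketch below is stand-alone and illustrative: `chain_dependencies` and the step IDs are hypothetical, and the step IDs that Sequence generates internally may differ.

```python
def chain_dependencies(step_ids: list[str]) -> dict[str, list[str]]:
    """Make each step depend on the one immediately before it."""
    dependencies: dict[str, list[str]] = {}
    previous = None
    for step_id in step_ids:
        # The first step has no dependencies; every other step has exactly one.
        dependencies[step_id] = [] if previous is None else [previous]
        previous = step_id
    return dependencies


chain = chain_dependencies(["first", "second", "third"])
print(chain)
```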

Example

example_sequence.py
 import asyncio
 from chancy import Chancy, job
 from chancy.plugins.workflow import Sequence

 @job()
 async def first():
     print("First")

 @job()
 async def second():
     print("Second")

 @job()
 async def third():
     print("Third")

 async def main():
     async with Chancy("postgresql://localhost/postgres") as chancy:
         sequence = Sequence("example_workflow", [first, second, third])
         await sequence.push(chancy)

 if __name__ == "__main__":
     asyncio.run(main())
add(job: Job | IsAJob) Self[source]

Add a job to the sequence.

workflow = (
    Sequence("example_sequence")
    .add(first)
    .add(second)
    .add(third)
)
Parameters:

job – The job to add.

async push(chancy: Chancy) str[source]

Push a sequence to the database.

Parameters:

chancy – The Chancy application.

Returns:

The UUID of the newly created sequence.

class chancy.plugins.workflow.api.WorkflowApiPlugin[source]

Bases: ApiPlugin

API plugin for workflows.

async static get_workflow(request, *, chancy, worker)[source]

Get a single workflow by ID.

async static get_workflows(request, *, chancy, worker)[source]

Get all known workflows.

name()[source]

Get the name of the plugin.

routes()[source]

Get a list of routes to add to the API.