Workflows

class chancy.plugins.workflow.WorkflowPlugin(*, polling_interval: int = 30, max_workflows_per_run: int = 1000, pruning_rule: ~chancy.rule.Rule = <chancy.rule.Condition object>)[source]

Bases: Plugin

Support for dependency-based workflows.

Workflows allow you to easily model complex processes that involve multiple jobs, each of which may depend on the completion of one or more other jobs. Workflows are modeled as a directed acyclic graph (DAG) where each node represents a job and each edge represents a dependency between two jobs.

Workflows are implemented on top of the existing Chancy job system, meaning you can use all the existing job features, such as job retries, timeouts, scheduling, and so on.
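To make the DAG model concrete, here is a minimal sketch using only the standard library's graphlib module. It is not how Chancy schedules work internally. The step names and the execution_waves helper are illustrative assumptions. It groups steps into "waves", where every step in a wave has all of its dependencies satisfied and could run in parallel.

```python
from graphlib import TopologicalSorter

# Map each step to the steps it depends on (its predecessors).
# These names are illustrative only.
dependencies = {
    "top": [],
    "left": ["top"],
    "right": ["top"],
    "bottom": ["left", "right"],
}


def execution_waves(deps):
    """Group steps into waves; steps in one wave have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['top'], ['left', 'right'], ['bottom']]
```

A cycle (for example, "top" depending on "bottom") would make prepare() raise, which is why the graph must be acyclic.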

Enable the plugin by adding it to the list of plugins in the Chancy constructor:

from chancy import Chancy
from chancy.plugins.leadership import Leadership
from chancy.plugins.workflow import WorkflowPlugin

async with Chancy(
    "postgresql://localhost/postgres",
    plugins=[Leadership(), WorkflowPlugin()]
) as chancy:
    ...

Example

We’ll create a simple workflow that runs the “top” job first, then the “left” and “right” jobs in parallel, and finally the “bottom” job:

example_workflow.py
 import asyncio
 from chancy import Chancy, job
 from chancy.plugins.leadership import Leadership
 from chancy.plugins.workflow import Workflow, WorkflowPlugin

 @job()
 async def top():
     print("Top")

 @job()
 async def left():
     print("Left")

 @job()
 async def right():
     print("Right")

 @job()
 async def bottom():
     print("Bottom")

 async def main():
     async with Chancy(
         "postgresql://localhost/postgres",
         plugins=[Leadership(), WorkflowPlugin()]
     ) as chancy:
         workflow = (
             Workflow("example")
             .add("top", top)
             .add("left", left, ["top"])
             .add("right", right, ["top"])
             .add("bottom", bottom, ["left", "right"])
         )
         await WorkflowPlugin.push(chancy, workflow)

 if __name__ == "__main__":
     asyncio.run(main())

If we visualize our newly created workflow using generate_dot(), we get:

digraph "example" {
  rankdir=TB;
  node [shape=box, style="rounded,filled", fontname="Arial"];
  "top" [label="top\n(pending)", fillcolor=lightblue];
  "left" [label="left\n(pending)", fillcolor=lightblue];
  "right" [label="right\n(pending)", fillcolor=lightblue];
  "bottom" [label="bottom\n(pending)", fillcolor=lightblue];
  "top" -> "left";
  "top" -> "right";
  "left" -> "bottom";
  "right" -> "bottom";
  labelloc="t";
  label="Workflow: example\nState: pending";
}
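Each edge in the DOT output corresponds to one entry in a step's dependency list. The sketch below illustrates that mapping with a hypothetical write_edges helper. It is not Chancy's generate_dot() and omits node labels and styling. It also shows the pattern of writing to any file-like object, here an io.StringIO buffer.

```python
import io
from typing import Dict, List, TextIO


def write_edges(name: str, deps: Dict[str, List[str]], output: TextIO) -> None:
    """Write a bare-bones DOT digraph with one edge per dependency.

    Hypothetical helper for illustration; not part of Chancy.
    """
    output.write(f'digraph "{name}" {{\n')
    for step, parents in deps.items():
        for parent in parents:
            # An edge points from the dependency to the step that needs it.
            output.write(f'  "{parent}" -> "{step}";\n')
    output.write("}\n")


buffer = io.StringIO()
write_edges(
    "example",
    {"top": [], "left": ["top"], "right": ["top"], "bottom": ["left", "right"]},
    buffer,
)
dot = buffer.getvalue()
print(dot)
```

Passing an open file instead of the buffer would write the same text to disk.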

The full Workflow API is a little verbose if you just want to run a series of jobs in a specific order. In that case, you can use the Sequence class to create a workflow from a list of jobs.

Django Integration

This plugin can be made available to the Django ORM and Admin interface.

To enable this, you need to add the following to your Django settings:

INSTALLED_APPS = [
    ...,
    "chancy.plugins.workflow.django",
]

You can then query the workflows and steps using the Django ORM:

from chancy.job import QueuedJob
from chancy.plugins.workflow.django.models import (
    Workflow,
    WorkflowStep
)

workflow = Workflow.objects.get(id="...")

completed_steps = WorkflowStep.objects.filter(
    workflow=workflow,
    state=QueuedJob.State.SUCCEEDED
)

Parameters:
  • polling_interval – The interval at which to poll for new workflows.

  • max_workflows_per_run – The maximum number of workflows to process in a single run of the plugin.

class Rules[source]

Bases: object

class Age[source]

Bases: Rule

to_sql() Composable[source]

api_plugin() str | None[source]

If this plugin has an associated API component, returns the import string for the plugin.

async cleanup(chancy: Chancy) int | None[source]

Clean up any resources used by the plugin.

Should return either None, if no work was done, or the number of rows cleaned up.

Note

Normally, you don’t need to call this yourself. The Pruner plugin will call the cleanup method of all other registered plugins.

async classmethod fetch_workflow(chancy: Chancy, id_: str) Workflow | None[source]

Fetch a single workflow from the database.

Parameters:
  • chancy – The Chancy application.

  • id_ – The ID of the workflow to fetch.

Returns:

The workflow, or None if it does not exist.

async classmethod fetch_workflow_ex(cursor: AsyncCursor, chancy: Chancy, id_: str) Workflow | None[source]

Fetch a single workflow from the database.

This method is a lower-level version of fetch_workflow that accepts an existing cursor object, allowing it to be used in transactions.

Parameters:
  • cursor – The cursor to use for the query.

  • chancy – The Chancy application.

  • id_ – The ID of the workflow to fetch.

Returns:

The workflow, or None if it does not exist.

async classmethod fetch_workflows(chancy: Chancy, *, states: list[str] | None = None, ids: list[str] | None = None, limit: int = 100) List[Workflow][source]

Fetch workflows from the database, optionally matching the given conditions.

Parameters:
  • chancy – The Chancy application.

  • states – A list of states to match.

  • ids – A list of IDs to match.

  • limit – The maximum number of workflows to fetch.

Returns:

A list of workflows.

async static fetch_workflows_ex(cursor: AsyncCursor, chancy: Chancy, *, states: list[str] | None = None, ids: list[str] | None = None, limit: int = 100) List[Workflow][source]

Fetch workflows from the database, optionally matching the given conditions.

This method is a lower-level version of fetch_workflows that accepts an existing cursor object, allowing it to be used in transactions.

Parameters:
  • cursor – The cursor to use for the query.

  • chancy – The Chancy application.

  • states – A list of states to match.

  • ids – A list of IDs to match.

  • limit – The maximum number of workflows to fetch.

Returns:

A list of workflows.

static generate_dot(workflow: Workflow, output: TextIO)[source]

Generate a DOT file representation of the workflow.

Parameters:
  • workflow – The Workflow object to visualize.

  • output – A file-like object to write the DOT content to.

static get_dependencies() list[str][source]

Get the identifiers of all plugins this plugin depends on, if any.

Plugins that depend on the presence of other plugins will refuse to start if those dependencies are not met.

static get_identifier() str[source]

Returns a unique identifier for this plugin.

This identifier should be unique across all active plugins. If a custom plugin provides compatible functionality to a built-in plugin, it may use the same identifier as the built-in plugin.

get_tables() list[str][source]

Get the names of all tables this plugin is responsible for.

migrate_key() str[source]

Get the migration key for this plugin, if it has any.

migrate_package() str[source]

Get the package name that contains the migration scripts for this plugin, if it has any.

async on_job_updated(*, worker: Worker, job: QueuedJob)[source]

Called after a job has been run and saved.

Unlike on_job_completed, this method cannot modify the job, but the job is guaranteed to have been updated in the database by the time it is called.

Parameters:
  • worker – The worker that is running the job.

  • job – The job that was completed.

async static process_workflow(cursor: AsyncCursor, chancy: Chancy, workflow: Workflow) bool[source]

Process a single iteration of the given workflow, progressing the state of each step and the overall workflow as necessary.

Parameters:
  • cursor – The cursor to use for the query.

  • chancy – The Chancy application.

  • workflow – The workflow to process.

Returns:

True if the workflow was updated, False otherwise.
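As a rough illustration of what "progressing the state of each step" involves, the sketch below picks out the steps that could start in one iteration: those still pending whose dependencies have all succeeded. This is an assumed simplification, not Chancy's implementation. The ready_steps helper and the plain-string states are hypothetical stand-ins for the real step and job state objects.

```python
from typing import Dict, List


def ready_steps(deps: Dict[str, List[str]], states: Dict[str, str]) -> List[str]:
    """Return pending steps whose dependencies have all succeeded.

    Hypothetical helper; states are simplified to plain strings.
    """
    return [
        step
        for step, parents in deps.items()
        if states[step] == "pending"
        and all(states[parent] == "succeeded" for parent in parents)
    ]


deps = {"top": [], "left": ["top"], "right": ["top"], "bottom": ["left", "right"]}
states = {
    "top": "succeeded",
    "left": "pending",
    "right": "pending",
    "bottom": "pending",
}

first_pass = ready_steps(deps, states)
print(first_pass)  # ['left', 'right']

# "bottom" stays blocked until both of its dependencies have succeeded.
states.update(left="succeeded", right="running")
second_pass = ready_steps(deps, states)
print(second_pass)  # []
```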

async classmethod push(chancy: Chancy, workflow: Workflow) str[source]

Push a new workflow to the database.

If the workflow already exists in the database, it will be updated instead.

Parameters:
  • chancy – The Chancy application.

  • workflow – The workflow to push.

Returns:

The UUID of the newly created workflow.

async static push_ex(cursor: AsyncCursor, chancy: Chancy, workflow: Workflow) str[source]

Push a new workflow to the database.

This method is a lower-level version of push that accepts an existing cursor object, allowing it to be used in transactions.

If the workflow already exists in the database, it will be updated instead.

Parameters:
  • cursor – The cursor to use for the query.

  • chancy – The Chancy application.

  • workflow – The workflow to push.

Returns:

The UUID of the newly created workflow.

async run(worker: Worker, chancy: Chancy)[source]

Runs the plugin.

This function can and should run indefinitely, as it will be cancelled when the worker is stopped.

async classmethod wait_for_workflow(chancy: Chancy, workflow_id: str, *, interval: int = 1, timeout: float | int | None = None) Workflow[source]

Wait for a workflow to complete.

This method will loop until the workflow referenced by the provided ID has completed. The interval parameter controls how often the workflow status is checked. This will not block the event loop, so other tasks can run while waiting for the workflow to complete.

Example

workflow = Workflow("example")
workflow.add("step1", job1)
workflow.add("step2", job2, ["step1"])

workflow_id = await WorkflowPlugin.push(chancy, workflow)
completed_workflow = await WorkflowPlugin.wait_for_workflow(
    chancy,
    workflow_id,
    timeout=300  # 5 minute timeout
)

Parameters:
  • chancy – The Chancy application.

  • workflow_id – The ID of the workflow to wait for.

  • interval – The number of seconds to wait between checks.

  • timeout – The maximum number of seconds to wait for the workflow to complete. If not provided, the method will wait indefinitely.

Raises:
  • asyncio.TimeoutError – If the timeout is reached before the workflow completes.

  • KeyError – If the workflow does not exist.

Returns:

The completed Workflow object.
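The polling behaviour described for wait_for_workflow can be sketched generically with asyncio. This is an illustration of the pattern, not the plugin's code. poll_until_done and fake_fetch are hypothetical names, and treating both "completed" and "failed" as terminal states is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll_until_done(
    fetch_state: Callable[[], Awaitable[str]],
    *,
    interval: float = 1,
    timeout: Optional[float] = None,
) -> str:
    """Poll fetch_state until it reports a terminal state (hypothetical helper)."""

    async def loop() -> str:
        while True:
            state = await fetch_state()
            if state in ("completed", "failed"):  # assumed terminal states
                return state
            # Sleeping yields to the event loop, so other tasks keep running.
            await asyncio.sleep(interval)

    # With timeout=None this waits indefinitely; otherwise it raises
    # asyncio.TimeoutError once the deadline passes.
    return await asyncio.wait_for(loop(), timeout)


async def demo() -> str:
    states = iter(["pending", "running", "running", "completed"])

    async def fake_fetch() -> str:
        # Stand-in for a database lookup of the workflow state.
        return next(states)

    return await poll_until_done(fake_fetch, interval=0.01, timeout=1)


result = asyncio.run(demo())
print(result)  # completed
```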

class chancy.plugins.workflow.Workflow(name: str, steps: Dict[str, chancy.plugins.workflow.WorkflowStep] = <factory>, state: chancy.plugins.workflow.Workflow.State = <State.PENDING: 'pending'>, id: str = <factory>, created_at: datetime.datetime | None = None, updated_at: datetime.datetime | None = None)[source]

Bases: object

class State(*values)[source]

Bases: Enum

COMPLETED = 'completed'[source]
FAILED = 'failed'[source]
PENDING = 'pending'[source]
RUNNING = 'running'[source]

add(step_id: str, job: Job | IsAJob, dependencies: List[str] = None) Workflow[source]

Add a step to the workflow.

workflow.add("step_1", job)
workflow.add("step_2", job, ["step_1"])

Parameters:
  • step_id – The ID of the step.

  • job – The job to execute.

  • dependencies – A list of step IDs that this step depends on.

add_group(jobs: List[tuple[str, Job | IsAJob]], dependencies: List[str] = None) Workflow[source]

Add a group of steps to the workflow.

This is a convenience method for adding multiple steps to the workflow at once that are all dependent on the same set of dependencies.

workflow = Workflow("my_workflow")
workflow.add("setup", setup_job)
workflow.add_group([
    ("step_1", job_1),
    ("step_2", job_2),
    ("step_3", job_3),
], ["setup"])

Parameters:
  • jobs – A list of tuples of (step_id, job).

  • dependencies – A list of step IDs that every step in the group depends on.

created_at: datetime | None = None[source]

The time the workflow was created.

id: str[source]

The unique ID of a specific run of the workflow.

property is_complete: bool[source]

property is_running: bool[source]

name: str[source]

state: State = 'pending'[source]

The current state of the workflow.

steps: Dict[str, WorkflowStep][source]

A dictionary of steps in the workflow, keyed by step ID.

property steps_by_state: Dict[State, List[WorkflowStep]][source]

updated_at: datetime | None = None[source]

The time the workflow was last updated.

class chancy.plugins.workflow.WorkflowStep(job: chancy.job.Job | chancy.job.IsAJob, step_id: str, dependencies: List[str] = <factory>, state: chancy.job.QueuedJob.State | None = <State.PENDING: 'pending'>, job_id: str | None = None)[source]

Bases: object

dependencies: List[str][source]

A list of step IDs that this step depends on.

job: Job | IsAJob[source]

The job to execute when this step is ready.

job_id: str | None = None[source]

The unique ID of a running Job which is associated with this step.

state: State | None = 'pending'[source]

The current state of the step.

step_id: str[source]

The unique ID of the step.

class chancy.plugins.workflow.Sequence(name: str, jobs: List[Job | IsAJob] = None)[source]

Bases: object

A sequential workflow.

Sequences are a special case of workflows, where each step depends on the previous step. This forms a linear chain of jobs that are executed in order.

Sequences are useful when jobs must run strictly one after another and the full dependency graph of a workflow would be unnecessary.
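The "each step depends on the previous step" rule can be expressed as a small transformation from an ordered list to a dependency map. The sketch below is a plain-Python illustration. chain_dependencies is a hypothetical helper, not part of Chancy, and the step names are made up.

```python
from typing import Dict, List


def chain_dependencies(step_ids: List[str]) -> Dict[str, List[str]]:
    """Map each step to a list holding only the step before it.

    Hypothetical helper; the first step has no dependencies.
    """
    return {
        step: [step_ids[index - 1]] if index > 0 else []
        for index, step in enumerate(step_ids)
    }


chain = chain_dependencies(["first", "second", "third"])
print(chain)  # {'first': [], 'second': ['first'], 'third': ['second']}
```

The result is the same shape of dependency list you would otherwise pass by hand to each Workflow.add() call.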

Example

example_sequence.py
 import asyncio
 from chancy import Chancy, job
 from chancy.plugins.workflow import Sequence

 @job()
 async def first():
     print("First")

 @job()
 async def second():
     print("Second")

 @job()
 async def third():
     print("Third")

 async def main():
     async with Chancy("postgresql://localhost/postgres") as chancy:
         sequence = Sequence("example_workflow", [first, second, third])
         await sequence.push(chancy)

 if __name__ == "__main__":
     asyncio.run(main())

add(job: Job | IsAJob) Self[source]

Add a job to the sequence.

workflow = (
    Sequence("example_sequence")
    .add(first)
    .add(second)
    .add(third)
)

Parameters:

job – The job to add.

async push(chancy: Chancy) str[source]

Push a sequence to the database.

Parameters:

chancy – The Chancy application.

Returns:

The UUID of the newly created sequence.

class chancy.plugins.workflow.api.WorkflowApiPlugin(api)[source]

Bases: ApiPlugin

API plugin for workflows.

async static get_workflow(request, *, chancy, worker)[source]

Get a single workflow by ID.

async static get_workflows(request, *, chancy, worker)[source]

Get all known workflows.

name()[source]

Get the name of the plugin.

routes()[source]

Get a list of routes to add to the API.