hymir.workflow module

class hymir.workflow.Chain(*jobs: JobResult | None | Chain | Group)

Bases: object

A chain of jobs which must be executed in sequence.

May contain other chains or groups.

class hymir.workflow.Group(*jobs: JobResult | None | Chain | Group)

Bases: object

A group of jobs which can be executed in parallel.

May contain other chains or groups.

class hymir.workflow.Workflow(workflow: Chain | Group | DiGraph)

Bases: object

The Workflow class encapsulates a graph of the groups and chains of jobs to run, along with settings that control its execution.

Parameters:

workflow – The workflow to run.

class Callbacks(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

ON_FINISHED = 'on_finished'

property dependencies: list[tuple[str, list[str]]]

Get the dependencies for each node in the graph.

Returns a list of tuples. The first element of each tuple is a node, and the second element is the list of nodes that it depends on.

The nodes are returned in topological order.

Example:

>>> workflow = Workflow(
...     Chain(
...         dummy_job(),
...         Group(
...             dummy_job(),
...             dummy_job(),
...         ),
...         dummy_job(),
...     )
... )
>>> list(workflow.dependencies)
[
    ('1', []),
    ('2', ['1']),
    ('3', ['1']),
    ('4', ['2', '3']),
]
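
The shape of this list can be reproduced with the standard library alone. The sketch below is not hymir's implementation. It assumes the graph is available as a plain mapping from each node to its predecessors (the `predecessors` dict and the `dependency_list` helper are hypothetical names) and uses `graphlib.TopologicalSorter` to order the nodes.

```python
from graphlib import TopologicalSorter


def dependency_list(predecessors: dict[str, list[str]]) -> list[tuple[str, list[str]]]:
    """Return (node, dependencies) pairs in a topological order."""
    order = TopologicalSorter(predecessors).static_order()
    return [(node, sorted(predecessors.get(node, []))) for node in order]


# Hypothetical graph: job 1 runs first, jobs 2 and 3 can run in parallel,
# and job 4 waits for both of them.
predecessors = {"1": [], "2": ["1"], "3": ["1"], "4": ["2", "3"]}
deps = dependency_list(predecessors)
```

Nodes with no ordering constraint between them (here `2` and `3`) may appear in either order, so callers should rely only on each node appearing after its dependencies.
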
classmethod deserialize(data: str) → Workflow

Deserialize a JSON-serialized workflow back into a Workflow object.

Parameters:

data – The JSON-serialized workflow.

property inputs: set[str]

Get all the inputs that are requested by jobs in the workflow.

property jobs: dict[str, Job]

Get all jobs in the workflow.

on(callback: Callbacks, j: JobResult | None)

Register a job to run as a callback.

For example, you can register a job to run when the workflow has finished executing, even if it failed:

@job()
def send_status_email():
    pass

workflow.on(Workflow.Callbacks.ON_FINISHED, send_status_email())

The result of a callback job is ignored and no outputs are set.
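
The semantics described above (the callback runs whether or not the workflow succeeded, and its result is discarded) can be pictured with a toy model. `MiniWorkflow` below is a hypothetical stand-in written for illustration only. It does not use hymir, and hymir's executor may behave differently in detail.

```python
class MiniWorkflow:
    """Toy stand-in used only to illustrate callback semantics."""

    def __init__(self, steps):
        self.steps = steps
        self.callbacks = {"on_finished": []}

    def on(self, event, callback):
        # Remember the callback so run() can invoke it later.
        self.callbacks[event].append(callback)

    def run(self):
        try:
            for step in self.steps:
                step()
            return True
        except Exception:
            return False
        finally:
            # Runs on success and on failure alike.
            for callback in self.callbacks["on_finished"]:
                callback()  # return value deliberately discarded


def failing_step():
    raise RuntimeError("boom")


notified = []
wf = MiniWorkflow([failing_step])
wf.on("on_finished", lambda: notified.append("email sent"))
succeeded = wf.run()
```
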

property outputs: set[str]

Get all the outputs that are provided by the jobs in the workflow.

serialize() → str

Serialize the workflow to a JSON string.

Returns:

The JSON-serialized workflow.
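
The JSON layout that hymir produces is not described here. As a rough picture of what a serialize/deserialize round trip involves, the sketch below stores a graph as a list of nodes and a list of edges. The `to_json` and `from_json` helpers and the `nodes`/`edges` keys are assumptions made for illustration, not hymir's format.

```python
import json


def to_json(nodes: set[str], edges: set[tuple[str, str]]) -> str:
    """Encode a graph as JSON; sorting keeps the output stable."""
    return json.dumps({"nodes": sorted(nodes), "edges": sorted(edges)})


def from_json(data: str) -> tuple[set[str], set[tuple[str, str]]]:
    """Rebuild the node and edge sets from the JSON string."""
    payload = json.loads(data)
    # JSON has no tuple type, so edges come back as lists and are converted.
    return set(payload["nodes"]), {tuple(edge) for edge in payload["edges"]}


nodes = {"1", "2", "3"}
edges = {("1", "2"), ("1", "3")}
restored = from_json(to_json(nodes, edges))
```
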

hymir.workflow.build_graph_from_workflow(workflow: Group | Chain) → DiGraph

Build a directed graph representing the workflow, with each node representing a job in the workflow. The edges represent the order in which the jobs should be executed.
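
To illustrate how nested chains and groups can be flattened into ordering edges, here is a minimal sketch using stand-in classes. The `Chain` and `Group` classes below are simplified placeholders rather than hymir's own, jobs are represented as plain strings, and the `link` helper is a hypothetical name. The real function returns a `DiGraph` and may differ in detail.

```python
class Chain:
    """Stand-in: items that must run one after another."""

    def __init__(self, *items):
        self.items = items


class Group:
    """Stand-in: items that may run in parallel."""

    def __init__(self, *items):
        self.items = items


def link(item, edges):
    """Add ordering edges for `item`; return its (entry, exit) node lists."""
    if isinstance(item, Chain):
        first_entry = None
        previous_exit = []
        for child in item.items:
            child_entry, child_exit = link(child, edges)
            if first_entry is None:
                first_entry = child_entry
            # Every exit of the previous step must finish before this step starts.
            edges.update((a, b) for a in previous_exit for b in child_entry)
            previous_exit = child_exit
        return first_entry or [], previous_exit
    if isinstance(item, Group):
        entry, exit_ = [], []
        for child in item.items:
            child_entry, child_exit = link(child, edges)
            entry += child_entry
            exit_ += child_exit
        return entry, exit_
    return [item], [item]  # a single job is its own entry and exit


edges = set()
link(Chain("1", Group("2", "3"), "4"), edges)
```

Members of a group get no edges between them, which is what allows them to run in parallel, while each step of a chain is connected to every exit node of the step before it.
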