Python Orchestrator API

The orchestrator module usually exposes two functions:

from motoko.workflow import event


def populate_arg_parser(parser):
    parser.add_argument("--inputs", "-i", type=float, required=True, nargs=2)


async def main(workflow, **params):
    # orchestrate the workflow here

populate_arg_parser(parser) is optional but useful for workflow-specific CLI arguments. Motoko calls it when launching the orchestrator function from the command line.

main(workflow, **params) registers routines and actions. It should be async. It normally ends by setting workflow.finished = True when some condition is met, or by otherwise terminating the workflow.

Asynchronous routines

Motoko orchestrators are asynchronous. This allows an orchestrator to express a sequence of dependent routines directly in main.

Use this pattern when the workflow follows a dependency graph that can be written as a sequence of awaited task submissions and selections:

async def run_mult(workflow, inputs):
    return await workflow.mult.createTask(x=inputs)


async def run_add(workflow, mult_runs):
    created = []
    for run, job in mult_runs:
        created.extend(await workflow.add.createTask(x=run.y))
    return created


async def main(workflow, **params):
    mult_runs = await run_mult(workflow, params["inputs"])
    add_runs = await run_add(workflow, mult_runs)
    await workflow.norm.createTask(
        mult_ids=[run.id for run, job in add_runs],
    )

TaskManager.createTask(...) returns an awaitable RunList. Awaiting it commits the transaction, then waits until all created runs reach the FINISHED state and returns the created (run, job) pairs:

created = await workflow.add.createTask(x=1.5)
for run, job in created:
    print(run.id, job.id)

TaskManager.select(...) also returns an awaitable selection. Awaiting a selection polls until at least one run matches the supplied BlackDynamite constraints:

finished = await workflow.mult.select("state = FINISHED")

Registering actions

Use actions for event-driven workflows. An action registered with add_action(...) is triggered each time its event condition is met. Motoko polls the condition and calls the action function when the condition fires:

workflow.add_action(event_name, task="__all__", event=..., f=...)
  • event_name: Human-readable name stored in workflow logs.

  • task: Task manager to watch. Use "__all__" to evaluate the event against every task manager.

  • event: Condition that decides whether the action fires. It may be a BlackDynamite constraint string, a list of constraint strings, a run, job function, a workflow, task_manager function, or a no-argument function.

  • f: Callback to run when the event fires. If the event returns a run selection, Motoko passes it as runs=...; otherwise the callback receives workflow=... and the runtime parameters.

Example actions:

@event
async def spawn_init_tasks(workflow, **kwargs):
    await workflow.mult.createTask(x=kwargs["inputs"])


@event
async def spawn_add_tasks(runs=None, workflow=None, **kwargs):
    for run, job in runs:
        created = await workflow.add.createTask(x=run.y)
        run.state = "FORWARDED"
        run.dependencies = [f"add.{r.id}" for r, j in created]

Constraint-based event:

workflow.add_action(
    "mult_finished",
    task="mult",
    event=["runs.id < 3", "state = FINISHED"],
    f=spawn_add_tasks,
)

Python condition event:

def ready_for_norm(workflow, task_manager):
    if len(workflow.mult.select([])) != 2:
        return False
    if workflow.mult.select(["state != FORWARDED"]):
        return False
    return True


workflow.add_action("need_norm", event=ready_for_norm, f=spawn_norm_tasks)

Workflow API summary

  • Workflow(filename): Load motoko.yaml, create task manager objects, and record workflow paths.

  • workflow.create(validated=False): Reset .wf/ and initialize all task-manager BlackDynamite studies.

  • workflow.start_launcher_daemons(args=None): Start BlackDynamite launcher daemons for all or selected task managers.

  • workflow.add_action(...): Register an event condition and callback.

  • workflow.add_error_handler(event="state = FAILED", f=...): Register a fail-fast action for failed runs.

  • workflow.execute(**params): Run the orchestrator and event polling loop.

  • workflow.get_runs(["add.1", "norm.3"]): Resolve dependency references into persistent run objects grouped by task manager.

  • workflow.vars: Persistent workflow variable namespace backed by .wf/wf.db.

  • workflow.<task_manager>: Attribute access to task managers, for example workflow.mult.

Task manager API summary

  • TaskManager.createTask(run_params=None, **job_params): Create one or more BlackDynamite jobs/runs. job_params are expanded through the study’s BlackDynamite job schema and default job_space. run_params are stored on each run. Returns an awaitable RunList of (run, job) pairs.

  • await TaskManager.createTask(...): Wait until all created runs reach FINISHED, then return the created (run, job) pairs.

  • TaskManager.select(constraints=None): Return a lazy TaskSelection. Constraints use BlackDynamite syntax, such as "state = FINISHED" or ["runs.id < 3", "state = FINISHED"]. When workflow.run_name is set, Motoko automatically adds a matching run_name constraint.

  • await TaskManager.select(...): Poll until the selection becomes non-empty.

  • TaskSelection.all(...): Build an awaitable condition that waits until all selected runs satisfy one of the supplied constraint sets.