Python Orchestrator API
The orchestrator module usually exposes two functions:
from motoko.workflow import event
def populate_arg_parser(parser):
parser.add_argument("--inputs", "-i", type=float, required=True, nargs=2)
async def main(workflow, **params):
# orchestrate the workflow here
populate_arg_parser(parser) is optional but useful for workflow-specific CLI
arguments. Motoko calls it when launching the orchestrator function from the
command line.
main(workflow, **params) registers routines and actions. It should be async.
It normally ends by setting workflow.finished = True when some condition is
met, or by otherwise terminating the workflow.
Asynchronous routines
Motoko orchestrators are asynchronous. This allows an orchestrator to express a
sequence of dependent routines directly in main.
Use this pattern when the workflow follows a dependency graph that can be written as a sequence of awaited task submissions and selections:
async def run_mult(workflow, inputs):
return await workflow.mult.createTask(x=inputs)
async def run_add(workflow, mult_runs):
created = []
for run, job in mult_runs:
created.extend(await workflow.add.createTask(x=run.y))
return created
async def main(workflow, **params):
mult_runs = await run_mult(workflow, params["inputs"])
add_runs = await run_add(workflow, mult_runs)
await workflow.norm.createTask(
mult_ids=[run.id for run, job in add_runs],
)
TaskManager.createTask(...) returns an awaitable RunList. Awaiting it
commits the transaction, then waits until all created runs reach the
FINISHED state and returns the created (run, job) pairs:
created = await workflow.add.createTask(x=1.5)
for run, job in created:
print(run.id, job.id)
TaskManager.select(...) also returns an awaitable selection. Awaiting a
selection polls until at least one run matches the supplied BlackDynamite
constraints:
finished = await workflow.mult.select("state = FINISHED")
Registering actions
Use actions for event-driven workflows. An action registered with
add_action(...) is triggered each time its event condition is met. Motoko
polls the condition and calls the action function when the condition fires:
workflow.add_action(event_name, task="__all__", event=..., f=...)
event_name: Human-readable name stored in workflow logs.task: Task manager to watch. Use"__all__"to evaluate the event against every task manager.event: Condition that decides whether the action fires. It may be a BlackDynamite constraint string, a list of constraint strings, arun, jobfunction, aworkflow, task_managerfunction, or a no-argument function.f: Callback to run when the event fires. If the event returns a run selection, Motoko passes it asruns=...; otherwise the callback receivesworkflow=...and the runtime parameters.
Example actions:
@event
async def spawn_init_tasks(workflow, **kwargs):
await workflow.mult.createTask(x=kwargs["inputs"])
@event
async def spawn_add_tasks(runs=None, workflow=None, **kwargs):
for run, job in runs:
created = await workflow.add.createTask(x=run.y)
run.state = "FORWARDED"
run.dependencies = [f"add.{r.id}" for r, j in created]
Constraint-based event:
workflow.add_action(
"mult_finished",
task="mult",
event=["runs.id < 3", "state = FINISHED"],
f=spawn_add_tasks,
)
Python condition event:
def ready_for_norm(workflow, task_manager):
if len(workflow.mult.select([])) != 2:
return False
if workflow.mult.select(["state != FORWARDED"]):
return False
return True
workflow.add_action("need_norm", event=ready_for_norm, f=spawn_norm_tasks)
Workflow API summary
Workflow(filename): Loadmotoko.yaml, create task manager objects, and record workflow paths.workflow.create(validated=False): Reset.wf/and initialize all task-manager BlackDynamite studies.workflow.start_launcher_daemons(args=None): Start BlackDynamite launcher daemons for all or selected task managers.workflow.add_action(...): Register an event condition and callback.workflow.add_error_handler(event="state = FAILED", f=...): Register a fail-fast action for failed runs.workflow.execute(**params): Run the orchestrator and event polling loop.workflow.get_runs(["add.1", "norm.3"]): Resolve dependency references into persistent run objects grouped by task manager.workflow.vars: Persistent workflow variable namespace backed by.wf/wf.db.workflow.<task_manager>: Attribute access to task managers, for exampleworkflow.mult.
Task manager API summary
TaskManager.createTask(run_params=None, **job_params): Create one or more BlackDynamite jobs/runs.job_paramsare expanded through the study’s BlackDynamite job schema and defaultjob_space.run_paramsare stored on each run. Returns an awaitableRunListof(run, job)pairs.await TaskManager.createTask(...): Wait until all created runs reachFINISHED, then return the created(run, job)pairs.TaskManager.select(constraints=None): Return a lazyTaskSelection. Constraints use BlackDynamite syntax, such as"state = FINISHED"or["runs.id < 3", "state = FINISHED"]. Whenworkflow.run_nameis set, Motoko automatically adds a matchingrun_nameconstraint.await TaskManager.select(...): Poll until the selection becomes non-empty.TaskSelection.all(...): Build an awaitable condition that waits until all selected runs satisfy one of the supplied constraint sets.