# Python Orchestrator API

The orchestrator module usually exposes two functions:

```python
from motoko.workflow import event


def populate_arg_parser(parser):
    parser.add_argument("--inputs", "-i", type=float, required=True, nargs=2)


async def main(workflow, **params):
    # orchestrate the workflow here
    ...
```

`populate_arg_parser(parser)` is optional but useful for workflow-specific CLI arguments. Motoko calls it when launching the orchestrator function from the command line.

`main(workflow, **params)` registers routines and actions. It should be async. It normally ends by setting `workflow.finished = True` when some condition is met, or by otherwise terminating the workflow.

## Asynchronous routines

Motoko orchestrators are asynchronous. This allows an orchestrator to express a sequence of dependent routines directly in `main`.

Use this pattern when the workflow follows a dependency graph that can be written as a sequence of awaited task submissions and selections:

```python
async def run_mult(workflow, inputs):
    return await workflow.mult.createTask(x=inputs)


async def run_add(workflow, mult_runs):
    created = []
    for run, job in mult_runs:
        created.extend(await workflow.add.createTask(x=run.y))
    return created


async def main(workflow, **params):
    mult_runs = await run_mult(workflow, params["inputs"])
    add_runs = await run_add(workflow, mult_runs)
    await workflow.norm.createTask(
        mult_ids=[run.id for run, job in add_runs],
    )
```

`TaskManager.createTask(...)` returns an awaitable `RunList`. Awaiting it commits the transaction, then waits until all created runs reach the `FINISHED` state and returns the created `(run, job)` pairs:

```python
created = await workflow.add.createTask(x=1.5)
for run, job in created:
    print(run.id, job.id)
```

`TaskManager.select(...)` also returns an awaitable selection.
Awaiting a selection polls until at least one run matches the supplied BlackDynamite constraints:

```python
finished = await workflow.mult.select("state = FINISHED")
```

## Registering actions

Use actions for event-driven workflows. An action registered with `add_action(...)` is triggered each time its event condition is met. Motoko polls the condition and calls the action function when the condition fires:

```python
workflow.add_action(event_name, task="__all__", event=..., f=...)
```

- `event_name`: Human-readable name stored in workflow logs.
- `task`: Task manager to watch. Use `"__all__"` to evaluate the event against every task manager.
- `event`: Condition that decides whether the action fires. It may be a BlackDynamite constraint string, a list of constraint strings, a `run, job` function, a `workflow, task_manager` function, or a no-argument function.
- `f`: Callback to run when the event fires. If the event returns a run selection, Motoko passes it as `runs=...`; otherwise the callback receives `workflow=...` and the runtime parameters.

Example actions:

```python
@event
async def spawn_init_tasks(workflow, **kwargs):
    await workflow.mult.createTask(x=kwargs["inputs"])


@event
async def spawn_add_tasks(runs=None, workflow=None, **kwargs):
    for run, job in runs:
        created = await workflow.add.createTask(x=run.y)
        run.state = "FORWARDED"
        run.dependencies = [f"add.{r.id}" for r, j in created]
```

Constraint-based event:

```python
workflow.add_action(
    "mult_finished",
    task="mult",
    event=["runs.id < 3", "state = FINISHED"],
    f=spawn_add_tasks,
)
```

Python condition event:

```python
def ready_for_norm(workflow, task_manager):
    if len(workflow.mult.select([])) != 2:
        return False
    if workflow.mult.select(["state != FORWARDED"]):
        return False
    return True


workflow.add_action("need_norm", event=ready_for_norm, f=spawn_norm_tasks)
```

## Workflow API summary

- `Workflow(filename)`: Load `motoko.yaml`, create task manager objects, and record workflow paths.
- `workflow.create(validated=False)`: Reset `.wf/` and initialize all task-manager BlackDynamite studies.
- `workflow.start_launcher_daemons(args=None)`: Start BlackDynamite launcher daemons for all or selected task managers.
- `workflow.add_action(...)`: Register an event condition and callback.
- `workflow.add_error_handler(event="state = FAILED", f=...)`: Register a fail-fast action for failed runs.
- `workflow.execute(**params)`: Run the orchestrator and event polling loop.
- `workflow.get_runs(["add.1", "norm.3"])`: Resolve dependency references into persistent run objects grouped by task manager.
- `workflow.vars`: Persistent workflow variable namespace backed by `.wf/wf.db`.
- `workflow.`: Attribute access to task managers, for example `workflow.mult`.

## Task manager API summary

- `TaskManager.createTask(run_params=None, **job_params)`: Create one or more BlackDynamite jobs/runs. `job_params` are expanded through the study's BlackDynamite job schema and default `job_space`. `run_params` are stored on each run. Returns an awaitable `RunList` of `(run, job)` pairs.
- `await TaskManager.createTask(...)`: Wait until all created runs reach `FINISHED`, then return the created `(run, job)` pairs.
- `TaskManager.select(constraints=None)`: Return a lazy `TaskSelection`. Constraints use BlackDynamite syntax, such as `"state = FINISHED"` or `["runs.id < 3", "state = FINISHED"]`. When `workflow.run_name` is set, Motoko automatically adds a matching `run_name` constraint.
- `await TaskManager.select(...)`: Poll until the selection becomes non-empty.
- `TaskSelection.all(...)`: Build an awaitable condition that waits until all selected runs satisfy one of the supplied constraint sets.
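
The "lazy selection that can be awaited" behaviour summarized above can be pictured with a small stand-alone sketch. It uses only `asyncio` and does not import Motoko or BlackDynamite; `PollingSelection`, `fetch`, and `interval` are hypothetical names chosen for illustration, and the polling loop is an assumption about the general pattern rather than Motoko's actual implementation.

```python
import asyncio


class PollingSelection:
    """Hypothetical stand-in for a lazy, awaitable selection (not Motoko code)."""

    def __init__(self, fetch, interval=0.01):
        self._fetch = fetch        # callable returning the current matches
        self._interval = interval  # seconds to sleep between polls

    async def _wait(self):
        # Re-evaluate the query until it returns a non-empty result.
        while True:
            matches = self._fetch()
            if matches:
                return matches
            await asyncio.sleep(self._interval)

    def __await__(self):
        # Makes `await selection` work by delegating to the polling coroutine.
        return self._wait().__await__()


async def demo():
    runs = []  # in-memory stand-in for a run table

    async def finish_later():
        # Simulates a run reaching FINISHED after a short delay.
        await asyncio.sleep(0.03)
        runs.append({"id": 1, "state": "FINISHED"})

    producer = asyncio.create_task(finish_later())
    selection = PollingSelection(
        lambda: [r for r in runs if r["state"] == "FINISHED"]
    )
    finished = await selection  # suspends until the list is non-empty
    await producer
    return finished


result = asyncio.run(demo())
```

Because the object is only evaluated when awaited, building the selection is cheap and the event loop stays free to run other coroutines between polls, which is the property the orchestrator pattern relies on.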