motoko.workflow

Attributes

EventCallback

EventCondition

ActionRegistry

Classes

WorkflowVars

Workflow-level Python values stored in .wf/wf.db.

Workflow

Coordinates a Motoko workflow described by a motoko.yaml file.

Functions

event(→ collections.abc.Callable[Ellipsis, ...)

Wrap an async workflow action with lightweight firing diagnostics.

no_action(→ None)

Module Contents

motoko.workflow.EventCallback
motoko.workflow.EventCondition
motoko.workflow.ActionRegistry
motoko.workflow.event(func: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]) collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]

Wrap an async workflow action with lightweight firing diagnostics.

Orchestrator functions often register small async callbacks with Workflow.add_action(). Decorating those callbacks with @event guaranties that transaction mechanism will be ensured, by calling the original coroutine in case of transaction collision. Also it logs which action was fired.

motoko.workflow.no_action(*args: Any, **kwargs: Any) None
class motoko.workflow.WorkflowVars(db_fname: str)

Workflow-level Python values stored in .wf/wf.db.

Workflow.vars exposes an attribute-style namespace backed by a local ZODB file. Assigning workflow.vars.foo = value stores foo in the workflow directory, so later event handlers or orchestrator restarts can read it back as workflow.vars.foo. The storage is intentionally separate from BlackDynamite run/job data; it is intended for coordination of the workflow state, not for task input or output.

db_fname
db
conn
root
class motoko.workflow.Workflow(filename: str)

Coordinates a Motoko workflow described by a motoko.yaml file.

A workflow is a collection of BlackDynamite studies, handled by task managers in Motoko. Each study has its own directory, bd.yaml configuration file, ZEO database, launcher daemon, and run/job table. The Workflow object ties those studies together by loading an orchestrator Python function, registering event handlers, polling task state, and creating downstream tasks when event conditions become true.

The input file is a YAML file that looks as follows:

task_managers:
  prepare:
  solve:
    host: zeo:///path/to/remote-or-local-study
  analyze:
aliases:
  post: analyze
orchestrator: orchestrator.main
generator: bash
bash_options:
  - --some-launcher-option

task_managers is required. Its keys are both task-manager names and directory names relative to the directory containing motoko.yaml. Each directory must contain the BlackDynamite files for that sub-study, most importantly bd.yaml, launch.sh, and doIt.py. The value may be empty, or it may contain Motoko-specific settings. Currently host is read by motoko.task_manager.TaskManager; when absent, Motoko uses a local Unix-socket ZEO URL based on the study directory.

orchestrator is required and uses module.function syntax. The module is loaded from a .py file in the same folder where motoko.yaml is located. The function should be async and accept workflow plus any CLI/runtime parameters as **params. It normally calls add_action() to define event-driven transitions.

aliases is optional. It maps additional attribute names to task manager names so orchestrators can use workflow.post as a readable alias for workflow.analyze. This is useful, e.g., when replacing one task with another within the same orchestrator.

generator and *_options are optional launcher settings depending on the used job submission manager (bash, slurm, …).

During execution, run_name scopes tasks and selections to one workflow run. CLI execution sets it from motoko orchestrator start --run_name; Python callers must set workflow.run_name before creating tasks.

finished = False
task_managers: dict[str, motoko.task_manager.TaskManager]
aliases: dict[str, str]
orchestrator_script: str
orchestrator_function: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]] | None = None
actions: ActionRegistry
run_name: str | None = None
property vars: WorkflowVars

Return the WorkflowVars object.

The backing ZODB database is created lazily under .wf/wf.db in the workflow directory. Use it to store global values that are not altered during workflow execution.

create(validated: bool | None = None) None

Reset Motoko/BlackDynamite runtime state and create sub-studies.

This removes the workflow .wf directory, recreates it, then asks BlackDynamite to initialize every task-manager study.

start_launcher_daemons(args: argparse.Namespace | None = None) None

Start BlackDynamite launcher daemons for task managers.

The launcher daemons check whether new runs have been created and submit or execute them through BlackDynamite using some job management scheme (SLURM, PBS, bash, etc.).

get_orchestrator_function() collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]

Load the orchestrator function.

The orchestrator value in motoko.yaml must be given in the module.function format. The module is imported from the workflow directory where the motoko.yaml configuration file is located.

add_action(event_name: str, task: str = '__all__', event: EventCondition = None, get_params: collections.abc.Callable[[], Any] | None = None, f: EventCallback | None = None) None

Register an event condition function and action callback function.

Parameters:
  • event_name – Name used to identify the registered event.

  • task – Task-manager name to watch, or "__all__" for all task managers.

  • event – Event condition to evaluate. See the additional information below.

  • get_params – Optional function evaluated at firing time and passed to the action as params=....

  • f – Action callback invoked when the event condition fires.

An event may be one of several forms:

  • a function event(run, job) evaluated for each selected run;

  • a function event(workflow, task_manager) evaluated once per task manager;

  • a BlackDynamite constraint string such as "state = FINISHED";

  • a list of BlackDynamite constraints such as ["runs.id < 3", "state = FINISHED"].

If task is a task-manager name, the condition is checked only for that manager. If it is "__all__", the condition is checked for all managers. When the condition returns a boolean value, f is called.

When the condition returns a run selection, this selection is passed to f as runs=...; boolean conditions are called with workflow=... only.

add_error_handler(event: EventCondition = 'state = FAILED', f: EventCallback = no_action, **kwargs: Any) None

Register a handler for failed runs.

By default this watches all task managers for state = FAILED. When such a run is found, the callback function f is invoked and Motoko raises an exception to terminate the orchestrator.

is_run_filter(f: Any) bool
is_workflow_filter(f: Any) bool
is_no_param_func(f: Any) bool
async fire_event(event_name: str, task_manager_name: str, f: EventCallback, trigger: bool | collections.abc.Iterable[tuple[BlackDynamite.run_zeo.RunZEO, BlackDynamite.job.JobZEO]], get_params: collections.abc.Callable[[], Any] | Any | None, params: dict[str, Any]) collections.abc.AsyncIterator[Any]

Invoke an action after an event condition fired.

trigger is either True for boolean/task-manager-level conditions or a BlackDynamite run selection for run-level conditions. The arguments of callback function f are then deduced from it.

async check_events(**params: Any) None
async check_event(event_name: str, task_manager_name: str = '__all__', **params: Any) Any

Evaluate one registered event and fire it where applicable.

execute(**params: Any) None

Run the orchestrator and the event polling loop together.

The params are forwarded to the configured orchestrator and all fired actions. Execution ends when the orchestrator sets workflow.finished = True or when an exception is raised.

async loop_check_events(**params: Any) None

Poll registered events until the workflow is marked as finished.

get_runs(run_list: collections.abc.Iterable[str]) dict[str, list[BlackDynamite.run_zeo.RunZEO]]

Deduce run objects from a list of URIs.

The URIs must use the <task-manager>.<run-id> format. This helper returns the corresponding BlackDynamite run objects.

is_empty() bool