motoko.workflow
Attributes
Classes
Workflow-level Python values stored in |
|
Coordinates a Motoko workflow described by a |
Functions
|
Wrap an async workflow action with lightweight firing diagnostics. |
|
Module Contents
- motoko.workflow.EventCallback
- motoko.workflow.EventCondition
- motoko.workflow.ActionRegistry
- motoko.workflow.event(func: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]) collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]
Wrap an async workflow action with lightweight firing diagnostics.
Orchestrator functions often register small async callbacks with
Workflow.add_action(). Decorating those callbacks with@eventguaranties that transaction mechanism will be ensured, by calling the original coroutine in case of transaction collision. Also it logs which action was fired.
- motoko.workflow.no_action(*args: Any, **kwargs: Any) None
- class motoko.workflow.WorkflowVars(db_fname: str)
Workflow-level Python values stored in
.wf/wf.db.Workflow.varsexposes an attribute-style namespace backed by a local ZODB file. Assigningworkflow.vars.foo = valuestoresfooin the workflow directory, so later event handlers or orchestrator restarts can read it back asworkflow.vars.foo. The storage is intentionally separate from BlackDynamite run/job data; it is intended for coordination of the workflow state, not for task input or output.- db_fname
- db
- conn
- root
- class motoko.workflow.Workflow(filename: str)
Coordinates a Motoko workflow described by a
motoko.yamlfile.A workflow is a collection of BlackDynamite studies, handled by task managers in Motoko. Each study has its own directory,
bd.yamlconfiguration file, ZEO database, launcher daemon, and run/job table. TheWorkflowobject ties those studies together by loading an orchestrator Python function, registering event handlers, polling task state, and creating downstream tasks when event conditions become true.The input file is a YAML file that looks as follows:
task_managers: prepare: solve: host: zeo:///path/to/remote-or-local-study analyze: aliases: post: analyze orchestrator: orchestrator.main generator: bash bash_options: - --some-launcher-option
task_managersis required. Its keys are both task-manager names and directory names relative to the directory containingmotoko.yaml. Each directory must contain the BlackDynamite files for that sub-study, most importantlybd.yaml,launch.sh, anddoIt.py. The value may be empty, or it may contain Motoko-specific settings. Currentlyhostis read bymotoko.task_manager.TaskManager; when absent, Motoko uses a local Unix-socket ZEO URL based on the study directory.orchestratoris required and usesmodule.functionsyntax. The module is loaded from a.pyfile in the same folder wheremotoko.yamlis located. The function should be async and acceptworkflowplus any CLI/runtime parameters as**params. It normally callsadd_action()to define event-driven transitions.aliasesis optional. It maps additional attribute names to task manager names so orchestrators can useworkflow.postas a readable alias forworkflow.analyze. This is useful, e.g., when replacing one task with another within the same orchestrator.generatorand*_optionsare optional launcher settings depending on the used job submission manager (bash, slurm, …).During execution,
run_namescopes tasks and selections to one workflow run. CLI execution sets it frommotoko orchestrator start --run_name; Python callers must setworkflow.run_namebefore creating tasks.- finished = False
- task_managers: dict[str, motoko.task_manager.TaskManager]
- aliases: dict[str, str]
- orchestrator_script: str
- orchestrator_function: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]] | None = None
- actions: ActionRegistry
- run_name: str | None = None
- property vars: WorkflowVars
Return the
WorkflowVarsobject.The backing ZODB database is created lazily under
.wf/wf.dbin the workflow directory. Use it to store global values that are not altered during workflow execution.
- create(validated: bool | None = None) None
Reset Motoko/BlackDynamite runtime state and create sub-studies.
This removes the workflow
.wfdirectory, recreates it, then asks BlackDynamite to initialize every task-manager study.
- start_launcher_daemons(args: argparse.Namespace | None = None) None
Start BlackDynamite launcher daemons for task managers.
The launcher daemons check whether new runs have been created and submit or execute them through BlackDynamite using some job management scheme (SLURM, PBS, bash, etc.).
- get_orchestrator_function() collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]
Load the orchestrator function.
The
orchestratorvalue inmotoko.yamlmust be given in themodule.functionformat. The module is imported from the workflow directory where themotoko.yamlconfiguration file is located.
- add_action(event_name: str, task: str = '__all__', event: EventCondition = None, get_params: collections.abc.Callable[[], Any] | None = None, f: EventCallback | None = None) None
Register an event condition function and action callback function.
- Parameters:
event_name – Name used to identify the registered event.
task – Task-manager name to watch, or
"__all__"for all task managers.event – Event condition to evaluate. See the additional information below.
get_params – Optional function evaluated at firing time and passed to the action as
params=....f – Action callback invoked when the event condition fires.
An
eventmay be one of several forms:a function
event(run, job)evaluated for each selected run;a function
event(workflow, task_manager)evaluated once per task manager;a BlackDynamite constraint string such as
"state = FINISHED";a list of BlackDynamite constraints such as
["runs.id < 3", "state = FINISHED"].
If
taskis a task-manager name, the condition is checked only for that manager. If it is"__all__", the condition is checked for all managers. When the condition returns a boolean value,fis called.When the condition returns a run selection, this selection is passed to
fasruns=...; boolean conditions are called withworkflow=...only.
- add_error_handler(event: EventCondition = 'state = FAILED', f: EventCallback = no_action, **kwargs: Any) None
Register a handler for failed runs.
By default this watches all task managers for
state = FAILED. When such a run is found, the callback functionfis invoked and Motoko raises an exception to terminate the orchestrator.
- is_run_filter(f: Any) bool
- is_workflow_filter(f: Any) bool
- is_no_param_func(f: Any) bool
- async fire_event(event_name: str, task_manager_name: str, f: EventCallback, trigger: bool | collections.abc.Iterable[tuple[BlackDynamite.run_zeo.RunZEO, BlackDynamite.job.JobZEO]], get_params: collections.abc.Callable[[], Any] | Any | None, params: dict[str, Any]) collections.abc.AsyncIterator[Any]
Invoke an action after an event condition fired.
triggeris eitherTruefor boolean/task-manager-level conditions or a BlackDynamite run selection for run-level conditions. The arguments of callback functionfare then deduced from it.
- async check_events(**params: Any) None
- async check_event(event_name: str, task_manager_name: str = '__all__', **params: Any) Any
Evaluate one registered event and fire it where applicable.
- execute(**params: Any) None
Run the orchestrator and the event polling loop together.
The
paramsare forwarded to the configured orchestrator and all fired actions. Execution ends when the orchestrator setsworkflow.finished = Trueor when an exception is raised.
- async loop_check_events(**params: Any) None
Poll registered events until the workflow is marked as finished.
- get_runs(run_list: collections.abc.Iterable[str]) dict[str, list[BlackDynamite.run_zeo.RunZEO]]
Deduce run objects from a list of URIs.
The URIs must use the
<task-manager>.<run-id>format. This helper returns the corresponding BlackDynamite run objects.
- is_empty() bool