motoko.workflow =============== .. py:module:: motoko.workflow Attributes ---------- .. autoapisummary:: motoko.workflow.EventCallback motoko.workflow.EventCondition motoko.workflow.ActionRegistry Classes ------- .. autoapisummary:: motoko.workflow.WorkflowVars motoko.workflow.Workflow Functions --------- .. autoapisummary:: motoko.workflow.event motoko.workflow.no_action Module Contents --------------- .. py:data:: EventCallback .. py:data:: EventCondition .. py:data:: ActionRegistry .. py:function:: event(func: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]) -> collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]] Wrap an async workflow action with lightweight firing diagnostics. Orchestrator functions often register small async callbacks with :meth:`Workflow.add_action`. Decorating those callbacks with ``@event`` guaranties that transaction mechanism will be ensured, by calling the original coroutine in case of transaction collision. Also it logs which action was fired. .. py:function:: no_action(*args: Any, **kwargs: Any) -> None .. py:class:: WorkflowVars(db_fname: str) Workflow-level Python values stored in ``.wf/wf.db``. ``Workflow.vars`` exposes an attribute-style namespace backed by a local ZODB file. Assigning ``workflow.vars.foo = value`` stores ``foo`` in the workflow directory, so later event handlers or orchestrator restarts can read it back as ``workflow.vars.foo``. The storage is intentionally separate from BlackDynamite run/job data; it is intended for coordination of the workflow state, not for task input or output. .. py:attribute:: db_fname .. py:attribute:: db .. py:attribute:: conn .. py:attribute:: root .. py:class:: Workflow(filename: str) Coordinates a Motoko workflow described by a ``motoko.yaml`` file. A workflow is a collection of BlackDynamite studies, handled by task managers in Motoko. Each study has its own directory, ``bd.yaml`` configuration file, ZEO database, launcher daemon, and run/job table. The ``Workflow`` object ties those studies together by loading an orchestrator Python function, registering event handlers, polling task state, and creating downstream tasks when event conditions become true. The input file is a YAML file that looks as follows:: task_managers: prepare: solve: host: zeo:///path/to/remote-or-local-study analyze: aliases: post: analyze orchestrator: orchestrator.main generator: bash bash_options: - --some-launcher-option ``task_managers`` is required. Its keys are both task-manager names and directory names relative to the directory containing ``motoko.yaml``. Each directory must contain the BlackDynamite files for that sub-study, most importantly ``bd.yaml``, ``launch.sh``, and ``doIt.py``. The value may be empty, or it may contain Motoko-specific settings. Currently ``host`` is read by :class:`motoko.task_manager.TaskManager`; when absent, Motoko uses a local Unix-socket ZEO URL based on the study directory. ``orchestrator`` is required and uses ``module.function`` syntax. The module is loaded from a ``.py`` file in the same folder where ``motoko.yaml`` is located. The function should be async and accept ``workflow`` plus any CLI/runtime parameters as ``**params``. It normally calls :meth:`add_action` to define event-driven transitions. ``aliases`` is optional. It maps additional attribute names to task manager names so orchestrators can use ``workflow.post`` as a readable alias for ``workflow.analyze``. This is useful, e.g., when replacing one task with another within the same orchestrator. ``generator`` and ``*_options`` are optional launcher settings depending on the used job submission manager (bash, slurm, ...). During execution, ``run_name`` scopes tasks and selections to one workflow run. CLI execution sets it from ``motoko orchestrator start --run_name``; Python callers must set ``workflow.run_name`` before creating tasks. .. py:attribute:: finished :value: False .. py:attribute:: task_managers :type: dict[str, motoko.task_manager.TaskManager] .. py:attribute:: aliases :type: dict[str, str] .. py:attribute:: orchestrator_script :type: str .. py:attribute:: orchestrator_function :type: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]] | None :value: None .. py:attribute:: actions :type: ActionRegistry .. py:attribute:: run_name :type: str | None :value: None .. py:property:: vars :type: WorkflowVars Return the ``WorkflowVars`` object. The backing ZODB database is created lazily under ``.wf/wf.db`` in the workflow directory. Use it to store global values that are not altered during workflow execution. .. py:method:: create(validated: bool | None = None) -> None Reset Motoko/BlackDynamite runtime state and create sub-studies. This removes the workflow ``.wf`` directory, recreates it, then asks BlackDynamite to initialize every task-manager study. .. py:method:: start_launcher_daemons(args: argparse.Namespace | None = None) -> None Start BlackDynamite launcher daemons for task managers. The launcher daemons check whether new runs have been created and submit or execute them through BlackDynamite using some job management scheme (SLURM, PBS, bash, etc.). .. py:method:: get_orchestrator_function() -> collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]] Load the orchestrator function. The ``orchestrator`` value in ``motoko.yaml`` must be given in the ``module.function`` format. The module is imported from the workflow directory where the ``motoko.yaml`` configuration file is located. .. py:method:: add_action(event_name: str, task: str = '__all__', event: EventCondition = None, get_params: collections.abc.Callable[[], Any] | None = None, f: EventCallback | None = None) -> None Register an event condition function and action callback function. :param event_name: Name used to identify the registered event. :param task: Task-manager name to watch, or ``"__all__"`` for all task managers. :param event: Event condition to evaluate. See the additional information below. :param get_params: Optional function evaluated at firing time and passed to the action as ``params=...``. :param f: Action callback invoked when the event condition fires. An ``event`` may be one of several forms: - a function ``event(run, job)`` evaluated for each selected run; - a function ``event(workflow, task_manager)`` evaluated once per task manager; - a BlackDynamite constraint string such as ``"state = FINISHED"``; - a list of BlackDynamite constraints such as ``["runs.id < 3", "state = FINISHED"]``. If ``task`` is a task-manager name, the condition is checked only for that manager. If it is ``"__all__"``, the condition is checked for all managers. When the condition returns a boolean value, ``f`` is called. When the condition returns a run selection, this selection is passed to ``f`` as ``runs=...``; boolean conditions are called with ``workflow=...`` only. .. py:method:: add_error_handler(event: EventCondition = 'state = FAILED', f: EventCallback = no_action, **kwargs: Any) -> None Register a handler for failed runs. By default this watches all task managers for ``state = FAILED``. When such a run is found, the callback function ``f`` is invoked and Motoko raises an exception to terminate the orchestrator. .. py:method:: is_run_filter(f: Any) -> bool .. py:method:: is_workflow_filter(f: Any) -> bool .. py:method:: is_no_param_func(f: Any) -> bool .. py:method:: fire_event(event_name: str, task_manager_name: str, f: EventCallback, trigger: bool | collections.abc.Iterable[tuple[BlackDynamite.run_zeo.RunZEO, BlackDynamite.job.JobZEO]], get_params: collections.abc.Callable[[], Any] | Any | None, params: dict[str, Any]) -> collections.abc.AsyncIterator[Any] :async: Invoke an action after an event condition fired. ``trigger`` is either ``True`` for boolean/task-manager-level conditions or a BlackDynamite run selection for run-level conditions. The arguments of callback function ``f`` are then deduced from it. .. py:method:: check_events(**params: Any) -> None :async: .. py:method:: check_event(event_name: str, task_manager_name: str = '__all__', **params: Any) -> Any :async: Evaluate one registered event and fire it where applicable. .. py:method:: execute(**params: Any) -> None Run the orchestrator and the event polling loop together. The ``params`` are forwarded to the configured orchestrator and all fired actions. Execution ends when the orchestrator sets ``workflow.finished = True`` or when an exception is raised. .. py:method:: loop_check_events(**params: Any) -> None :async: Poll registered events until the workflow is marked as finished. .. py:method:: get_runs(run_list: collections.abc.Iterable[str]) -> dict[str, list[BlackDynamite.run_zeo.RunZEO]] Deduce run objects from a list of URIs. The URIs must use the ``.`` format. This helper returns the corresponding BlackDynamite run objects. .. py:method:: is_empty() -> bool