Workflow System
Workflows orchestrate an ordered set of tasks.
Horus currently executes tasks in definition order. It does not perform dependency resolution automatically, so task ordering is the workflow author's responsibility.
Core Concept
Every workflow implements:
@classmethod
def from_yaml(cls, path: str | Path) -> Self: ...
async def run(self) -> None: ...
def reset(self) -> None: ...
Contract
from_yaml()loads a workflow definitionrun()is asynchronousreset()clears task state so the workflow can be re-runkind: stris the registry discriminator
Base Workflow
All workflows inherit from BaseWorkflow:
class BaseWorkflow(AutoRegistry, entry_point="workflow"):
registry_key: ClassVar[str] = "kind"
kind: str
name: str
tasks: dict[str, BaseTask] = Field(default_factory=dict)
@classmethod
@abstractmethod
def from_yaml(cls, path: str | Path) -> Self:
pass
@abstractmethod
async def run(self) -> None:
pass
@abstractmethod
def reset(self) -> None:
pass
Built-in Workflow
HorusWorkflow: runs tasks in definition order and skips tasks whose outputs already exist whentask.skip_if_completeisTrue
Example
import asyncio
from horus_builtin.workflow.horus_workflow import HorusWorkflow
wf = HorusWorkflow(name="example")
asyncio.run(wf.run())
Task IDs
Each task receives its task_id from the key used in the workflow's tasks
mapping. This keeps task IDs aligned with workflow registration keys.
Registering Custom Workflows
To register workflow plugins, expose them through:
[project.entry-points."horus.workflow"]
For more details, refer to the Auto-Registry documentation.