Task System
Tasks are the unit of work in Horus. A task binds together its input and output artifacts, runtime, executor, target, and optional interaction transport.
Core Concept
Every task must implement all three abstract methods:
async def _run(self) -> None:
...
def is_complete(self) -> bool:
...
def _reset(self) -> None:
...
Contract
_run(): task-specific execution logic; do not mutatestatushereis_complete(): returnTruewhen all output artifacts are present and valid; used to skip already-complete tasks whenskip_if_complete=True_reset(): clear any subclass-specific state so the task can re-run; do not mutatestatushererun()is the publicfinalentry point and runsTaskMiddlewarekind: stris the registry discriminatorexecutorandruntimemust be compatibletargetdecides where the task is dispatchedinteractioncan carry a task-level runtime prompt transport
Runtime compatibility is validated automatically after model construction. An
invalid executor/runtime pair raises IncompatibleRuntimeError.
Base Task
All tasks inherit from BaseTask:
class BaseTask(AutoRegistry, entry_point="task"):
registry_key: ClassVar[str] = "kind"
kind: str
kind_name: ClassVar[str] = "Task"
kind_description: ClassVar[str] = _("Base task")
id: str
name: str
inputs: list[BaseArtifact] = Field(default_factory=list)
outputs: list[BaseArtifact] = Field(default_factory=list)
executor: BaseExecutor
runtime: BaseRuntime
target: BaseTarget
status: TaskStatus = TaskStatus.IDLE
runs: int = 0
skip_if_complete: bool = True
interaction: BaseInteractionTransport | None = None
@final
async def run(self) -> None:
"""Drives status transitions: RUNNING → COMPLETED | CANCELED | FAILED."""
...
async def sync_status(self) -> TaskStatus:
"""Refresh self.status from the target and return the updated value."""
...
@abstractmethod
async def _run(self) -> None:
"""Task-specific execution logic. Do not set self.status here."""
@abstractmethod
def is_complete(self) -> bool:
"""Return True when output artifacts are present and valid."""
@final
def reset(self) -> None:
"""Reset status to IDLE and delegate to _reset()."""
...
@abstractmethod
def _reset(self) -> None:
"""Subclass-specific reset logic. Do not set self.status here."""
Subclasses must implement _run(), is_complete(), and _reset().
inputs and outputs
Each artifact carries its own id, and that id is
what links tasks together in the workflow DAG: a task whose input artifact id
matches another task's output artifact id depends on that task. See
DAG planning.
Output artifact IDs must be unique across the whole workflow; runtimes that
need a name→artifact mapping (for example to format a shell command or inject
function parameters) build it on the fly keyed by artifact.id.
Kind metadata
Registry-backed classes (tasks, targets, runtimes, executors, workflows) now expose two optional ClassVar fields that provide human-friendly metadata for client UIs and registries:
kind_name: ClassVar[str]— a short, human-readable name for the kind.kind_description: ClassVar[str]— a longer description string. Prefer using a plugin-scoped translator created withmake_translator(aliased as_) for translations; see the SDK i18n guide, e.g.:
from your_plugin.i18n import tr as _
class HorusTask(BaseTask):
kind: str = "horus_task"
kind_name: ClassVar[str] = "Horus Task"
kind_description: ClassVar[str] = _("Basic Horus task")
run() wraps _run() in TaskMiddleware.call_with_middleware(...) and owns
all status transitions.
Built-in Tasks
HorusTask: the standard task implementation for command-style executionFunctionTask: a code-first task that wraps a Python function and pairs it withPythonFunctionRuntimeandPythonFunctionExecutor
HorusTask
HorusTask provides the default task behavior:
- emits task lifecycle events
- validates that declared input artifacts exist before execution
- delegates execution to the configured executor
- treats a task as complete when all declared output artifacts exist
The default HorusTask.target is LocalTarget, so tasks run in-process unless
you provide a different target.
If a task declares no outputs, HorusTask.is_complete() returns False, so the
task always runs unless your workflow adds different logic.
FunctionTask
FunctionTask is the simplest way to build an in-memory workflow in Python:
@FunctionTask.task(wf)
def prepare_data() -> None:
...
The decorator creates a FunctionTask, wraps the function in a
PythonFunctionRuntime, and registers the task in the workflow automatically.
Function parameters are injected by name: task is available directly, and
other parameters are matched against the id of each declared input and output
artifact.
It also defaults interaction to the built-in CLI transport, which makes
interactive code-first tasks easy to author.
See FunctionTask for the full guide and examples.
Registering Custom Tasks
To register task plugins, expose them through:
[project.entry-points."horus.task"]
For more details, refer to the Auto-Registry documentation.