Control Flow¶
MolExp's workflow engine handles control flow through the shape of the DAG, not through special task types. There is no IfTask, ForLoopTask, or MapTask — parallelism, fan-out, and fan-in are expressed by how you wire depends_on edges, with two opt-in decorators (parallel_map, join) when you need to fan out over a runtime-produced list.
Automatic Parallelism¶
Tasks at the same topological level run in parallel automatically. You don't mark anything as "parallel"; you just make sure they share the same set of upstream dependencies.
from molexp.workflow import workflow, TaskContext
wf = workflow(name="pipeline")
@wf.task
async def fetch(ctx: TaskContext) -> dict:
return {"data": load()}
@wf.task(depends_on=["fetch"])
async def parse(ctx: TaskContext) -> dict: ...
@wf.task(depends_on=["fetch"])
async def validate(ctx: TaskContext) -> dict: ...
@wf.task(depends_on=["parse", "validate"])
async def merge(ctx: TaskContext) -> dict: ...
parse and validate run concurrently once fetch finishes. merge waits for both. This is the idiomatic way to express "run these in parallel, then reduce".
Conditional Execution¶
Use a plain if inside a task. If a branch has no work, return a sentinel / None:
@wf.task(depends_on=["fetch"])
async def maybe_clean(ctx: TaskContext) -> dict:
if ctx.config.get("skip_cleaning", False):
return ctx.inputs
return clean(ctx.inputs)
For larger branch-specific pipelines, keep each branch as its own task and wire depends_on accordingly; skipped branches still schedule but can short-circuit cheaply from ctx.inputs.
Loops¶
Sequential repetition belongs inside a task (Python for / while is fine):
@wf.task
async def iterate(ctx: TaskContext) -> list[float]:
xs = ctx.inputs
for _ in range(ctx.config.get("iters", 10)):
xs = [x * 1.01 for x in xs]
return xs
Fan-Out Over a Runtime List¶
Use the parallel_map decorator when you need to fan out over a list produced by an upstream task:
from molexp.workflow import workflow, parallel_map, join, TaskContext
wf = workflow(name="fan-out")
@wf.task
async def scatter(ctx: TaskContext) -> list[int]:
return [1, 2, 3, 4]
@parallel_map(wf, fan_out_over="scatter")
async def process(ctx: TaskContext) -> int:
return ctx.inputs ** 2
@join(wf, depends_on=["process"], reducer="sum")
async def reduce(ctx: TaskContext) -> int:
return ctx.inputs
The decorators set per-task metadata (_parallel_map_config, _join_config) the runtime reads during scheduling. Use them when the fan-out count is only known at runtime; prefer plain depends_on when it's known at authoring time.
Pattern Selection¶
| Want | Use |
|---|---|
| Same-time concurrent tasks | same-level depends_on — no extra config |
| Conditional branch | plain Python if inside the task |
| Fixed-size fan-out | N tasks authored at build time with identical depends_on |
| Runtime-sized fan-out | @parallel_map(wf, fan_out_over=...) + @join(wf, reducer=...) |
| Long-running streaming processing | Actor (see task-and-actor.md) |
Explicit IR-level control-flow tasks (IfTask, ForTask, etc.) are not part of the current API and are not planned in the short term — the DAG shape + decorators cover the cases we've actually needed.
Runnable Example¶
examples/workflow/control_flow.py runs a diamond, a conditional branch driven by ctx.config, and a parallel_map + join fan-out.