Deployment Guide
Pick the shape that matches your infrastructure. Each recipe shows the minimal install, the invocation, and a state-survival explanation so you know what loses or keeps history across restarts.
The orchestrator surface to keep in mind:
| Concern | What it is | Configurable via |
|---|---|---|
| Schedule | @register(schedule="@hourly") declares when a pipeline fires | --interval (cron tick window) |
| Run history | last-run timestamps + retry state | --run-log-url |
| Alerts | push events on failure / give-up / recovery | --alerter (repeatable) |
| Metrics | counters, histograms, gauges to a monitoring stack | --metrics |
The defaults are: SQLite at ~/.ematix-flow/run_log.db, no alerters,
NullSink metrics. Operators opt into everything beyond that.
Recipe 1 — Single laptop / dev
The lightest possible install. Schedule fires from a cron entry; state lives in a local SQLite file in your home dir.
Install
Invocation (cron, every minute)
State survival
- SQLite at ~/.ematix-flow/run_log.db survives restarts.
- Single host, single writer: no concurrency concerns.
Recipe 2 — Single pod / systemd service
Same shape as recipe 1 but on a server. Mount a writable volume for the SQLite file so pod restarts don't lose state.
Install
systemd unit (/etc/systemd/system/flow.timer + .service)
# flow.service
[Unit]
Description=ematix-flow run-due tick
[Service]
Type=oneshot
Environment=EMATIX_FLOW_RUN_LOG_URL=sqlite:///var/lib/ematix-flow/run_log.db
Environment=EMATIX_FLOW_ALERTERS=stdout://
User=flow
ExecStart=/usr/local/bin/flow run-due --module my_app.pipelines --interval 60
# flow.timer
[Unit]
Description=Run ematix-flow every minute
[Timer]
OnCalendar=*:0/1
Persistent=true
[Install]
WantedBy=timers.target
State survival
- /var/lib/ematix-flow/ mounted persistently → survives pod restarts.
- Logs flow to journald via stdout/stderr; flow status shows current pipeline state.
Recipe 3 — Multi-host with shared Postgres
Several hosts run flow run-due against the same orchestrator state.
SQLite can't be safely shared across hosts; use Postgres.
Install
Connection setup (one-time, on the Postgres side)
CREATE DATABASE orchestrator;
CREATE USER flow WITH PASSWORD '...';
GRANT CONNECT ON DATABASE orchestrator TO flow;
GRANT USAGE, CREATE ON SCHEMA public TO flow;
PostgresRunLog auto-creates the two tables (run_log,
attempt_state) on first connect. You can pre-create them and pass
create_tables=False if your role lacks DDL privilege.
Invocation
EMATIX_FLOW_RUN_LOG_URL=postgresql://flow:...@db.internal/orchestrator \
EMATIX_FLOW_ALERTERS=slack://hooks.slack.com/services/X/Y/Z \
EMATIX_FLOW_METRICS=prometheus://:9090 \
flow run-due --module my_app.pipelines --interval 60
State survival
- Postgres is the source of truth; any host can join or leave the fleet without losing history.
- INSERT ... ON CONFLICT DO UPDATE upserts are atomic, so two cron ticks racing on the same pipeline name produce a consistent outcome.
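A minimal sketch of that upsert pattern uses Python's standard-library sqlite3 module, because SQLite (3.24+) accepts the same INSERT ... ON CONFLICT DO UPDATE form. The single-table schema and its column names are hypothetical and are not PostgresRunLog's real run_log layout. The point is that repeated writes for one pipeline name collapse into one row, and each write happens in a single statement.

```python
import sqlite3

# Hypothetical schema: one row per pipeline name (not the real run_log layout).
SCHEMA = """
CREATE TABLE IF NOT EXISTS run_log (
    pipeline    TEXT PRIMARY KEY,
    last_run_at REAL NOT NULL,
    outcome     TEXT NOT NULL
)
"""

# Insert a new row, or overwrite the existing one for the same pipeline.
UPSERT = """
INSERT INTO run_log (pipeline, last_run_at, outcome)
VALUES (?, ?, ?)
ON CONFLICT (pipeline) DO UPDATE SET
    last_run_at = excluded.last_run_at,
    outcome     = excluded.outcome
"""


def record_run(conn: sqlite3.Connection, pipeline: str, ts: float, outcome: str) -> None:
    """Write the latest run for `pipeline` in one statement."""
    with conn:  # commits on success, rolls back on error
        conn.execute(UPSERT, (pipeline, ts, outcome))


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    # Two ticks write the same pipeline name; the second overwrites the first.
    record_run(conn, "daily_orders", 1000.0, "failure")
    record_run(conn, "daily_orders", 1060.0, "success")
    print(conn.execute("SELECT * FROM run_log").fetchall())
    # prints [('daily_orders', 1060.0, 'success')]
```

In Postgres, concurrent upserts on the same key are serialised by the unique index, so the last committed writer wins and there is never a duplicate row.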
Recipe 4 — Kubernetes CronJob
Each tick is a fresh pod. Run-log must live outside the pod (Postgres, S3, GCS, Azure Blob, or NFS-mounted SQLite).
Install (Dockerfile)
FROM python:3.12-slim
RUN pip install --no-cache-dir "ematix-flow[runlog-postgres,metrics-prometheus]"
COPY my_app/ /app/my_app/
WORKDIR /app
CMD ["flow", "run-due", "--module", "my_app.pipelines"]
CronJob
apiVersion: batch/v1
kind: CronJob
metadata:
  name: flow
spec:
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: flow
              image: my-registry/flow:latest
              env:
                - name: EMATIX_FLOW_RUN_LOG_URL
                  valueFrom:
                    secretKeyRef:
                      name: flow-secrets
                      key: pg-url
                - name: EMATIX_FLOW_ALERTERS
                  value: "slack://hooks.slack.com/services/X/Y/Z"
                - name: EMATIX_FLOW_METRICS
                  value: "otlp://otel-collector.observability:4317"
              # args replaces the image CMD (there is no ENTRYPOINT),
              # so the executable must be listed first.
              args:
                - "flow"
                - "run-due"
                - "--module=my_app.pipelines"
                - "--interval=60"
          restartPolicy: Never
State survival
- Postgres holds run-log.
- OTel collector aggregates metrics across all replicas + every tick.
- Slack receives the same events from any pod that runs.
Recipe 5 — AWS Lambda (single-region)
Lambda's filesystem is read-only except for /tmp. /tmp is not guaranteed to persist
between invocations: it lives only as long as a warm execution environment. Use S3 for run history.
Install (Lambda layer)
Handler
# handler.py
import os
import sys
from ematix_flow import cli
def lambda_handler(event, context):
    # Rebuild argv so the CLI parses exactly the flags it would get from a shell.
    sys.argv = [
        "flow",
        "run-due",
        "--module", "my_app.pipelines",
        "--interval", "60",
        "--run-log-url", os.environ["EMATIX_FLOW_RUN_LOG_URL"],
        "--alerter", os.environ["EMATIX_FLOW_ALERTERS"],
    ]
    return cli.main()
Environment
EMATIX_FLOW_RUN_LOG_URL=s3://my-bucket/flow-state/prod
EMATIX_FLOW_ALERTERS=slack://hooks.slack.com/services/X/Y/Z
State survival
- S3 is the orchestrator's memory between invocations.
- IAM role on the Lambda needs s3:GetObject, s3:PutObject, s3:DeleteObject, and s3:ListBucket on the prefix.
- Each pipeline name is one key under {prefix}/run_log/ and {prefix}/attempt_state/, easy to inspect with aws s3 ls.
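The four permissions above can be expressed as an IAM policy document. The sketch below builds one from the run-log URL. It follows the standard AWS pattern: object actions go on the key ARN, and s3:ListBucket goes on the bucket ARN, narrowed with the s3:prefix condition key. It assumes S3RunLog lists objects only under the configured prefix. The function names are hypothetical helpers, not part of ematix-flow.

```python
import json
from typing import Tuple
from urllib.parse import urlsplit


def s3_location(run_log_url: str) -> Tuple[str, str]:
    """Split s3://bucket/prefix into (bucket, prefix) without edge slashes."""
    parts = urlsplit(run_log_url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"not an s3:// URL: {run_log_url!r}")
    return parts.netloc, parts.path.strip("/")


def run_log_policy(run_log_url: str) -> dict:
    """Build an IAM policy granting the four S3 actions on one run-log prefix."""
    bucket, prefix = s3_location(run_log_url)
    keys = f"{prefix}/*" if prefix else "*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Object-level actions on every key under the prefix,
                # which covers {prefix}/run_log/... and {prefix}/attempt_state/...
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"arn:aws:s3:::{bucket}/{keys}",
            },
            {
                # ListBucket is bucket-level; the s3:prefix condition narrows it.
                # Assumption: listings are issued with prefixes under `prefix`.
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": [prefix, keys] if prefix else ["*"]}},
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(run_log_policy("s3://my-bucket/flow-state/prod"), indent=2))
```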
Recipe 6 — GCP Cloud Run / Cloud Scheduler
Symmetrical to the Lambda recipe but on GCP.
Install
Cloud Run service entry
# main.py
import os
import sys
from flask import Flask
from ematix_flow import cli
app = Flask(__name__)
@app.route("/run-due", methods=["POST"])
def tick():
    # One Cloud Scheduler POST == one run-due tick.
    sys.argv = [
        "flow", "run-due",
        "--module", "my_app.pipelines",
        "--run-log-url", os.environ["EMATIX_FLOW_RUN_LOG_URL"],
        "--metrics", "otlp://otel-collector:4317",
    ]
    rc = cli.main()
    # A non-zero exit code becomes HTTP 500, which Cloud Scheduler records as a failed attempt.
    return {"rc": rc}, (200 if rc == 0 else 500)
Cloud Scheduler posts to /run-due every minute.
State survival
- GCS holds run-log. ADC credentials on the Cloud Run service handle auth.
- For multi-region, point all regions at the same bucket; the orchestrator state is consistent.
Recipe 7 — Azure Functions / Container Apps
The azure:// URL scheme synthesises the blob endpoint from the account
name (azure://myaccount/... → https://myaccount.blob.core.windows.net).
For non-default endpoints (Azure Stack, Azurite emulator), construct
AzureBlobRunLog directly with account_url= from Python instead
of using the URL form.
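The endpoint synthesis described above can be sketched as a small parser for azure://account/container/prefix. The function name and the returned dict are hypothetical, not AzureBlobRunLog's internals. The Azurite constant is the emulator's well-known default blob endpoint, which the URL form cannot express.

```python
from typing import Dict
from urllib.parse import urlsplit

# Azurite's default local blob endpoint, including its well-known dev account.
AZURITE_ACCOUNT_URL = "http://127.0.0.1:10000/devstoreaccount1"


def parse_azure_url(url: str) -> Dict[str, str]:
    """Split azure://account/container/prefix and synthesise the public endpoint."""
    parts = urlsplit(url)
    if parts.scheme != "azure" or not parts.netloc:
        raise ValueError(f"not an azure:// URL: {url!r}")
    # First path segment is the container; anything after it is the key prefix.
    container, _, prefix = parts.path.lstrip("/").partition("/")
    if not container:
        raise ValueError(f"missing container in {url!r}")
    return {
        "account_url": f"https://{parts.netloc}.blob.core.windows.net",
        "container": container,
        "prefix": prefix.rstrip("/"),
    }
```

For Azurite or Azure Stack, you would pass something like AZURITE_ACCOUNT_URL as account_url= when constructing AzureBlobRunLog from Python. Its other constructor parameters are not shown here.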
Recipe 8 — Central scheduler, fan-out to workers
Recipes 1–7 all use the cron-tick model: an external scheduler
wakes flow run-due once per interval, fires whatever's due, and
exits. This recipe uses flow scheduler instead — one long-running
controller process holds a leader lease, walks the DAG every
--poll-interval seconds, and dispatches each eligible pipeline to a
disposable worker via an Executor. The controller never runs
pipeline code itself.
When to pick this over recipes 1–7:
- You want sub-minute reactions (cron's minimum is one minute).
- You want each pipeline isolated in its own pod / container instead of crammed into one cron host.
- You want one observable controller process (alerts, metrics, logs in one place) instead of N-per-tick cron processes.
Run multiple scheduler replicas for HA. Leader election uses the same
RunLog.claim machinery as pipelines — there's no extra table, no
separate consensus service. At most one replica walks the DAG per
tick; the others log "leader is X" and sleep.
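A sketch of the lease rule behind leader election: a replica may take or renew the lease when nobody holds it, when it already holds it, or when the current holder's lease has expired. LeaseTable, its claim signature, and the "scheduler-leader" key are hypothetical stand-ins, not the real RunLog.claim. In a durable backend the check and the write must be one atomic operation. This in-memory version is single-threaded and only shows the decision logic.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Lease:
    holder: str
    expires_at: float


class LeaseTable:
    """In-memory stand-in for lease rows keyed by name (hypothetical)."""

    def __init__(self) -> None:
        self._rows: Dict[str, Lease] = {}

    def claim(self, name: str, owner: str, lease_seconds: float,
              now: Optional[float] = None) -> bool:
        """Take or renew the lease; refuse if another owner holds a live one."""
        now = time.time() if now is None else now
        current = self._rows.get(name)
        if current is not None and current.holder != owner and current.expires_at > now:
            return False  # someone else is leader and their lease is still live
        self._rows[name] = Lease(owner, now + lease_seconds)
        return True

    def holder(self, name: str) -> Optional[str]:
        row = self._rows.get(name)
        return row.holder if row else None


if __name__ == "__main__":
    leases = LeaseTable()
    print(leases.claim("scheduler-leader", "pod-a", 300, now=0))    # True
    print(leases.claim("scheduler-leader", "pod-b", 300, now=10))   # False: pod-a live
    print(leases.claim("scheduler-leader", "pod-a", 300, now=100))  # True: renewal
    print(leases.claim("scheduler-leader", "pod-b", 300, now=401))  # True: expired at 400
```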
Install (Dockerfile)
FROM python:3.12-slim
RUN pip install --no-cache-dir \
"ematix-flow[runlog-postgres,metrics-prometheus,executor-k8s]"
COPY my_app/ /app/my_app/
WORKDIR /app
Scheduler Deployment (k8s controller)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flow-scheduler
  namespace: flow  # same namespace as the RBAC Role / RoleBinding
spec:
  replicas: 2  # for HA; leader lease serialises them
  selector: { matchLabels: { app: flow-scheduler } }
  template:
    metadata: { labels: { app: flow-scheduler } }
    spec:
      serviceAccountName: flow-scheduler  # needs k8s Job create + delete
      containers:
        - name: scheduler
          image: my-registry/flow:latest
          args:
            - "flow"
            - "scheduler"
            - "--module=my_app.pipelines"
            - "--executor=k8s://flow?image=my-registry/flow:latest&service-account=flow-worker"
            - "--poll-interval=10"
            - "--lease-seconds=300"
          env:
            - name: EMATIX_FLOW_RUN_LOG_URL
              valueFrom: { secretKeyRef: { name: flow-secrets, key: pg-url } }
            - name: EMATIX_FLOW_ALERTERS
              value: "slack://hooks.slack.com/services/X/Y/Z"
            - name: EMATIX_FLOW_METRICS
              value: "otlp://otel-collector.observability:4317"
RBAC for the controller (flow-scheduler SA needs to create
Jobs in its namespace):
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata: { name: flow-job-runner, namespace: flow }
rules:
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata: { name: flow-scheduler, namespace: flow }
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flow-job-runner
subjects:
  - { kind: ServiceAccount, name: flow-scheduler, namespace: flow }
The worker Jobs (created by the controller) run as
flow-worker, which only needs network access to the RunLog —
no k8s API perms.
Executor URL alternatives
| URL | Spawns | Use when |
|---|---|---|
| subprocess:// | local flow run subprocess | Single host, no orchestrator. |
| subprocess+python:// | python -m ematix_flow.cli run via sys.executable | Dev / CI; no wheel on PATH. |
| k8s://<ns>?image=<img>&service-account=<sa> | batch/v1 Job per pipeline | Multi-tenant cluster; pipeline isolation. |
| lambda://<fn>?qualifier=<alias> | Async Invoke(InvocationType="Event") | AWS-native dispatch; lease expiry handles worker death, since Lambda has no cancel. |
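The executor URLs in this table all share one anatomy: a scheme that picks the executor, a target (namespace or function name), and query-string options. The standard library can take them apart. parse_executor_url is a hypothetical helper, not ematix-flow's parser. Keeping the last value for a repeated option is an assumption.

```python
from typing import Dict
from urllib.parse import parse_qs, urlsplit


def parse_executor_url(url: str) -> Dict[str, object]:
    """Split an executor URL into scheme, target, and options."""
    parts = urlsplit(url)
    # parse_qs returns a list per key; keep the last value of each option.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {"scheme": parts.scheme, "target": parts.netloc, "options": options}


if __name__ == "__main__":
    print(parse_executor_url(
        "k8s://flow?image=my-registry/flow:latest&service-account=flow-worker"))
    # prints {'scheme': 'k8s', 'target': 'flow', 'options':
    #   {'image': 'my-registry/flow:latest', 'service-account': 'flow-worker'}}
```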
State survival
- Postgres (or any durable RunLog) is the only shared state. The scheduler holds no in-memory queue.
- Killing a scheduler pod is safe: the leader lease expires after --lease-seconds and the surviving replica takes over on the next tick. Any worker Job already in flight keeps running and writes its outcome back to the RunLog independently.
- A worker that dies mid-pipeline stops heartbeating; the next scheduler tick sees the expired lease and the row becomes claimable again. No separate "release" step.
Migrating from flow run-due: run both side-by-side against the
same RunLog. The per-pipeline claim CAS guarantees no
double-dispatch, so you can leave cron up while you build confidence in
the new daemon, then remove the cron entry once you're satisfied.
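The following shows why cron and the scheduler can share a RunLog without double-dispatch. A per-pipeline claim can be written as a compare-and-set: a conditional UPDATE that succeeds only when the row is unclaimed or its lease has expired. It is sketched here with the standard-library sqlite3 module. The claims table and the claim() signature are hypothetical and are not the library's RunLog.claim. In Postgres the same conditional UPDATE is atomic per row.

```python
import sqlite3

# Hypothetical claims table; not ematix-flow's real schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS claims (
    pipeline   TEXT PRIMARY KEY,
    owner      TEXT,
    expires_at REAL NOT NULL
)
"""


def claim(conn: sqlite3.Connection, pipeline: str, owner: str,
          lease_seconds: float, now: float) -> bool:
    """Compare-and-set: win only if the row is unclaimed or its lease expired."""
    with conn:  # one transaction: commit on success, roll back on error
        # Make sure there is a row to compete for; no-op if it already exists.
        conn.execute(
            "INSERT OR IGNORE INTO claims (pipeline, owner, expires_at) VALUES (?, NULL, 0)",
            (pipeline,),
        )
        # The WHERE clause is the "compare"; the SET is the "swap".
        cur = conn.execute(
            "UPDATE claims SET owner = ?, expires_at = ? "
            "WHERE pipeline = ? AND (owner IS NULL OR expires_at <= ?)",
            (owner, now + lease_seconds, pipeline, now),
        )
    # rowcount is 1 for the winner and 0 for whoever lost the race.
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    print(claim(conn, "daily_orders", "cron", 300, now=0.0))         # True
    print(claim(conn, "daily_orders", "scheduler", 300, now=5.0))    # False: lease live
    print(claim(conn, "daily_orders", "scheduler", 300, now=301.0))  # True: lease expired
```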
Observability cheat sheet
Alerters
| URL | Sends to | Dep |
|---|---|---|
| stdout:// | stderr | stdlib |
| slack://hooks.slack.com/services/X/Y/Z | Slack channel | stdlib (urllib) |
| https://hooks.slack.com/services/X/Y/Z | Slack channel (passthrough) | stdlib |
Repeat --alerter to send to multiple destinations. A failing alerter
doesn't take the orchestrator down — exceptions are logged-and-swallowed
so the next alerter in the chain still fires.
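The log-and-swallow behaviour described above amounts to a try/except around each destination. Here is a minimal sketch, assuming an alerter is any callable that takes one event dict. That calling convention and fan_out itself are hypothetical, not ematix-flow's alerter interface.

```python
import logging
from typing import Callable, Iterable

log = logging.getLogger("alerts")

# Hypothetical convention: an alerter is a callable taking one event payload.
Alerter = Callable[[dict], None]


def fan_out(alerters: Iterable[Alerter], event: dict) -> int:
    """Send `event` to every alerter and return how many deliveries succeeded.

    A failure is logged with its traceback and swallowed, so one broken
    destination never stops the rest of the chain (or the caller).
    """
    delivered = 0
    for send in alerters:
        try:
            send(event)
            delivered += 1
        except Exception:
            log.exception("alerter %r failed; continuing", send)
    return delivered
```

Catching Exception rather than BaseException is deliberate: KeyboardInterrupt and SystemExit still stop the process.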
Metrics
| URL | Sink | Dep |
|---|---|---|
| null:// | no-op (default) | stdlib |
| stdout:// | one line per event to stderr | stdlib |
| memory:// | in-process dicts (tests) | stdlib |
| prometheus://:9090 | HTTP /metrics server on port 9090 | prometheus_client |
| prometheus:// | metrics recorded only; scrape via the API | prometheus_client |
| otlp://collector:4317 | OTLP gRPC | opentelemetry-sdk + opentelemetry-exporter-otlp |
| otlp+http://collector:4318 | OTLP HTTP | same |
Three orchestrator-level metrics every operator gets:
pipeline_runs_total{pipeline, outcome} # outcome ∈ {success, failure, skipped}
pipeline_duration_seconds{pipeline} # histogram
pipeline_retry_attempt{pipeline} # gauge (0 when idle)
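The three metrics can be emitted around a single run as follows. The sketch records into plain dicts, in the spirit of the memory:// sink. MemoryMetrics, its method names, and instrumented_run are hypothetical, not the library's sink API. Skipped runs are not modelled. Resetting the gauge to 0 only after a success is an assumption about what "idle" means.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class MemoryMetrics:
    """Hypothetical in-process sink, in the spirit of memory:// (for tests)."""

    def __init__(self):
        self.counters = defaultdict(float)   # (name, labels) -> running total
        self.histograms = defaultdict(list)  # (name, labels) -> observations
        self.gauges = {}                     # (name, labels) -> latest value

    def inc(self, name, labels, amount=1.0):
        self.counters[(name, labels)] += amount

    def observe(self, name, labels, value):
        self.histograms[(name, labels)].append(value)

    def set_gauge(self, name, labels, value):
        self.gauges[(name, labels)] = value


@contextmanager
def instrumented_run(metrics, pipeline, attempt=0):
    """Emit the three orchestrator-level metrics around one pipeline run."""
    base = (("pipeline", pipeline),)  # labels as a hashable tuple of pairs
    metrics.set_gauge("pipeline_retry_attempt", base, attempt)
    start = time.monotonic()
    outcome = "success"
    try:
        yield
    except Exception:
        outcome = "failure"
        raise
    finally:
        metrics.observe("pipeline_duration_seconds", base, time.monotonic() - start)
        metrics.inc("pipeline_runs_total", base + (("outcome", outcome),))
        if outcome == "success":
            # Assumption: the gauge goes back to 0 (idle) once a run succeeds.
            metrics.set_gauge("pipeline_retry_attempt", base, 0)
```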
Run-log backends
| URL prefix | Backend | Install extra |
|---|---|---|
| sqlite:///path or bare path | SQLite (default) | (none — stdlib) |
| memory:// | InMemoryRunLog | (none) |
| postgres://... / postgresql://... | PostgresRunLog | runlog-postgres |
| mysql://... / mariadb://... | MySQLRunLog | runlog-mysql |
| duckdb:///path or duckdb://:memory: | DuckDBRunLog | runlog-duckdb |
| s3://bucket/prefix | S3RunLog (incl. MinIO, R2 via custom client) | runlog-s3 |
| gs://bucket/prefix | GcsRunLog | runlog-gcs |
| azure://account/container/prefix | AzureBlobRunLog | runlog-azure |
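The table above can be read as a lookup from URL scheme to backend and install extra. Here it is transcribed into a small resolver, which is handy for checking which extra a given --run-log-url needs. backend_for is a hypothetical helper, not ematix-flow's actual URL resolver. Treating any scheme-less string as a bare SQLite path follows the first row of the table.

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit

# scheme -> (backend named in the table, pip extra or None)
_BACKENDS = {
    "sqlite": ("SQLite", None),
    "memory": ("InMemoryRunLog", None),
    "postgres": ("PostgresRunLog", "runlog-postgres"),
    "postgresql": ("PostgresRunLog", "runlog-postgres"),
    "mysql": ("MySQLRunLog", "runlog-mysql"),
    "mariadb": ("MySQLRunLog", "runlog-mysql"),
    "duckdb": ("DuckDBRunLog", "runlog-duckdb"),
    "s3": ("S3RunLog", "runlog-s3"),
    "gs": ("GcsRunLog", "runlog-gcs"),
    "azure": ("AzureBlobRunLog", "runlog-azure"),
}


def backend_for(url: str) -> Tuple[str, Optional[str]]:
    """Return (backend, install extra) for a run-log URL or bare path."""
    scheme = urlsplit(url).scheme.lower()
    if not scheme:
        return _BACKENDS["sqlite"]  # bare filesystem path -> SQLite
    try:
        return _BACKENDS[scheme]
    except KeyError:
        raise ValueError(f"unsupported run-log URL scheme: {scheme!r}") from None
```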
Graceful degradation
If the configured run-log location can't be opened (read-only FS,
network unreachable, bad credentials), flow run-due prints a stderr
warning and continues without persistence. Pipelines still fire
according to their declared schedule; only the durable history
side-effect is skipped.
To run without persistence on purpose (and without the warning), pass
--no-run-log or set EMATIX_FLOW_RUN_LOG_URL to an empty string (EMATIX_FLOW_RUN_LOG_URL=).
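The degradation rules above fit in one function: an explicit opt-out returns no run log silently, and an open failure warns on stderr and returns no run log. The opener callable, the function name, and the warning text are hypothetical, not ematix-flow's code. Printing only the URL scheme is a design choice so that credentials embedded in a Postgres URL never reach logs.

```python
import os
import sys
from typing import Callable, Optional, TypeVar
from urllib.parse import urlsplit

T = TypeVar("T")

# Default from the top of this guide: SQLite under the home directory.
DEFAULT_URL = os.path.expanduser("~/.ematix-flow/run_log.db")


def open_run_log_or_none(opener: Callable[[str], T],
                         url: Optional[str] = None,
                         no_run_log: bool = False) -> Optional[T]:
    """Open the configured run log, or return None to run without persistence."""
    if url is None:
        # Unset -> default; set-but-empty stays "" and means "opt out".
        url = os.environ.get("EMATIX_FLOW_RUN_LOG_URL", DEFAULT_URL)
    if no_run_log or url == "":
        return None  # deliberate opt-out: no warning
    try:
        return opener(url)
    except Exception as exc:  # read-only FS, unreachable network, bad credentials
        where = urlsplit(url).scheme or "path"  # never echo the full URL
        print(f"warning: {where} run log unavailable ({exc}); "
              "continuing without persistence", file=sys.stderr)
        return None
```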
What you don't need to worry about
- DDL on Postgres / MySQL: tables auto-create on first connect via IF NOT EXISTS. Schema auto-creates on Postgres (use create_tables=False from Python if your role lacks DDL privilege).
- Cross-tick state coherence: every RunLog backend is upsert-based, so two ticks racing on the same pipeline name produce a single final state record.
- Retry semantics: flow run-due honors the retry= policy from @register(...) automatically; backoff windows, gave-up gating, and recovery events all fire from the CLI without further wiring.
- Pipeline data path: Rust drivers (tokio-postgres, mysql_async, deltalake-rs) handle bulk row movement; Python is only on the orchestrator-state and dev-helper paths. The RunLog backend you pick doesn't bottleneck pipeline throughput.
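Backoff windows and gave-up gating can be illustrated as a per-tick decision: run now, wait for the backoff window, or stop because retries are exhausted. The RetryPolicy fields and the gate function are hypothetical and are not the real retry= policy signature. Exponential backoff is one common choice, used here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryPolicy:  # hypothetical fields, not the real retry= signature
    max_attempts: int = 3     # total attempts, counting the first run
    base_delay: float = 60.0  # seconds to wait before the first retry
    factor: float = 2.0       # exponential growth of the wait per failure


def gate(policy: RetryPolicy, failed_attempts: int,
         last_failure_at: float, now: float) -> str:
    """Return "run", "wait", or "gave_up" for a pipeline on this tick."""
    if failed_attempts == 0:
        return "run"  # nothing has failed; normal schedule applies
    if failed_attempts >= policy.max_attempts:
        return "gave_up"  # retries exhausted; a give-up alert would fire here
    # Waits grow as 60s, 120s, 240s, ... with the defaults above.
    delay = policy.base_delay * policy.factor ** (failed_attempts - 1)
    return "run" if now >= last_failure_at + delay else "wait"
```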