# ematix-flow — Implementation Plan (v0.1)

Companion to `docs/PRD.md`. Phases are ordered for the fastest path to a working
end-to-end happy path, then breadth of strategies, then production polish.
Each phase ends in a green `cargo test` + `pytest` run and a working demo on
real Postgres.
## Status as of 2026-04-30

- v0.1 scope (Phases 0–14): shipped, apart from the release work listed below (Phase 14 is partial). All four strategies, watermarks, delete handling, scheduling, CLI, and test infra are in place.
- Post-v0.1 (Phases 15–28): also shipped. This covers the ML feature-store path, the ergonomics decorator overhaul, normalization, transforms, and DataFrame interop (polars/pandas/pyspark). See:
  - `docs/ERGONOMICS_PLAN.md`: Phases 21–25
  - `docs/NORMALIZATION_TRANSFORMS_PLAN.md`: Phases 26–28
  - `docs/ML_FEATURE_STORE_PLAN.md`: Phases 15–20
- Test counts (current): 110 Rust unit + 239 default Python + 181 integration Python (testcontainers + Docker) + 4 opt-in Spark E2E. All are green on macOS aarch64, and CI runs the same matrix on Linux x86_64.
- Pending v0.1 release work:
  - Wheel-build CI + PyPI trusted publishing (Phase 14 task list).
  - mkdocs site generation.

Phase status markers below: ✅ shipped, 🚧 partial, ⬜ pending.
## Repo layout (target)

```text
data-eng-framework/                 (rename to ematix-flow before publish)
├── Cargo.toml                      workspace root
├── crates/
│   ├── ematix-flow-core/           pure-Rust library (Postgres engine)
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── types.rs            type catalogue (Postgres dialect)
│   │       ├── ddl.rs              CREATE TABLE planner
│   │       ├── hash.rs             row hash function
│   │       ├── pg.rs               tokio-postgres pool + helpers
│   │       ├── source.rs           Source resolution + same-DB detection
│   │       ├── stage.rs            COPY-based staging
│   │       ├── meta.rs             ematix_flow.{watermarks,run_history}
│   │       ├── strategy/
│   │       │   ├── mod.rs
│   │       │   ├── append.rs
│   │       │   ├── truncate.rs
│   │       │   ├── merge.rs        SCD1
│   │       │   └── scd2.rs
│   │       └── plan.rs             PipelineSpec → ExecutionPlan
│   └── ematix-flow-py/             PyO3 bindings crate
│       ├── Cargo.toml
│       └── src/lib.rs              wraps core types for Python
├── python/
│   └── ematix_flow/                pure-Python package (depends on built ext)
│       ├── __init__.py
│       ├── types.py                Column, Integer, String, ...
│       ├── table.py                ManagedTable
│       ├── source.py               Source.postgres_query, ...
│       ├── strategy.py             AppendOnly, MergeUpsert, SCD2 wrappers
│       ├── pipeline.py             Pipeline, pipeline.sync
│       ├── cli.py                  flow entry point
│       └── _core.pyi               type stubs for the Rust extension
├── pyproject.toml                  maturin build config
├── tests/
│   ├── rust/                       in-crate cargo test
│   └── python/                     pytest, integration via testcontainers
└── docs/
    ├── PRD.md
    ├── IMPLEMENTATION_PLAN.md
    └── (mkdocs site later)
```
## Phase 0 — Repo & build scaffolding (≈1 day) ✅

Goal: a workspace that builds, lints, and ships an empty wheel.

- Convert `Cargo.toml` to a workspace and move the existing stub into `crates/ematix-flow-core`.
- Add `crates/ematix-flow-py` with `pyo3 = { version = "0.22", features = ["extension-module"] }` and `crate-type = ["cdylib"]`.
- Add `pyproject.toml` with `[build-system] requires = ["maturin>=1.5"]`. Configure it to find the `ematix-flow-py` crate and emit a wheel exposing `ematix_flow._core`.
- Create the empty `python/ematix_flow/` package with a stub `__init__.py` that re-exports nothing yet.
- `maturin develop` succeeds, and `python -c "import ematix_flow"` succeeds.
- CI: a GitHub Actions workflow running `cargo fmt --check`, `cargo clippy -- -D warnings`, `cargo test`, and `pytest` on `macos-latest` and `ubuntu-latest`.
- Decide whether to rename the repo / package directory from `data-eng-framework` to `ematix-flow` now or after v0.1. Recommendation: rename now, to avoid rewriting paths and CI configs later.

Exit criteria: green CI on a no-op PR.
## Phase 1 — Rust ↔ Python bridge smoke test (≈0.5 day) ✅

- Define a minimal `PipelineSpec` struct in `ematix-flow-core` with `serde` derives.
- Expose `parse_spec(json: &str) -> PyResult<PipelineSpec>` to Python.
- Python builds the spec from the `Pipeline(...)` args and ships it to Rust as JSON. Rust echoes a normalized version back. No DB yet.

Exit: round-trip test passing.
## Phase 2 — Type system + ManagedTable (≈1–2 days) ✅

- Rust `types.rs`: an enum of supported types, each with `to_postgres_sql()`.
- Python `types.py`: `Column`, `Integer`, `BigInt`, `SmallInt`, `String`, `Text`, `Boolean`, `Float`, `Double`, `Numeric(precision, scale)`, `Date`, `Timestamp`, `TimestampTZ`, `JSON`, `JSONB`, `UUID`, `Bytes`.
- Python `table.py`: a `ManagedTable` metaclass that:
  - Collects `Column` attributes in declaration order.
  - Validates that `__tablename__` and `__schema__` exist.
  - Computes a stable `schema_fingerprint` (hash of name + ordered columns + types).
  - Exposes `_to_spec()`, returning a JSON-serializable dict.
- Tests: declare a `CustomerDim` class, assert that the spec round-trips through Rust unchanged, and assert that primary-key inference works.

Exit: a `ManagedTable` declaration can be lowered to a Rust `TableSpec`.
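
Here is a minimal Python sketch that makes "stable fingerprint" concrete. It assumes the fingerprint is SHA-256 over a JSON encoding of the table name and the ordered `(column, type)` pairs. The real `ManagedTable` encoding may differ, for example in whether `__schema__` is included.

```python
import hashlib
import json
from typing import Sequence, Tuple


def schema_fingerprint(table_name: str, columns: Sequence[Tuple[str, str]]) -> str:
    """Hash the table name plus (column name, SQL type) pairs in declaration order.

    Sketch only: the JSON encoding is an assumption. It is chosen because it is
    unambiguous (a name containing a separator cannot mimic a different split).
    """
    payload = json.dumps(
        {"table": table_name, "columns": [[name, sql_type] for name, sql_type in columns]},
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Column order is part of the payload, so reordering declarations changes the fingerprint. This matches "ordered columns" above.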
## Phase 3 — Postgres adapter + connection management (≈1–2 days) ✅

- Rust `pg.rs`: a `tokio-postgres` + `deadpool-postgres` pool.
- Connection-string parsing with the `postgres` crate's URL parser.
- `same_database(a, b)`, returning true iff `(host, port, dbname, user)` match (port-normalized, default 5432).
- Async runtime: a `tokio::runtime::Runtime` owned by the core. All async work is driven through `runtime.block_on()` from PyO3 entry points.
- Test against a `testcontainers` Postgres instance: connect, run `SELECT 1`, run a transaction.

Exit: `ematix_flow._core.connect(url)` works from Python.
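
The port-normalized comparison can be sketched in Python with the standard library's `urllib.parse`. This mirrors the rule above rather than the Rust helper itself. It handles URL-style DSNs only and ignores passwords. It also does not model libpq defaults such as `dbname` falling back to the user name.

```python
from typing import Tuple
from urllib.parse import unquote, urlsplit

DEFAULT_PG_PORT = 5432


def _identity(url: str) -> Tuple[str, int, str, str]:
    """Reduce a postgres:// URL to the (host, port, dbname, user) tuple."""
    parts = urlsplit(url)
    host = parts.hostname or ""           # urlsplit lowercases the hostname
    port = parts.port or DEFAULT_PG_PORT  # normalize a missing port to 5432
    dbname = unquote(parts.path.lstrip("/"))
    user = unquote(parts.username or "")
    return host, port, dbname, user


def same_database(a: str, b: str) -> bool:
    """True iff both URLs name the same (host, port, dbname, user); passwords are ignored."""
    return _identity(a) == _identity(b)
```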
## Phase 4 — DDL planner (≈1–2 days) ✅

- Rust `ddl.rs::create_table_sql(table_spec) -> String`.
- Reflection: `read_existing_columns(conn, schema, table)`, returning a vec of `(name, postgres_type, nullable, is_pk)`.
- Drift comparator: declared vs reflected → enum `{ Match, Missing, Drift(Vec<Difference>) }`.
- Python wires `target.ensure(conn, on_drift="error")` into a pre-flight step that runs before any strategy.
- Tests:
  - Create from scratch.
  - Reflect a matching table.
  - Reflect a drifted table and confirm the error message lists the differences.

Exit: targets can be created and validated.
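
A minimal Python sketch of the drift comparator follows. It assumes reflected columns arrive as `(name, postgres_type, nullable, is_pk)` records and that `None` means the table does not exist. The `managed` parameter is a hypothetical way to exempt framework-added columns such as `_loaded_at`. A real comparator would also need to normalize type aliases (for example `int8` vs `bigint`).

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import FrozenSet, List, Optional


class DriftKind(Enum):
    MATCH = "match"
    MISSING = "missing"
    DRIFT = "drift"


@dataclass(frozen=True)
class ColumnInfo:
    name: str
    postgres_type: str
    nullable: bool
    is_pk: bool


@dataclass
class DriftResult:
    kind: DriftKind
    differences: List[str] = field(default_factory=list)


def compare(
    declared: List[ColumnInfo],
    reflected: Optional[List[ColumnInfo]],
    managed: FrozenSet[str] = frozenset(),
) -> DriftResult:
    """Compare declared columns with reflected ones (None means the table is absent)."""
    if reflected is None:
        return DriftResult(DriftKind.MISSING)
    actual = {c.name: c for c in reflected}
    diffs: List[str] = []
    for col in declared:
        got = actual.get(col.name)
        if got is None:
            diffs.append(f"{col.name}: declared but missing from table")
            continue
        if col.postgres_type.lower() != got.postgres_type.lower():
            diffs.append(f"{col.name}: type {got.postgres_type} != declared {col.postgres_type}")
        if col.nullable != got.nullable:
            diffs.append(f"{col.name}: nullable={got.nullable}, declared {col.nullable}")
        if col.is_pk != got.is_pk:
            diffs.append(f"{col.name}: is_pk={got.is_pk}, declared {col.is_pk}")
    declared_names = {c.name for c in declared}
    for name in actual:
        # Undeclared columns count as drift unless the framework manages them.
        if name not in declared_names and name not in managed:
            diffs.append(f"{name}: present in table but not declared")
    return DriftResult(DriftKind.DRIFT, diffs) if diffs else DriftResult(DriftKind.MATCH)
```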
## Phase 5 — Strategy: AppendOnly (≈1 day) ✅

This is the simplest strategy, and it proves the planner → executor pipeline end to end.

- `strategy/append.rs::plan(spec) -> ExecutionPlan`, emitting `INSERT INTO target (cols...) SELECT cols... FROM (<source query>) src` for the same-DB path.
- Cross-DB path, in three steps:
  - `COPY ... TO STDOUT BINARY` from the source.
  - `COPY ... FROM STDIN BINARY` into a staging temp table.
  - `INSERT ... SELECT` into the target.
- Auto-add `_loaded_at` and `_batch_id` columns if they are not declared.
- Write a run-history record.
- Tests: same-DB and cross-DB integration tests against testcontainers.

Exit: `pipeline.sync(..., mode="append")` works.
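
The same-DB append plan is a single `INSERT ... SELECT`. The Python sketch below builds it with quoted identifiers. `build_append_sql` and its `extra` mapping for bookkeeping columns are hypothetical, and `_batch_id` is left out for brevity. The source query is spliced in as text, so it must come from trusted pipeline code.

```python
from typing import Mapping, Optional, Sequence


def quote_ident(name: str) -> str:
    """Quote a Postgres identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_append_sql(
    schema: str,
    table: str,
    columns: Sequence[str],
    source_query: str,
    extra: Optional[Mapping[str, str]] = None,
) -> str:
    """Same-DB AppendOnly: INSERT ... SELECT straight from the user's query.

    `extra` (hypothetical) maps bookkeeping columns such as _loaded_at to SQL expressions.
    """
    extra = dict(extra or {})
    target_cols = [quote_ident(c) for c in columns] + [quote_ident(c) for c in extra]
    select_exprs = [f"src.{quote_ident(c)}" for c in columns] + list(extra.values())
    return (
        f"INSERT INTO {quote_ident(schema)}.{quote_ident(table)} ({', '.join(target_cols)}) "
        f"SELECT {', '.join(select_exprs)} FROM ({source_query}) AS src"
    )
```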
## Phase 6 — Strategy: TruncateReplace (≈0.5 day) ✅

- Same-DB: `BEGIN; TRUNCATE; INSERT ... SELECT; COMMIT`.
- Cross-DB: stage, then `BEGIN; TRUNCATE; INSERT ... SELECT FROM stage; COMMIT`.
- Tests:
  - Correct row count.
  - Atomicity: killing the load midway leaves the old data in place.

Exit: `mode="truncate"` works.
## Phase 7 — Strategy: MergeUpsert / SCD1 (≈1–2 days) ✅

- Same-DB: `INSERT INTO target (cols) SELECT cols FROM (<source>) src ON CONFLICT (keys) DO UPDATE SET col = EXCLUDED.col, ... WHERE (target.col, ...) IS DISTINCT FROM (EXCLUDED.col, ...)`.
  - The `WHERE` compares only the update columns and skips unchanged rows, which makes the affected row count meaningful.
  - `ON CONFLICT (keys)` requires a unique constraint or unique index on the key columns.
- Cross-DB: stage → `ON CONFLICT` from the staging table.
- Configurable `update_columns`. The default is all non-key declared columns.
- Returns `rows_inserted`, `rows_updated`, `rows_unchanged`.
- Tests:
  - Insert-only.
  - Update-only.
  - Mixed.
  - Idempotency: run twice, and the second run reports 0 inserted / 0 updated.

Exit: `mode="merge"` / `mode="scd1"` works.
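
Below is a hedged sketch of the merge statement, written in Python for consistency with the other examples. The helper names are hypothetical.

- It compares only the update columns, so columns the user does not manage cannot make every row look changed.
- It uses `RETURNING (t.xmax = 0)` to tell inserts from updates. This is a common heuristic that relies on Postgres internals. `rows_unchanged` is then the source row count minus both.

Postgres also rejects an `ON CONFLICT DO UPDATE` that would touch the same row twice, so the source must be unique on `keys`.

```python
from typing import Optional, Sequence


def quote_ident(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def build_merge_sql(
    schema: str,
    table: str,
    columns: Sequence[str],
    keys: Sequence[str],
    source_query: str,
    update_columns: Optional[Sequence[str]] = None,
) -> str:
    """Same-DB MergeUpsert (SCD1) as one INSERT ... ON CONFLICT statement."""
    q = quote_ident
    if update_columns is None:
        update_columns = [c for c in columns if c not in keys]  # default: all non-key columns
    col_list = ", ".join(q(c) for c in columns)
    sql = (
        f"INSERT INTO {q(schema)}.{q(table)} AS t ({col_list}) "
        f"SELECT {col_list} FROM ({source_query}) AS src "
        f"ON CONFLICT ({', '.join(q(k) for k in keys)}) "
    )
    if update_columns:
        set_list = ", ".join(f"{q(c)} = EXCLUDED.{q(c)}" for c in update_columns)
        old_row = ", ".join(f"t.{q(c)}" for c in update_columns)
        new_row = ", ".join(f"EXCLUDED.{q(c)}" for c in update_columns)
        # Only rows whose update columns changed are rewritten (and returned below).
        sql += f"DO UPDATE SET {set_list} WHERE ({old_row}) IS DISTINCT FROM ({new_row}) "
    else:
        sql += "DO NOTHING "  # every column is a key: nothing to update
    # xmax = 0 on a returned row is the usual heuristic for "freshly inserted".
    return sql + "RETURNING (t.xmax = 0) AS inserted"
```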
## Phase 8 — Strategy: SCD2 (≈3–4 days, biggest single phase) ✅

- Hash function (`hash.rs`):
  - Canonical encoding: per-column `coalesce(col::text, '\x00NULL\x00')`, joined with a `\x01` separator (chosen to be vanishingly unlikely in real text data).
  - SHA-256 of the UTF-8 bytes, stored as a 32-byte `bytea` in `row_hash`.
  - Postgres-side equivalent: `digest(coalesce(col1::text, '\x00NULL\x00') || '\x01' || ..., 'sha256')`, using `pgcrypto` (auto-`CREATE EXTENSION` on first run).
  - Caveat: Postgres `text` cannot hold a NUL byte. With `standard_conforming_strings = on` (the default), these literals are plain backslash sequences rather than control characters. The Rust encoder must therefore emit exactly the same characters for the two hashes to agree.
- DDL: SCD2 ensures `valid_from TIMESTAMPTZ NOT NULL`, `valid_to TIMESTAMPTZ`, `is_current BOOLEAN NOT NULL`, `row_hash BYTEA NOT NULL`, and a partial index `WHERE is_current` keyed on the natural key.
- Same-DB SQL plan (one statement, so one transaction). A CTE is visible only to the statement it is attached to. The close-out `UPDATE` is therefore written as a data-modifying CTE of the `INSERT` rather than as a separate statement: `WITH src AS (SELECT *, digest(...) AS row_hash FROM (<user query>) q), changed AS (SELECT src.* FROM src LEFT JOIN target t ON t.<keys> = src.<keys> AND t.is_current WHERE t.row_hash IS DISTINCT FROM src.row_hash), closed AS (UPDATE target SET valid_to = now(), is_current = false WHERE is_current AND <keys> IN (SELECT <keys> FROM changed)) INSERT INTO target (<cols>, valid_from, valid_to, is_current, row_hash) SELECT <cols>, now(), NULL, true, row_hash FROM changed;`
- Cross-DB: stage source rows plus their hashes into a temp table, then run the same `changed` / `UPDATE` / `INSERT` flow against the staging table.
- Configurable column names: `effective_from`, `effective_to`, `current_flag`, `hash_column`.
- Tests:
  - First load creates one current version per natural key.
  - Re-running with no source changes → 0 changes.
  - Updating a `compare_columns` value → the previous version is closed out and the new version becomes current.
  - Updating a non-`compare_columns` value → no new version, and the target row is not updated (SCD2 rows are immutable beyond close-out).
  - Order independence: the hash is stable across column physical order.
  - NULL handling: NULL → `'x'` is detected; `'x'` → NULL is detected.

Exit: `mode="scd2"` works end to end.
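
The client-side half of the hash can be sketched in Python with `hashlib`. The code labels two assumptions:

- Values arrive already rendered exactly as Postgres `col::text` would render them.
- Columns are hashed in name order to get order independence, so the Postgres expression must list them in the same order.

The constants mirror the SQL literals character for character, for the reason given in the code comment.

```python
import hashlib
from typing import Mapping, Optional, Sequence

# Under standard_conforming_strings = on (the PostgreSQL default), the SQL
# literals '\x00NULL\x00' and '\x01' are plain backslash sequences, not control
# bytes (PostgreSQL text cannot hold a NUL byte anyway). The client side must
# use exactly the same characters for both hashes to agree.
NULL_SENTINEL = r"\x00NULL\x00"
SEPARATOR = r"\x01"


def canonical_encoding(row: Mapping[str, Optional[str]], compare_columns: Sequence[str]) -> str:
    """Join compare-column values in name order, substituting the NULL sentinel.

    Assumption: values are already rendered exactly as Postgres `col::text` renders them.
    """
    return SEPARATOR.join(
        NULL_SENTINEL if row[col] is None else row[col] for col in sorted(compare_columns)
    )


def row_hash(row: Mapping[str, Optional[str]], compare_columns: Sequence[str]) -> bytes:
    """32-byte SHA-256 digest of the canonical encoding, as stored in row_hash."""
    return hashlib.sha256(canonical_encoding(row, compare_columns).encode("utf-8")).digest()
```

The NULL sentinel is what lets NULL → `'x'` and `'x'` → NULL register as changes, and it also distinguishes NULL from the empty string.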
## Phase 9 — Source abstraction polish (≈1 day) ✅

- `Source.postgres_query(conn, query)`: an arbitrary `SELECT`.
- `Source.postgres_table(conn, schema, table, columns=None)`: sugar that builds the `SELECT`.
- Source-side column projection: fetch only the columns the target needs rather than every source column. These are the keys, `compare_columns`, `incremental_column`, and the other declared non-key columns.
- Auto-detection of the same-DB vs cross-DB path (the Phase 3 helper).
- Override: `pipeline.sync(..., force_path="staging")` for tests.
- Tests covering both paths for at least Append and SCD2.

Exit: source resolution is robust and overridable.
## Phase 10 — Watermarking + metadata schema (≈1–2 days) ✅

- Rust `meta.rs`: lazy creation of the `ematix_flow` schema plus the `watermarks`, `run_history`, and `schema_history` tables.
- The `incremental_column` argument is plumbed end to end.
- The watermark is read before the run and written after success only.
- Run history is populated for every run, including failures.
- Tests:
  - The watermark advances on success.
  - The watermark does not advance on failure (simulated via a post-load error).
  - The first run in watermark mode treats `last_value = -infinity`.

Exit: incremental loads work and are restart-safe.
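
The watermark rule (read before, write only after success) is easiest to see end to end. This sketch uses the standard-library `sqlite3` module as a stand-in for Postgres so it runs anywhere. It also makes one design choice the plan leaves open: the watermark update commits in the same transaction as the load, so a crash cannot leave loaded rows without their watermark. Table and pipeline names are hypothetical.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[int, str]  # (id, updated_at as an ISO-8601 string)


def setup() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (id INTEGER, updated_at TEXT)")
    conn.execute("CREATE TABLE watermarks (pipeline TEXT PRIMARY KEY, last_value TEXT)")
    return conn


def incremental_load(conn: sqlite3.Connection, source: List[Row], fail_after_load: bool = False) -> int:
    """Load rows newer than the watermark; advance it only if the load commits."""
    found = conn.execute("SELECT last_value FROM watermarks WHERE pipeline = 'demo'").fetchone()
    last_value = found[0] if found else None  # None plays the role of -infinity on the first run
    new_rows = [r for r in source if last_value is None or r[1] > last_value]
    with conn:  # commits on success, rolls back if the block raises
        conn.executemany("INSERT INTO target (id, updated_at) VALUES (?, ?)", new_rows)
        if fail_after_load:
            raise RuntimeError("simulated post-load error")
        if new_rows:
            conn.execute(
                "INSERT OR REPLACE INTO watermarks (pipeline, last_value) VALUES ('demo', ?)",
                (max(r[1] for r in new_rows),),
            )
    return len(new_rows)
```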
## Phase 11 — Delete handling opt-in (≈1 day) ✅

- `handle_deletes` argument, with a mutual-exclusion check against `incremental_column`. An incremental batch only contains changed rows, so a key's absence cannot be read as a delete.
- `"soft"` semantics for SCD2: close out missing keys.
- `"soft"` semantics for SCD1: set `is_deleted = true` (the column is auto-added).
- `"hard"` semantics for SCD1: `DELETE` missing keys.
- Tests: insert, then drop a key from the source and verify the expected outcome.

Exit: delete-handling opt-in works.
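
Here is a sketch of the delete-handling dispatch. It assumes deletes are detected by anti-joining the target against the staged source on the natural key. Table names are passed pre-quoted, and `plan_delete_sql` is a hypothetical helper. The SQL shapes follow the three semantics listed above.

```python
from typing import Optional, Sequence


def quote_ident(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def plan_delete_sql(
    mode: str,
    handle_deletes: Optional[str],
    incremental_column: Optional[str],
    target: str,
    staging: str,
    keys: Sequence[str],
) -> Optional[str]:
    """Return the delete-handling statement to run after the load, or None."""
    if handle_deletes is None:
        return None
    if incremental_column is not None:
        # An incremental batch holds only changed rows, so absence says nothing about deletion.
        raise ValueError("handle_deletes cannot be combined with incremental_column")
    match = " AND ".join(f"s.{quote_ident(k)} = t.{quote_ident(k)}" for k in keys)
    # NOT EXISTS avoids NOT IN's pitfall of matching nothing when the subquery yields a NULL.
    missing = f"NOT EXISTS (SELECT 1 FROM {staging} AS s WHERE {match})"
    if mode == "scd2" and handle_deletes == "soft":
        return f"UPDATE {target} AS t SET valid_to = now(), is_current = false WHERE t.is_current AND {missing}"
    if mode == "scd1" and handle_deletes == "soft":
        return f"UPDATE {target} AS t SET is_deleted = true WHERE t.is_deleted IS NOT TRUE AND {missing}"
    if mode == "scd1" and handle_deletes == "hard":
        return f"DELETE FROM {target} AS t WHERE {missing}"
    raise ValueError(f"handle_deletes={handle_deletes!r} is not supported for mode={mode!r}")
```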
## Phase 12 — Scheduling + `flow` CLI (≈1 day) ✅

- Cron-string parser: `croniter`-style. In Rust, use the `cron` crate. Its expressions start with a seconds field, so 5-field croniter-style strings need a leading `0`.
- `Pipeline(schedule=...)` registration in a module-level registry.
- `python -m ematix_flow.cli` entry point + `flow` console script in `pyproject.toml`.
- Subcommands:
  - `flow list --module my_pipelines`
  - `flow run --module my_pipelines <name>`
  - `flow run-due --module my_pipelines [--now ISO8601]`
- Tests: `flow run-due` with a mocked `--now` triggers the expected pipelines.

Note: `flow daemon` is deferred to v0.2. It is a long-lived scheduler with an internal cron loop, per-pipeline locks, and SIGTERM-safe shutdown. v0.1 users invoke `flow run-due` from an external scheduler such as cron, a k8s CronJob, or a GitHub Actions schedule.

Exit: `flow run-due` demonstrably runs from cron / a k8s CronJob.
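
To illustrate what `flow run-due` checks, here is a small cron matcher in Python. It is neither `croniter` nor the Rust `cron` crate:

- It supports only 5-field expressions with `*`, numbers, ranges, lists, and `/step`.
- It evaluates at minute resolution.
- It follows classic Vixie-cron semantics: when both day-of-month and day-of-week are restricted, they are OR-ed.

Treating "due" as "matches the current minute" and the `schedules` dict shape are assumptions.

```python
from datetime import datetime
from typing import Dict, List, Set

# (low, high) for minute, hour, day-of-month, month, day-of-week (0 and 7 = Sunday).
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]


def parse_field(expr: str, lo: int, hi: int) -> Set[int]:
    """Expand one cron field supporting *, N, A-B, comma lists, and /step."""
    values: Set[int] = set()
    for part in expr.split(","):
        base, _, step_str = part.partition("/")
        step = int(step_str) if step_str else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            a, b = base.split("-", 1)
            start, end = int(a), int(b)
        else:
            start = int(base)
            end = hi if step_str else start  # "5/15" means every 15 starting at 5
        if step < 1 or not lo <= start <= end <= hi:
            raise ValueError(f"invalid cron field: {expr!r}")
        values.update(range(start, end + 1, step))
    return values


def is_due(cron: str, now: datetime) -> bool:
    """True if a 5-field cron expression matches `now` at minute resolution."""
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 cron fields, got {len(fields)}: {cron!r}")
    minute, hour, dom, month, dow = (
        parse_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    dow = {0 if d == 7 else d for d in dow}
    cron_dow = (now.weekday() + 1) % 7  # Python: Monday = 0; cron: Sunday = 0
    if fields[2].startswith("*") or fields[4].startswith("*"):
        day_ok = now.day in dom and cron_dow in dow
    else:
        day_ok = now.day in dom or cron_dow in dow  # Vixie cron: both restricted -> OR
    return now.minute in minute and now.hour in hour and now.month in month and day_ok


def due_pipelines(schedules: Dict[str, str], now: datetime) -> List[str]:
    """Names of registered pipelines whose schedule matches `now` (hypothetical registry shape)."""
    return sorted(name for name, cron in schedules.items() if is_due(cron, now))
```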
## Phase 13 — Test infrastructure (continuous, but polished here; ≈1 day) ✅

- `tests/python/conftest.py` provides a `pg_url` fixture using `testcontainers[postgres]`.
- Mark long integration tests with `@pytest.mark.integration`. Plain `pytest` runs unit tests only, and CI runs both.
- Rust integration tests in `crates/ematix-flow-core/tests/` use the same Docker image via the `testcontainers` Rust crate.
- Coverage report (Codecov optional).

## Phase 14 — Docs, packaging, release (≈2 days) 🚧

- README quickstart (10-minute path: install → declare → SCD2 sync).
- `mkdocs` site with concepts, strategies, sources, scheduling, CLI, and an API reference (autogenerated via `mkdocstrings`).
- `examples/` directory with one example per strategy plus one full cron-driven setup.
- GitHub Actions wheel-build matrix:
  - macOS x86_64, macOS aarch64
  - Linux x86_64, Linux aarch64 (manylinux2014 + musllinux)
  - Python 3.10, 3.11, 3.12, 3.13
- Trusted publishing to PyPI via `pypa/gh-action-pypi-publish`.
- Tag `v0.1.0` and publish.
## Risk register

| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| `pgcrypto` not installed in the user's DB | Medium | Medium | Detect on first run, auto-`CREATE EXTENSION`, and fall back to `md5()` if forbidden. On PostgreSQL 11+, the built-in `sha256(bytea)` needs no extension. An `md5()` fallback would not match the Rust-side SHA-256, so both sides must switch together. |
| Same-DB / cross-DB detection is fooled by pgbouncer / connection proxies | Low | Medium | Allow an explicit `force_path` override; document the heuristic. |
| Hash collisions silently mask changes | Very low (SHA-256) | High | Document; consider an HMAC-keyed hash later. |
| Wide tables (>200 columns) blow up the canonical-encoding string | Low | Medium | Stream the digest in Rust; batch column groups in Postgres. |
| Users expect SQLAlchemy interop | Medium | Low | Document clearly; consider a thin SQLAlchemy-compat shim in v0.2. |
| Cron daemon is the wrong abstraction at scale | Medium | Low | v0.1 ships no daemon: `flow run-due` runs under an external cron / k8s CronJob. Revisit when `flow daemon` lands in v0.2 as a convenience. |
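## Estimate

For one engineer working steadily, the per-phase estimates for Phases 0–14 sum to roughly 17–23 working days. That is about 3.5–4.5 weeks of focused work to the v0.1 PyPI release. Phase 8 (SCD2) is the largest single chunk. Phases 0–4 are the prerequisites that feel slow but enable everything else.

Critical path: Phase 0 → 1 → 2 → 3 → 4 → 5 → 7 → 8 → 14. Phases 6, 9, 10, 11, and 12 can fan out in parallel after Phase 7 lands, given more than one engineer. Phase 11's SCD2 soft-delete path also needs Phase 8.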
## Post-v0.1 phase log (Phases 15–28)
The phases below were added after the v0.1 plan was committed. They extend the original surface area without changing it. Each ships with the same TDD discipline as Phases 0–14: failing test → minimal implementation → green tests → integration tests against testcontainers Postgres → commit. See companion design docs for locked-decision logs.
### Phase 15 — Event-time SCD2 (≈1d) ✅

`event_timestamp_column='col'` on `pipeline.sync` makes `valid_from` come from a source column instead of `now()`. Out-of-order arrivals are rejected with a clear error.
### Phase 16 — TTL / expiry (≈0.5d) ✅

`ttl=timedelta(...)` on SCD2 syncs adds a post-step `UPDATE` that closes out versions whose `valid_from` is older than `now() - ttl`. It runs in the same transaction as the load, so both commit or roll back together.
### Phase 17 — `@ematix.feature_view` declarative class (≈1–2d) ✅

- A `FeatureView(ManagedTable)` base class.
- The `@ematix.feature_view(schema=, feature_version=, event_timestamp_column=, ttl=, freshness_sla=, online=, description=, owner=)` decorator.
- A process-local `_FEATURE_VIEWS_REGISTRY`.
- Pipelines targeting a feature view (FV) default to `mode='scd2'` and inherit its class dunders.
- A lazily created `ematix_flow.feature_views` metadata catalog.
### Phase 18 — Point-in-time + asof-join (≈1–2d) ✅

`Cls.point_in_time(conn, entity_keys, as_of)` and `Cls.historical_features(conn, spine, columns)` are classmethods on `FeatureView`. Both use a single `LATERAL JOIN` against a `VALUES` spine. Requires the `[df]` extra (`psycopg2` + `DictCursor`).
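
The point-in-time lookup can be sketched as a SQL builder. One `LATERAL` subquery per spine row selects the version whose validity window contains `as_of`. The sketch assumes a single-column entity key, the default `valid_from` / `valid_to` column names, and psycopg2-style `%s` placeholders. All function names are hypothetical.

```python
from typing import Any, List, Sequence, Tuple


def quote_ident(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def build_point_in_time_sql(schema: str, table: str, key: str, columns: Sequence[str], n_rows: int) -> str:
    """One LATERAL lookup per spine row: the version valid at that row's as_of."""
    if n_rows < 1:
        raise ValueError("the spine must contain at least one row")
    q = quote_ident
    values = ", ".join(["(%s, %s::timestamptz)"] * n_rows)  # psycopg2-style placeholders
    feature_cols = ", ".join(f"f.{q(c)}" for c in columns)
    return (
        f"SELECT spine.entity_key, spine.as_of, {feature_cols} "
        f"FROM (VALUES {values}) AS spine(entity_key, as_of) "
        f"LEFT JOIN LATERAL ("
        f"SELECT * FROM {q(schema)}.{q(table)} AS fv "
        f"WHERE fv.{q(key)} = spine.entity_key "
        f"AND fv.valid_from <= spine.as_of "
        f"AND (fv.valid_to IS NULL OR fv.valid_to > spine.as_of) "
        f"ORDER BY fv.valid_from DESC LIMIT 1"
        f") AS f ON true"  # LEFT JOIN keeps spine rows that have no valid version
    )


def spine_params(spine: Sequence[Tuple[Any, Any]]) -> List[Any]:
    """Flatten [(key, as_of), ...] to line up with the placeholders row by row."""
    return [value for row in spine for value in row]
```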
### Phase 19 — Online (is_current) snapshot (≈1d) ✅

`online=True` provisions three objects:

- A partial index on the main table.
- A `<table>__online` materialized view.
- A unique index on that view, which `REFRESH MATERIALIZED VIEW CONCURRENTLY` requires.

The view is refreshed automatically after each successful sync. `Cls.online_features(conn, entity_keys)` queries the MV.
### Phase 20 — Training-dataset builder (≈1d) ✅

`ematix.training_set(conn, spine=, feature_views=[...], columns=, prefer="auto")` joins multiple FVs against a spine and returns a polars / pandas DataFrame. It makes a single round trip, using one `LATERAL` join per FV.
### Phase 21 — Connection registry (≈0.5d) ✅

`from ematix_flow import connect; conn = connect("warehouse")` resolves named connections in two steps:

1. Environment variables (`EMATIX_FLOW_DSN_<NAME>`).
2. `~/.ematix-flow/connections.toml`.

CLI: `flow connections {list, check, set}`. Phases 22–23 extend this with merge-key resolution helpers.
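
A minimal sketch of the lookup order in Python follows. The environment-variable naming follows the plan. Two details are assumptions:

- The TOML layout is one table per connection name with a `dsn` key.
- Non-alphanumeric characters in the name map to `_`.

`tomllib` requires Python 3.11+ and is imported only when the file is actually read.

```python
import os
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Optional


def env_var_name(name: str) -> str:
    """EMATIX_FLOW_DSN_<NAME>; mapping non-alphanumerics to '_' is an assumption."""
    return "EMATIX_FLOW_DSN_" + "".join(ch if ch.isalnum() else "_" for ch in name).upper()


def load_connections_toml(path: Optional[Path] = None) -> dict:
    path = path or Path.home() / ".ematix-flow" / "connections.toml"
    if not path.exists():
        return {}
    import tomllib  # standard library from Python 3.11

    with path.open("rb") as fh:
        return tomllib.load(fh)


def resolve_dsn(name: str, env: Optional[Mapping] = None, toml_data: Optional[Mapping] = None) -> str:
    """Resolve a named connection: environment variable first, then the TOML file."""
    env = os.environ if env is None else env
    var = env_var_name(name)
    if env.get(var):
        return env[var]
    data = load_connections_toml() if toml_data is None else toml_data
    entry: Any = data.get(name)  # hypothetical layout: [warehouse] dsn = "postgresql://..."
    if isinstance(entry, Mapping) and "dsn" in entry:
        return entry["dsn"]
    raise KeyError(f"no connection named {name!r}: set {var} or add [{name}] to connections.toml")
```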
### Phase 22 — UNIQUE-constraint reflection (≈0.5d) ✅

`PgPool::read_existing_uniques` returns declared unique constraints keyed by name. Both the drift comparator and the merge-key resolution path use it.
### Phase 23 — Merge-key resolution (≈0.5d) ✅

`pipeline.sync(keys=...)` resolves merge keys in this priority order:

1. Explicit `keys=`.
2. The `__merge_keys__` dunder.
3. The first `natural_key()` group.
4. Primary keys.

It warns when these sources disagree.
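
The priority chain can be sketched as a plain function. The argument names are hypothetical. Comparing key sets (ignoring order) to decide whether two sources disagree is a design choice of this sketch.

```python
import warnings
from typing import Optional, Sequence, Tuple


def resolve_merge_keys(
    explicit: Optional[Sequence[str]] = None,
    merge_keys_dunder: Optional[Sequence[str]] = None,
    natural_key_groups: Sequence[Sequence[str]] = (),
    primary_keys: Sequence[str] = (),
) -> Tuple[str, ...]:
    """Pick merge keys by priority; warn when lower-priority sources disagree."""
    candidates = [
        ("keys=", explicit),
        ("__merge_keys__", merge_keys_dunder),
        ("natural_key()", natural_key_groups[0] if natural_key_groups else None),
        ("primary key", primary_keys),
    ]
    present = [(label, tuple(keys)) for label, keys in candidates if keys]
    if not present:
        raise ValueError("no merge keys: pass keys=, set __merge_keys__, declare natural_key(), or a primary key")
    chosen_label, chosen = present[0]
    for label, keys in present[1:]:
        if set(keys) != set(chosen):
            warnings.warn(
                f"merge keys from {chosen_label} {chosen} differ from {label} {keys}; using {chosen_label}",
                stacklevel=2,
            )
    return chosen
```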
### Phase 24 — Decorator API + multi-target (≈2–3d) ✅

- `@ematix.table(schema=)` reads PEP 593 `Annotated[T, pk()]` markers.
- `@ematix.pipeline(target=…, schedule=, mode=)` registers a function as a pipeline.
- `targets=[ematix.target(Cls, mode=…), …]` enables multi-target dispatch that halts on the first failure.
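
Reading `Annotated[T, pk()]` markers needs only the standard `typing` helpers `get_type_hints(..., include_extras=True)`, `get_origin`, and `get_args`. In this sketch, `pk` and `table` are hypothetical stand-ins for the real `ematix` marker and decorator.

```python
from dataclasses import dataclass
from typing import Annotated, List, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class pk:
    """Hypothetical stand-in for the pk() marker used in Annotated[T, pk()]."""


def primary_keys(cls: type) -> List[str]:
    """Names of attributes annotated with a pk() marker, in declaration order."""
    hints = get_type_hints(cls, include_extras=True)  # keep the Annotated metadata
    keys = []
    for name, hint in hints.items():
        if get_origin(hint) is Annotated and any(isinstance(m, pk) for m in get_args(hint)[1:]):
            keys.append(name)
    return keys


def table(*, schema: str):
    """Hypothetical class decorator mirroring @ematix.table(schema=...)."""
    def wrap(cls: type) -> type:
        cls.__schema__ = schema
        cls.__primary_keys__ = primary_keys(cls)
        return cls
    return wrap


@table(schema="dw")
class Customer:
    customer_id: Annotated[int, pk()]
    region: Annotated[str, pk()]
    email: str
```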
### Phase 25 — Preview, dry-run, validate (≈2d) ✅

- `pipeline.preview(name)` synthesizes the SQL plan with no DB calls.
- `dry_run(name)` runs the load in a transaction and rolls it back.
- `validate(name)` runs `EXPLAIN` on the synthesized SQL.

Each has a CLI subcommand with rich-rendered text output. `flow validate` adds the Q1 mitigation for the no-decoration-time-checking design.
### Phase 26 — Per-column normalization (≈2–3d) ✅

The `ematix_flow.normalize` module provides a marker catalogue that compiles to in-database SQL, applied as CTEs:

- `parse_timestamp(formats=…, on_failure=…)`
- `parse_int` / `parse_numeric` / `parse_date`
- `default(value)`
- `regex_replace`
- `derive(expression)`
- `whitespace_to_null`
- `email_normalize`, etc.

Pipeline-level: `transforms_pre=[deduplicate_by, filter_where, limit, sample_pct]`.

Follow-up (Q3.5 γ): `parse_timestamp()` / `parse_date()` with no arguments fall back to the `DEFAULT_TIMESTAMP_FORMATS` / `DEFAULT_DATE_FORMATS` catalogues (first match wins).
### Phase 27 — Post-load transformations (≈2–3d) ✅

Six sub-phases (27a–27f):

- 27a: `transforms_post=[sql_string, …]` + `continue_on_failure_post`, plus a `run_history` schema migration (`parent_run_id`, `step_name`, `metrics_json`).
- 27b: callable transforms with auto-detected `(conn,)` or `(conn, parent)` arity and an optional metrics-dict return value. Adds the `@ematix.transform` decorator + `pipeline.run_transform`.
- 27c: `ematix.transform_ref("name")` resolves to a registered transform or pipeline (chained-pipeline pattern).
- 27d: merged `flow list` + `flow transform list/run`.
- 27e: `ematix_flow.df` module (`[df]` extra: `psycopg2-binary`), covering:
  - `Connection.read_df` / `write_df`.
  - polars or pandas auto-detection.
  - COPY CSV transport.
  - All five strategy modes for both the inferred and `ManagedTable` paths.
- 27f: `preview` / `dry_run` / `validate` made aware of `transforms_post`.
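
Arity auto-detection (27b) can be sketched with `inspect.signature`, counting only positional parameters. `call_transform` is a hypothetical name. Rejecting non-dict return values is an assumption about how metrics are validated.

```python
import inspect
from typing import Any, Callable, Dict, Optional


def call_transform(fn: Callable[..., Any], conn: Any, parent: Any) -> Optional[Dict[str, Any]]:
    """Call fn(conn) or fn(conn, parent) depending on its positional arity."""
    positional = [
        p
        for p in inspect.signature(fn).parameters.values()
        if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    if len(positional) == 1:
        result = fn(conn)
    elif len(positional) == 2:
        result = fn(conn, parent)
    else:
        name = getattr(fn, "__name__", repr(fn))
        raise TypeError(f"transform {name} must accept (conn,) or (conn, parent)")
    if result is not None and not isinstance(result, dict):
        raise TypeError("a transform may return only None or a metrics dict")
    return result
```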
### Phase 28 — Spark interop (≈2d) ✅

`ematix_flow.spark` monkey-patches `read_spark_df` / `write_spark_df` onto `Connection`, backed by the Postgres JDBC driver. Writes are routed through the strategy executor. `[spark]` extra: `pyspark`. Live E2E tests are opt-in via `pytest -m spark` (they require a JVM and the JDBC jar).
## Pending (post-shipped) housekeeping

- Wheel-build matrix in CI (`maturin` + GitHub Actions for macOS x86_64 / aarch64 and Linux x86_64 / aarch64).
- Trusted publishing to PyPI as `ematix-flow`.
- mkdocs site build with a `mkdocstrings` API reference.
- An `examples/` directory with one runnable example per strategy.