Skip to content

Σ.B spike — connector trait shape

Question. Σ.B requires the connector trait to be serializable across Arrow Flight (executors instantiate connectors lazily on remote nodes from URL-only config). Σ.A2 and Σ.D have follow-on needs that, if addressed in a later refactor, mean two breaking changes instead of one. What's the unified trait shape that absorbs all three needs in a single rework?

Status. Concrete proposal + 6 sub-questions; all 6 locked 2026-05-05. Σ.B PR 1 shipped (4 commits a/b/c/d, all 10 backends migrated). PR 2 pivoted from Ballista to datafusion-distributed (2026-05-05) — see the "PR 2 distributed-engine pivot" section near the bottom for rationale. The trait-shape decisions are unaffected by the pivot.


What each sub-phase needs from the trait

Σ.A2 — dialect translator

No new methods on Backend. The translator sits above the Backend dispatch — it rewrites SQL → SQL before the SQL ever reaches Backend::read_arrow_stream / Backend::execute. The translator is a free function (dialect::translate(sql, from)). Trait shape unchanged.

Σ.B — Ballista distributed batch

Three needs, all centered on shipping a backend instance to a remote executor:

  1. 'static bound. Today: pub trait Backend: Send + Sync. To put backend instances inside Arc<dyn Backend> and ship them via the Ballista scheduler → executor channel, they must outlive the call ('static). Most impls already meet this transitively (Arc<Mutex<…>>, Arc<Pool>, owned Strings) — adding the bound is non-breaking for in-tree impls.
  2. Serializable config. Today: backends carry live state (Arc<Mutex<ConsumerSession>>, deadpool_postgres::Pool, aws_sdk_kinesis::Client). These can't ship over the wire. Need a separate config blob that's Serialize + DeserializeOwned and from which a fresh backend instance can be reconstructed on the executor side.
  3. Lazy resource init. Once a backend lands on an executor, it shouldn't open pools / consumer sessions until the first call. Today's KafkaBackend::consumer_session: Arc<Mutex<ConsumerSession>> is already lazy; PgPool opens eagerly in the constructor. Refactor makes laziness uniform across all 11 backends.

Σ.D — distributed streaming

Two follow-on needs that should land in the Σ.B trait shape so we don't break it again:

  1. Per-key partitioning hint. For windowed joins distributed across executors, the framework needs to ask "if you read this query, can you produce results partitioned by key K?" Today's read API is unpartitioned. Σ.D needs read_arrow_stream_partitioned or a partitioning_hint(query) -> Option<KeyRange> method.
  2. Distributed state hooks. Σ.D extends the existing seek_to / offset_snapshot (Phase 39.5a) into per-key state shipped across shuffle boundaries. The trait shape for this lands as part of Σ.D itself, but the surface should already be plumbing-compatible with Σ.B's serializable-config model.

Current trait at a glance

#[async_trait]
pub trait Backend: Send + Sync {
    fn dialect(&self) -> Dialect;
    fn connection_info(&self) -> ConnectionInfo;
    fn dsn(&self) -> Option<String>;
    fn as_postgres(&self) -> Option<&PgPool> { None }

    async fn ping(&self) -> Result<(), BackendError>;
    async fn execute(&self, statement: &str) -> Result<u64, BackendError>;
    async fn read_arrow_stream(&self, query: &str) -> Result<ArrowBatchStream, BackendError>;
    async fn write_arrow_stream(&self, target: &TargetTable, ..., mode: WriteMode) -> Result<u64, BackendError>;

    async fn run_append(&self, ..., source_backend: Option<&dyn Backend>, ...) -> Result<...>;
    async fn run_truncate(&self, ..., source_backend: Option<&dyn Backend>, ...) -> Result<...>;
    async fn run_merge(&self, ..., source_backend: Option<&dyn Backend>, ...) -> Result<...>;
    async fn run_scd2(&self, ..., source_backend: Option<&dyn Backend>, ...) -> Result<...>;

    async fn commit_offsets(&self) -> Result<(), BackendError> { Ok(()) }
    fn supports_seek_to(&self) -> bool { false }
    async fn seek_to(&self, offset_bytes: &[u8]) -> Result<(), BackendError> { ... default error }
    async fn offset_snapshot(&self) -> Result<Option<Vec<u8>>, BackendError> { Ok(None) }
}

11 impls: PostgresBackend (in backend.rs), KafkaBackend, MySqlBackend, SqliteBackend, DuckDbBackend, DeltaBackend, ObjectStoreBackend, PubSubBackend, RabbitMqBackend, KinesisBackend, plus streaming.rs's test stub.


Proposed shape

The split: a single Backend operation interface (today's, plus a 'static bound), a separate BackendConfig serializable struct per backend, and a BackendFactory registry that bridges the two.

// 1. Operation surface. Same as today, plus 'static.
//    Note `as_postgres` becomes deprecated — Σ.B PR 1 removes it.
#[async_trait]
pub trait Backend: Send + Sync + 'static {
    fn dialect(&self) -> Dialect;
    fn connection_info(&self) -> ConnectionInfo;
    fn dsn(&self) -> Option<String>;

    /// NEW: serializable form of this backend's constructor + builder
    /// state. Must NOT contain live connections / pools / file
    /// descriptors. Used by Ballista to ship the backend to executors.
    fn config(&self) -> BackendConfig;

    async fn ping(&self) -> Result<(), BackendError>;
    async fn execute(&self, statement: &str) -> Result<u64, BackendError>;
    async fn read_arrow_stream(&self, query: &str) -> Result<ArrowBatchStream, BackendError>;
    async fn write_arrow_stream(&self, target: &TargetTable, ..., mode: WriteMode) -> Result<u64, BackendError>;

    // run_append / run_truncate / run_merge / run_scd2 unchanged.
    // commit_offsets / supports_seek_to / seek_to / offset_snapshot unchanged.

    /// NEW (Σ.D-ready): can this backend partition reads by key?
    /// `None` means single-stream only. Default `None` means existing
    /// backends opt in by override; Σ.B can ship without any override.
    fn partitioning_hint(&self, _query: &str) -> Option<KeyPartitioning> { None }
}

// 2. Serializable config. Single tagged enum so `serde_json::Value`
//    is the wire format (matches the existing TOML config shape;
//    JSON is a strict superset).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum BackendConfig {
    Postgres(PostgresConfig),
    MySql(MySqlConfig),
    Sqlite(SqliteConfig),
    DuckDb(DuckDbConfig),
    Kafka(KafkaConfig),
    Kinesis(KinesisConfig),
    PubSub(PubSubConfig),
    RabbitMq(RabbitMqConfig),
    Delta(DeltaConfig),
    ObjectStore(ObjectStoreConfig),
}

// One concrete config per backend. Example:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConfig {
    pub bootstrap_servers: String,
    pub group_id: Option<String>,
    pub auth: AuthConfig,           // already a serde-shaped enum
    pub payload_format: KafkaPayloadFormat,
    pub schema_registry_url: Option<String>,
    pub schema_registry_basic_auth: Option<SrBasicAuthConfig>,
    pub message_key_column: Option<String>,
    pub batch_config: KafkaBatchConfig,
    pub delivery_semantics: KafkaDeliverySemantics,
}

// 3. Reverse direction: config → backend instance. Doesn't fit on
//    the Backend trait (would need `Self: Sized`, breaking dyn dispatch).
//    Free function dispatching on the config tag.
pub fn backend_from_config(cfg: BackendConfig) -> Result<Arc<dyn Backend>, BackendError> {
    match cfg {
        BackendConfig::Postgres(c) => Ok(Arc::new(PostgresBackend::open(c)?)),
        BackendConfig::Kafka(c) => Ok(Arc::new(KafkaBackend::open(c)?)),
        // ... 10 arms
    }
}

// 4. Each backend's constructor takes its config struct directly.
//    No more builder methods on the public surface — config is the
//    canonical input. (Builders stay as private convenience wrappers
//    inside tests, but production code constructs from config.)
impl KafkaBackend {
    pub fn open(cfg: KafkaConfig) -> Result<Self, BackendError> {
        Ok(Self {
            cfg,                                // own the config
            consumer_session: Arc::new(Mutex::new(ConsumerSession::not_yet_opened())),
            producer_session: Arc::new(Mutex::new(ProducerSession::not_yet_opened())),
        })
    }
}

The config blob is the canonical wire form. PG-side: PostgresConfig { dsn: String, ssl_mode: SslMode, ... }. Kafka-side: KafkaConfig above. Anything that today's with_* builder methods set lives in the config struct.

Why this shape

  • One breaking change. All 11 backends migrate to constructor- takes-config + fn config() returns the same struct. After Σ.B PR 1, no public builder methods (with_*) exist on backend types — the config struct is the API. Π-block users construct Backend from a config they got from TOML or built in-process; Ballista users serialize the config across the wire.
  • Σ.A2 is unaffected. Translator runs above the trait.
  • Σ.D's per-key partitioning is plumbed in. Default returns None; Σ.D adds overrides for Kafka (partition-keyed reads) + Postgres (range-partitioned reads) without breaking the trait again.
  • Lazy init falls out naturally. The constructor stores the config; pools/sessions construct on first method call. Every backend uses the same pattern.
  • Credentials story is unchanged. Today config carries DSNs + passwords. Tomorrow same — over Arrow Flight (TLS) instead of in-process. Threat model: same as Spark shipping JDBC URLs to Spark executors, which is industry-standard.

Migration cost — per backend

Each of the 11 backends:

  1. Define a <Backend>Config struct that's Serialize + Deserialize. Field-by-field copy of today's struct; replace live state (Arc<Mutex<…>>, pools) with not_yet_opened() placeholders.
  2. Refactor constructor: Backend::open(cfg: Config) -> Result<Self>. Builder methods (with_sasl_plain, etc.) reduce to mutators on the Config struct before ::open.
  3. Implement Backend::config(&self) -> BackendConfig. Just clones the stored Config.
  4. Add a match arm to backend_from_config.
  5. Update existing tests that call builders directly — they construct a Config first.

Estimated 2 days/backend × 11 = ~3 weeks, but mostly mechanical. PR strategy below stages it across 4 commits to keep diffs reviewable.

Σ.B PR 1 commit plan

a. Trait + dispatch surface. Add BackendConfig enum, fn config() trait method (default unimplemented!() for now), backend_from_config stub. No backend impls touched yet — this commit only changes backend.rs + adds the tagged enum.

b. Migrate database backends. PostgresBackend, MySqlBackend, SqliteBackend, DuckDbBackend. These are the simplest (no live sessions; just pools). Convert each to Config shape; populate fn config(); add to backend_from_config.

c. Migrate object-store + delta. ObjectStoreBackend, DeltaBackend. Configs include S3 credentials.

d. Migrate streaming backends. KafkaBackend, KinesisBackend, PubSubBackend, RabbitMqBackend. These are the heaviest because they carry live session state — but the laziness pattern is already mostly in place. KafkaBackend is the longest commit (largest backend file in the workspace).

After commit (d), Backend::config() is implemented for every backend and backend_from_config(cfg) round-trips for every kind.

Acceptance test for the refactor

A single integration test that, for every backend kind, exercises:

let original = backend_kind_specific_constructor();
let cfg = original.config();
let json = serde_json::to_string(&cfg)?;
let cfg_round_trip: BackendConfig = serde_json::from_str(&json)?;
let recovered = backend_from_config(cfg_round_trip)?;
assert_eq!(original.dialect(), recovered.dialect());
assert_eq!(original.connection_info(), recovered.connection_info());
recovered.ping().await?;  // verify live ops still work after round-trip

This is the failing test driving the refactor (TDD). One test function per backend kind; CI matrix covers each.


Locked decisions (2026-05-05)

All 6 sub-questions resolved per the recommended defaults below. Σ.B PR 1 implementer takes these as the contract; deviations require a new spike commit referencing this section.

  1. Wire format: JSON. Locked.
  2. Rationale: human-readable, debuggable in CI logs, matches the existing TOML config shape (JSON is a strict superset). Config blobs are a few KB; debug-ability beats wire-size.
  3. Postcard considered + rejected — ~2× smaller but binary-only; not worth losing inspectability over a few KB per backend.

  4. Credentials in config blob: inline today. Locked.

  5. PostgresConfig { dsn: "postgres://user:pass@host" }. Same handling as today; no in-band redaction.
  6. Lock down via TLS at the Arrow Flight transport layer. Document the threat model in docs/SECURITY.md as part of Σ.B PR 1.
  7. Env-var indirection (${PASSWORD} resolved on the executor) ships as a follow-up after a user requests it. Non-breaking addition: a string starting with $ triggers the resolver.

  8. as_postgres() escape hatch: remove in PR 1. Locked.

  9. The cross-DB Arrow path (Phase 30b/c) is production-tested. The PG-↔-PG COPY BINARY fast path is a micro-optimization that complicates serialization (can't ship a PgPool reference over the wire).
  10. Removing forces the universal Arrow path — which is what Ballista uses anyway. Single code path is easier to reason about + benchmark.
  11. If a regression surfaces in TPC-H Σ.A1 numbers, revisit by adding a same-process fast path inside read_arrow_stream (peek at runtime type, no trait surface change).

  12. Out-of-tree backend registry: closed (match-on-tag) for now. Locked.

  13. Σ.B's backend_from_config is a closed match. Users wanting custom backends fork — same as parquet-rs and arrow-rs.
  14. Open registry via inventory or linkme is non-breaking to add later: replace the closed match with a registry lookup; in-tree backends register themselves; third parties get the same hook.
  15. Revisit timing: when a real user asks. Don't ship abstractions ahead of demand.

  16. partitioning_hint shape: default-None in Σ.B PR 1. Locked.

  17. Method exists on the trait; default returns None. No backend overrides in Σ.B.
  18. Semantics + override shape locked in Σ.D once we know whether range partitioning, hash partitioning, or per-source-partition is the dominant pattern.

  19. Strategy methods over Ballista: scheduler reconstructs both backends; ships live Arc<dyn Backend> references. Locked.

  20. run_append / run_merge / run_scd2 keep their current signatures (Option<&dyn Backend> for the source).
  21. Scheduler-side: backend_from_config(target_cfg) + backend_from_config(source_cfg) happen on the scheduler before dispatch.
  22. Executor-side strategies don't see the config blobs — they get the same trait references they have today.
  23. Net effect: zero change to the strategy interface; all the serialization complexity is contained in the scheduler.

Implications for Σ.B PR 1

With these locked, the migration plan in the previous section ("Σ.B PR 1 commit plan") is final:

a. Trait scaffold (BackendConfig enum, fn config() method, backend_from_config stub, as_postgres removed). b. Migrate DB backends (Postgres, MySQL, SQLite, DuckDB). c. Migrate object-store + delta backends. d. Migrate streaming backends (Kafka, Kinesis, Pub/Sub, RabbitMQ).

Plus the round-trip integration test described in "Acceptance test for the refactor" above, run against every backend kind.


PR 2 distributed-engine pivot (2026-05-05): datafusion-distributed over Ballista

The original spike named Apache Ballista as the distributed engine behind BallistaBackend. By the time PR 2 opened, two facts surfaced that flipped the choice to datafusion-distributed (the datafusion-contrib library).

Trigger 1 — DataFusion version pin. Ballista 52.0.0 (latest at PR-2 time) pins datafusion = "^52". The workspace is on datafusion = "53.1", driven by Σ.A1 + Σ.A2 + the orc-rust / deltalake / parquet 58 ABI track. Adopting Ballista would force a workspace-wide downgrade that unwinds all of Σ.A. The spike's own "pin tight to DataFusion's transitive version" guideline rules out that path.

Trigger 2 — better architectural fit. Ballista's shape is scheduler-process + executor-process, similar to YARN. The Σ block's pitch ("scale SQL like PySpark, footprint like ematix-flow") is better served by the datafusion-distributed model, where any process linking the library can act as either a coordinator or a worker. No new binaries to ship; no separate cluster service to operate. From upstream's README:

No coordinator-worker architecture. To keep infrastructure simple, any node can act as a coordinator or a worker.

What this changes:

Original (Ballista) After pivot (datafusion-distributed)
crates/ematix-flow-ballista with flow-scheduler + flow-executor binaries crates/ematix-flow-distributed library only — no new binaries
Cluster image budget ≤150 MB total Each ematix-flow pod is self-contained; no separate cluster image
examples/ballista-cluster/ docker-compose with scheduler + 3 executors + MinIO examples/distributed-cluster/ showing N peer pods + cross-pod Arrow Flight
ballista 0.12 workspace dep (per the original spike) datafusion-distributed 1.0 workspace dep (DataFusion-53-aligned)
[transform] engine = "ballista" config [transform] engine = "distributed" config

What this does NOT change:

  • All 6 locked trait-shape decisions stay. They're upstream of the distributed-engine choice (BackendConfig + JSON wire format + closed registry + partitioning_hint shape + strategy-methods- over-distributed).
  • Σ.B PR 1 (the connector-trait refactor) is unaffected; it shipped before the engine choice surfaced.
  • Σ.C's TPC-H head-to-head benchmark goal (vs. PySpark) is unaffected; datafusion-distributed already has TPC-H SF=1 + SF=10 benchmarks published, so we have a comparison shape.
  • Σ.D's distributed-streaming work is unaffected — datafusion- distributed is batch-only, same as Ballista.

Open question still parked: if a user surfaces a real need for the Ballista-style scheduler service (e.g., for a multi-tenant shared cluster), we can add BallistaBackend as a peer of DistributedBackend. Both can coexist — different deployment models for different workloads. Not on the roadmap until a user asks.


Original sub-question prose (preserved for reference)

The recommendations below were the spike's original analysis; the locked decisions above are the load-bearing record. Kept here so the reasoning chain is auditable.

  1. Wire format: JSON or postcard? JSON is human-readable, debuggable, matches today's TOML config shape, and is universal across the Arrow Flight metadata channel. Postcard is more compact (~2× smaller) but binary-only. Recommend JSON — config blobs are tiny (a few KB), debug-ability matters more than bytes-on-wire.

  2. Where does the credential live in the config blob? Two options:

  3. (a) Inline (today's pattern). PostgresConfig { dsn: "postgres://user:pass@host" }. Simple; the executor process needs to handle the password the same way the driver does today.
  4. (b) Indirect via env-var ref. PostgresConfig { dsn: "postgres://user:${PASSWORD}@host" }, resolved on the executor with its own environment. Aligns with Π.5 deprecation of inline credentials.
  5. Recommend (a) for Σ.B PR 1. Keep it simple; lock down via TLS at the Arrow Flight transport layer. Add (b) as a Σ.B follow- up once a user requests it. Document the threat model in docs/SECURITY.md.

  6. as_postgres() escape hatch — keep, deprecate, or remove? Today: fn as_postgres(&self) -> Option<&PgPool> lets the PG↔PG COPY BINARY fast path peek under the trait. Recommend remove in PR 1. The cross-DB Arrow path is already production-tested (Phase 30b/c); the COPY BINARY fast path is a micro-optimization. Removing it eliminates a serialization snag (you can't ship a PgPool reference over the wire) and forces the universal path, which is what Ballista will use anyway.

  7. Out-of-tree custom backends — registry extension API? Today no user can add a backend without a fork. After Σ.B, backend_from_config is closed (match-on-tag). Two options:

  8. (a) Closed registry, document forking. Simplest; users who need custom backends fork. Aligns with how parquet-rs and arrow handle out-of-tree extensions.
  9. (b) Open registry via inventory or linkme crate. Compile-time backend registration; BackendConfig becomes serde_json::Value + a registered factory function.
  10. Recommend (a) for Σ.B; revisit in Σ.D. Ship value first; adding the open registry later is non-breaking (the closed match becomes a lookup against a default registry, third parties register additions).

  11. partitioning_hint shape — bake now or defer? Σ.D needs per-key partitioning info, but the exact shape (range partitioning, hash partitioning, partition-by-Kafka-partition?) isn't locked yet. Recommend default-None in Σ.B PR 1, override schema designed in Σ.D. This adds a method but doesn't commit to its semantics.

  12. run_append / run_merge / run_scd2 source_backend parameter — ship-of-Theseus over Ballista? These methods take Option<&dyn Backend> for the source. If a strategy is dispatched to a Ballista executor, the source backend is also a dyn Backend on that executor. Ergonomic question: do strategy methods become "backend pair" methods (run_append(target_cfg, source_cfg, ...)) so the executor reconstructs both sides from config blobs? Or does the scheduler reconstruct on its side and ship live Arc<dyn Backend>s? Recommend the latter — the scheduler constructs both backends from config, then dispatches the strategy over Ballista with the live references. Keeps the strategy interface unchanged.


What this spike does NOT decide

  • Wire-protocol details for Arrow Flight beyond the config blob. Σ.B PR 2 (the ematix-flow-ballista crate) handles RPC plumbing.
  • Ballista version pin. Σ.B PR 2.
  • Streaming distribution. That's Σ.D.

Recommendation

All 6 sub-questions are locked (see "Locked decisions" above). Σ.B PR 1 is fully scoped: the 4-commit plan in "Σ.B PR 1 commit plan" above is the implementation contract. Estimated effort: 3 weeks for one engineer, mostly mechanical migration of 11 backends to the Config shape.

After PR 1 lands, Σ.B PR 2 (ematix-flow-ballista crate) can start in parallel with Σ.A1 / Σ.A2 work since it doesn't touch backend internals further.

Σ.B PR 1 is queued behind v0.1.0 publishing. As soon as the wheels are on PyPI, this is the first piece of distributed-compute work to open.