Python Pipeline Hardening Plan

This plan tracks the Python fixes chosen instead of a Rust migration. The gates below are intentionally scoped so each one can be implemented, peer reviewed, and verified before moving on.

Gate 1 - Snowpipe Commit Safety And Partition-Safe Routing

Status: Accepted

Scope:
- Added Snowpipe Streaming SDK import compatibility for current and older ChannelStatus layouts.
- Prefer append_rows(...) with start_offset_token and end_offset_token, with append_row(...) fallback.
- Require Event Hub sequence_number for source-stable Snowflake offset tokens.
- Wait for Snowflake wait_for_commit(...) before reporting ingest success.
- Force channel-status validation and fail closed on row errors or fatal channel status.
- Group orchestrator batches by Event Hub partition.
- Route each Event Hub partition to a stable Snowflake channel suffix.
- Fail the batch if any partition ingest fails.
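The partition grouping, stable channel routing, and offset-token items in the Gate 1 scope can be sketched as follows. This is a minimal illustration, not the pipeline's actual code: the Event class, the channel_name and plan_partition_batches helpers, and the "ingest" base channel name are all hypothetical, and the Snowpipe Streaming calls themselves are left out.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a received Event Hub message."""
    partition_id: str
    sequence_number: Optional[int]
    body: dict


def channel_name(base: str, partition_id: str) -> str:
    # The same partition always maps to the same channel suffix.
    return f"{base}_p{partition_id}"


def plan_partition_batches(events, base_channel="ingest"):
    """Group events by partition and derive offset tokens per channel."""
    grouped = defaultdict(list)
    for event in events:
        # Fail closed: without a sequence number there is no stable token.
        if event.sequence_number is None:
            raise ValueError(
                f"missing sequence_number on partition {event.partition_id}"
            )
        grouped[event.partition_id].append(event)

    plans = {}
    for partition_id, batch in grouped.items():
        batch.sort(key=lambda e: e.sequence_number)
        plans[channel_name(base_channel, partition_id)] = {
            "rows": [e.body for e in batch],
            "start_offset_token": str(batch[0].sequence_number),
            "end_offset_token": str(batch[-1].sequence_number),
        }
    return plans
```

Each entry in the returned plan carries what one batched append for one channel would need. A caller would then fail the whole batch if any single channel's ingest fails.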

Verification:
- uv run pytest -q tests/unit/test_snowflake_client.py tests/unit/test_streaming.py tests/unit/test_orchestrator.py
- Result: passed during gate review.

Gate 2 - Event Hub Fail-Closed Batch Contract

Status: Accepted

Scope:
- _process_batch(...) returns a boolean success/failure contract.
- Sync processors run via asyncio.to_thread(...).
- Async processor functions and async callable instances are awaited.
- Batch mutation is serialized with batch and processing locks.
- Ready and timed-out batches are detached before processing.
- Detached batches are tracked in an explicit FIFO queue.
- Failed-batch restore rebuilds the current batch from detached backlog first, then newer partial batch messages.
- Processor failure, checkpoint retry exhaustion, or missing partition_context fails the batch.
- Failed batches do not checkpoint and do not advance success stats.
- Batch failure shuts down the receive loop/client.
- Snowflake committed replay still validates channel status before reporting success.
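Two of the Gate 2 scope items lend themselves to a short sketch: dispatching sync versus async processors under a boolean contract, and restoring a failed batch in FIFO order. The names run_processor and BatchBuffer are hypothetical, locking is omitted for brevity, and treating an explicit False return value as failure is an assumption rather than the documented contract.

```python
import asyncio
import inspect
from collections import deque


async def run_processor(processor, messages) -> bool:
    """Run a sync or async processor and report success as a boolean."""
    is_async = inspect.iscoroutinefunction(processor) or (
        # Covers instances whose __call__ is declared with `async def`.
        inspect.iscoroutinefunction(getattr(processor, "__call__", None))
    )
    try:
        if is_async:
            result = await processor(messages)
        else:
            # Keep blocking work off the event loop.
            result = await asyncio.to_thread(processor, messages)
    except Exception:
        return False  # fail closed: the caller must not checkpoint
    return result is not False


class BatchBuffer:
    """Hypothetical buffer: a partial batch plus a FIFO of detached batches."""

    def __init__(self):
        self.current = []
        self.detached = deque()

    def detach_current(self):
        # Move a ready or timed-out batch aside before processing it.
        batch, self.current = self.current, []
        self.detached.append(batch)
        return batch

    def restore_after_failure(self):
        # Oldest detached messages first, then the newer partial batch.
        restored = [msg for batch in self.detached for msg in batch]
        restored.extend(self.current)
        self.detached.clear()
        self.current = restored
```

Restoring in this order keeps messages in their original arrival order, so a retry or restart does not process newer messages ahead of older ones.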

Verification:
- uv run pytest -q tests/unit/test_snowflake_client.py tests/unit/test_eventhub.py tests/unit/test_orchestrator.py tests/unit/test_streaming.py
- Result: 168 passed, with existing warnings under review.
- uv run ruff check --select F811 ...
- Result: passed.

Gate 3 - Durable Checkpoint Ownership And Metadata

Status: Accepted

Scope:
- Replaced in-memory partition ownership with durable backend delegation.
- Added Snowflake and Postgres ownership list/claim helpers.
- Snowflake ownership requires a Hybrid Table with an enforced primary key for safe compare-and-set claims.
- If an existing Snowflake ownership table is a standard table, setup fails closed with migration guidance.
- A CONTROL_OWNERSHIP_MODE=local_single_consumer_smoke diagnostic path keeps ownership in memory while persisting Snowflake checkpoints to a standard table; it is only for single-consumer smoke tests when Hybrid Tables are unavailable.
- Postgres ownership uses primary-key-backed upsert/update semantics.
- Checkpoint backend calls are offloaded with asyncio.to_thread(...) behind manager-level I/O locks.
- Checkpoint loads fail closed instead of silently starting from an unsafe position.
- Checkpoint save failure propagates through the SDK checkpoint path.
- Event Hub offset and sequence number are preserved separately.
- Startup logging handles both legacy integer checkpoints and metadata-shaped checkpoints.
- Snowflake ownership helper paths validate warehouse identifiers before issuing USE WAREHOUSE.

Verification:
- uv run pytest -q tests/unit/test_snowflake_client.py tests/unit/test_eventhub.py tests/unit/test_orchestrator.py tests/unit/test_streaming.py tests/unit/test_snowflake_utils.py
- Result: 210 passed, with existing warnings under review.
- Targeted ruff F401 / F811 checks passed for changed source files.

Gate 4 - Azure Identity And SDK Options

Status: Accepted

Scope:
- Aligned code and docs around production identity behavior.
- Prefer DefaultAzureCredential for production-capable authentication.
- Keep AzureCliCredential as an explicit local-development opt-in.
- Added user-assigned managed identity client ID support.
- Close credentials on token validation failure and shutdown.
- Pass configured Event Hubs retry and load-balancing options into EventHubConsumerClient.
- Pass configured prefetch, max-wait, last-enqueued tracking, and error callback options into receive(...).
- Preserve fail-closed behavior when the SDK calls on_error, even if the SDK swallows callback exceptions.
- Fail closed on receive-time credential errors instead of leaving the consumer running.
- Validate partition ownership expiration against the load-balancing interval.
- Allow Azure's documented max_wait_time=0 receive mode.
- Reject unsafe fresh-start starting_position=0; use -1 for beginning or @latest for new events.
- Updated README and .env.example for identity mode, SDK options, and connection-string override behavior.

Verification:
- uv run pytest -q tests/unit
- Result: 392 passed, with 2 existing warnings under review.
- uv run ruff check
- Result: passed.
- uv run python -m py_compile src/utils/azure_identity.py src/utils/config_models/eventhub.py src/utils/config.py src/consumers/eventhub.py src/main.py
- Result: passed.

Open Gates

Gate 5 - Verification And Handoff:
- Run a live two-consumer Snowflake/Event Hub ownership and load-balancing smoke test if credentials and test infrastructure are available.
- Update operator docs for identity, ownership, checkpoint behavior, and remaining warnings.
- Verify that code comments explain the chosen safety/SDK decisions and include vendor documentation links where those decisions depend on external SDK behavior.
- Document that Snowflake ownership uses a Hybrid Table for enforced primary-key claims; operators without Hybrid Table support/privileges should use Postgres control tables or migrate/drop an existing standard ownership table before enabling Snowflake ownership.

Latest live-smoke status:
- Full mocked test suite passes: 415 passed, with 2 existing warnings.
- Live Event Hub sender succeeds against evsnowqa20260428230344.servicebus.windows.net/topic1 when using a namespace SAS connection string retrieved from Azure CLI.
- Live pipeline with Snowflake control-table ownership fails closed before receiving messages because Snowflake Hybrid Tables are not available on the current trial account.
- Live pipeline with Docker-backed Postgres control tables succeeds end to end: the sender wrote 5 marker messages, Event Hub receive processed them, Snowpipe Streaming flushed partition channels, and a Snowflake query found 5 marker rows.
- Live pipeline with Docker-backed Postgres control tables also passes a two-consumer load-balancing smoke test: 80 unique marker rows reached Snowflake across partitions 0 and 1.
- Live Snowflake standard-table checkpoint smoke test passes in single-consumer mode: INGESTION.PUBLIC.INGESTION_STATUS_SMOKE_20260524180843 was seeded from current Event Hub offsets, phase A ingested 20 rows for marker snowflake-control-smoke-a-20260524180843, phase B restarted from the Snowflake checkpoints and ingested 20 rows for marker snowflake-control-smoke-b-20260524180843, and checkpoint rows advanced for both partitions. This validates Snowflake-backed checkpoints and resume only; partition ownership remains local, so this is not proof of durable multi-consumer ownership.
- The configured Azure Postgres fallback is not currently usable from this machine: the configured host is stale, and the discovered Azure Postgres hosts reject the configured credentials.