
Plan: Optional Postgres Checkpoint Table

Goals

  • Allow the checkpoint control table (INGESTION_STATUS) to live in Snowflake or Postgres.
  • Keep data ingestion targets in Snowflake unchanged.
  • Default behavior remains Snowflake for backwards compatibility.

Proposed Configuration

  • CONTROL_TABLE_BACKEND={snowflake|postgres} (default: snowflake).
  • Keep TARGET_DB, TARGET_SCHEMA, TARGET_TABLE as the control table location for the chosen backend.
  • Add Postgres connection options, e.g.:
    • CONTROL_PG_HOST
    • CONTROL_PG_PORT
    • CONTROL_PG_USER
    • CONTROL_PG_PASSWORD
    • CONTROL_PG_SSLMODE
    • CONTROL_PG_AUTH_MODE={password|azure_token} (explicit mode selection).
  • Use env vars only for Postgres connectivity (no DSN support).
  • Azure Postgres (passwordless) option:
    • Support token-based auth via DefaultAzureCredential and psycopg.
    • Build the connection URI internally from the same CONTROL_PG_* variables (users never supply a DSN).
    • Reference: https://learn.microsoft.com/en-us/azure/postgresql/connectivity/connect-python?tabs=bash%2Cpasswordless
  • Connection pooling guidance (Azure official recommendation):
    • Prefer PgBouncer for pooling (built-in on Azure Database for PostgreSQL flexible server, or external).
    • No in-app pooling planned; document how to point CONTROL_PG_HOST/CONTROL_PG_PORT at PgBouncer.
    • Reference: https://learn.microsoft.com/en-us/azure/postgresql/connectivity/concepts-connection-pooling-best-practices
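The connection settings above could be turned into keyword arguments for `psycopg.connect()` with a small builder. This is a minimal sketch under assumptions: the function name `build_pg_connect_kwargs` and the `token_provider` hook are hypothetical, the database name is assumed to come from TARGET_DB, and the defaults for port (5432) and sslmode (`require`) are illustrative choices rather than settled decisions. In `azure_token` mode the Entra ID access token is used as the password, as in the Microsoft passwordless guide; the scope string is the one that guide uses.

```python
from typing import Callable, Mapping, Optional

# Scope for Entra ID tokens accepted by Azure Database for PostgreSQL
# (the value used in Microsoft's passwordless Python guide).
AZURE_PG_SCOPE = "https://ossrdbms-aad.database.windows.net/.default"

# Settings that must always be present when the Postgres backend is selected.
REQUIRED_VARS = ("CONTROL_PG_HOST", "CONTROL_PG_USER", "CONTROL_PG_AUTH_MODE")


def build_pg_connect_kwargs(
    env: Mapping[str, str],
    dbname: str,
    token_provider: Optional[Callable[[], str]] = None,
) -> dict:
    """Turn CONTROL_PG_* variables into keyword arguments for psycopg.connect().

    token_provider is a hypothetical hook returning an Entra ID access token;
    it is only called in azure_token mode.
    """
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"missing Postgres settings: {', '.join(missing)}")

    kwargs = {
        # To pool via PgBouncer, point host/port at the PgBouncer endpoint.
        "host": env["CONTROL_PG_HOST"],
        "port": int(env.get("CONTROL_PG_PORT", "5432")),  # assumed default
        "user": env["CONTROL_PG_USER"],
        "dbname": dbname,
        "sslmode": env.get("CONTROL_PG_SSLMODE", "require"),  # assumed default
    }

    auth_mode = env["CONTROL_PG_AUTH_MODE"]
    if auth_mode == "password":
        password = env.get("CONTROL_PG_PASSWORD")
        if not password:
            raise ValueError("CONTROL_PG_PASSWORD is required in password mode")
        kwargs["password"] = password
    elif auth_mode == "azure_token":
        if token_provider is None:
            raise ValueError("azure_token mode requires a token provider")
        # Passwordless auth: the short-lived access token is sent as the password.
        kwargs["password"] = token_provider()
    else:
        raise ValueError(f"unsupported CONTROL_PG_AUTH_MODE: {auth_mode!r}")
    return kwargs
```

In production the hook could be `lambda: DefaultAzureCredential().get_token(AZURE_PG_SCOPE).token`, with the result passed as `psycopg.connect(**kwargs)`. Because access tokens expire, building fresh kwargs for each new connection (rather than caching them) is the safer choice.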

Work Breakdown

  1. Config modeling
    • Add PostgresConnectionConfig in src/utils/config_models/.
    • Add control_table_backend and control_postgres fields to EvSnowConfig.
    • Validate required Postgres settings when control_table_backend=postgres.
    • Normalize TARGET_SCHEMA for Postgres (e.g., force lower-case unless quoted).
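The config-modeling steps could look roughly like the sketch below. It uses plain dataclasses so it runs standalone; if src/utils/config_models/ is built on a validation library such as pydantic, the same checks would move into its validators. `ControlTableSettings` is a hypothetical stand-in for the slice of EvSnowConfig that holds the new fields, and the port/sslmode defaults are assumptions. Schema normalization follows Postgres identifier folding: unquoted names fold to lower case, double-quoted names keep their case.

```python
from dataclasses import dataclass
from typing import Literal, Optional

AuthMode = Literal["password", "azure_token"]
Backend = Literal["snowflake", "postgres"]


def normalize_pg_identifier(name: str) -> str:
    """Apply Postgres folding: unquoted names are lower-cased, while quoted
    names keep their case and have doubled inner quotes unescaped."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1].replace('""', '"')
    return name.lower()


@dataclass(frozen=True)
class PostgresConnectionConfig:
    host: str
    user: str
    auth_mode: AuthMode
    port: int = 5432  # assumed default
    password: Optional[str] = None
    sslmode: str = "require"  # assumed default

    def __post_init__(self) -> None:
        if self.auth_mode not in ("password", "azure_token"):
            raise ValueError(f"unsupported auth_mode: {self.auth_mode!r}")
        if self.auth_mode == "password" and not self.password:
            raise ValueError("password is required when auth_mode='password'")


@dataclass(frozen=True)
class ControlTableSettings:
    """Hypothetical slice of EvSnowConfig holding only the new fields."""

    target_db: str
    target_schema: str
    target_table: str
    control_table_backend: Backend = "snowflake"
    control_postgres: Optional[PostgresConnectionConfig] = None

    def __post_init__(self) -> None:
        if self.control_table_backend not in ("snowflake", "postgres"):
            raise ValueError(f"unknown backend: {self.control_table_backend!r}")
        if self.control_table_backend == "postgres":
            if self.control_postgres is None:
                raise ValueError("control_postgres is required for the postgres backend")
            # Frozen dataclass, so bypass __setattr__ to store the normalized name.
            object.__setattr__(
                self, "target_schema", normalize_pg_identifier(self.target_schema)
            )
```

Normalizing only when the backend is Postgres keeps the Snowflake path byte-for-byte unchanged, which matches the backwards-compatibility goal.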

  2. Postgres control table utilities
    • Add src/utils/postgres.py with:
      • connection helper(s)
      • token-based auth helper for Azure Postgres (DefaultAzureCredential -> token as password)
      • create_control_table(...)
      • insert_partition_checkpoint(...) using INSERT ... ON CONFLICT
      • get_partition_checkpoints(...) with latest-per-partition query
    • Mirror the Snowflake schema and primary key: (eventhub_namespace, eventhub, target_db, target_schema, target_table, partition_id).
    • Use psycopg (per Azure recommendation) for Postgres connectivity.
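The three table helpers above might be sketched as follows. Assumptions to flag: only the primary key comes from the plan; the `sequence_number`, `checkpoint_offset` and `updated_at` columns are placeholders for whatever the Snowflake table actually stores, and the hand-rolled `quote_ident` stands in for `psycopg.sql.Identifier`. The functions take any DB-API cursor (a psycopg cursor fits), `schema`/`table` locate the control table, and `key` holds the five non-partition primary-key values.

```python
from typing import Any, Dict, Sequence, Tuple

# Primary key mirrored from the Snowflake control table.
PK_COLUMNS = (
    "eventhub_namespace", "eventhub", "target_db",
    "target_schema", "target_table", "partition_id",
)
PK_SQL = ", ".join(PK_COLUMNS)


def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier (psycopg.sql.Identifier does the same job)."""
    return '"' + name.replace('"', '""') + '"'


def qualified(schema: str, table: str) -> str:
    return f"{quote_ident(schema)}.{quote_ident(table)}"


def create_control_table(cur: Any, schema: str, table: str) -> None:
    # sequence_number, checkpoint_offset and updated_at are assumed columns.
    cur.execute(
        f"CREATE TABLE IF NOT EXISTS {qualified(schema, table)} ("
        " eventhub_namespace TEXT NOT NULL, eventhub TEXT NOT NULL,"
        " target_db TEXT NOT NULL, target_schema TEXT NOT NULL,"
        " target_table TEXT NOT NULL, partition_id TEXT NOT NULL,"
        " sequence_number BIGINT NOT NULL, checkpoint_offset TEXT,"
        " updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),"
        f" PRIMARY KEY ({PK_SQL}))"
    )


def insert_partition_checkpoint(
    cur: Any, schema: str, table: str, key: Sequence[str],
    partition_id: str, sequence_number: int, offset: str,
) -> None:
    """Upsert one partition's checkpoint: insert, or overwrite on a PK conflict."""
    cur.execute(
        f"INSERT INTO {qualified(schema, table)}"
        f" ({PK_SQL}, sequence_number, checkpoint_offset, updated_at)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, now())"
        f" ON CONFLICT ({PK_SQL}) DO UPDATE SET"
        " sequence_number = EXCLUDED.sequence_number,"
        " checkpoint_offset = EXCLUDED.checkpoint_offset,"
        " updated_at = now()",
        (*key, partition_id, sequence_number, offset),
    )


def get_partition_checkpoints(
    cur: Any, schema: str, table: str, key: Sequence[str]
) -> Dict[str, Tuple[int, str]]:
    """Return {partition_id: (sequence_number, offset)}, newest row per partition."""
    cur.execute(
        "SELECT DISTINCT ON (partition_id) partition_id, sequence_number, checkpoint_offset"
        f" FROM {qualified(schema, table)}"
        " WHERE eventhub_namespace = %s AND eventhub = %s AND target_db = %s"
        " AND target_schema = %s AND target_table = %s"
        " ORDER BY partition_id, updated_at DESC",
        tuple(key),
    )
    return {row[0]: (row[1], row[2]) for row in cur.fetchall()}
```

With the primary key enforced, the upsert already leaves one row per partition, so `DISTINCT ON` is only a defensive guarantee of "latest per partition"; it costs little and keeps the query correct if the key ever changes.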

  3. Checkpoint manager abstraction
    • Introduce a small interface (protocol/base class) for checkpoint managers.
    • Generalize the Azure CheckpointStore wrapper to accept any manager.
    • Implement PostgresCheckpointManager alongside existing SnowflakeCheckpointManager.
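One way to express the manager interface and the generalized store wrapper is sketched below. Several names are assumptions: the protocol's method names simply reuse the planned Postgres helper names, `InMemoryCheckpointManager` and `ManagerBackedCheckpointStore` are hypothetical, and the checkpoint dict keys follow the shape azure-eventhub passes to a checkpoint store (worth re-checking against the installed SDK). The real wrapper would subclass `azure.eventhub.aio.CheckpointStore` and also implement the ownership methods, which are omitted here so the sketch runs without the SDK.

```python
import asyncio
from typing import Dict, List, Protocol, Tuple


class CheckpointManager(Protocol):
    """Hypothetical interface shared by the Snowflake and Postgres managers."""

    def insert_partition_checkpoint(
        self, partition_id: str, sequence_number: int, offset: str
    ) -> None: ...

    def get_partition_checkpoints(self) -> Dict[str, Tuple[int, str]]: ...


class InMemoryCheckpointManager:
    """Stand-in manager used only to exercise the wrapper."""

    def __init__(self) -> None:
        self._rows: Dict[str, Tuple[int, str]] = {}

    def insert_partition_checkpoint(
        self, partition_id: str, sequence_number: int, offset: str
    ) -> None:
        self._rows[partition_id] = (sequence_number, offset)

    def get_partition_checkpoints(self) -> Dict[str, Tuple[int, str]]:
        return dict(self._rows)


class ManagerBackedCheckpointStore:
    """Checkpoint half of a CheckpointStore, delegating to any manager."""

    def __init__(self, manager: CheckpointManager) -> None:
        self._manager = manager

    async def update_checkpoint(self, checkpoint: dict, **kwargs) -> None:
        # Manager calls do blocking DB I/O, so keep them off the event loop.
        await asyncio.to_thread(
            self._manager.insert_partition_checkpoint,
            checkpoint["partition_id"],
            checkpoint["sequence_number"],
            checkpoint["offset"],
        )

    async def list_checkpoints(
        self, fully_qualified_namespace: str, eventhub_name: str,
        consumer_group: str, **kwargs,
    ) -> List[dict]:
        rows = await asyncio.to_thread(self._manager.get_partition_checkpoints)
        return [
            {
                "fully_qualified_namespace": fully_qualified_namespace,
                "eventhub_name": eventhub_name,
                "consumer_group": consumer_group,
                "partition_id": pid,
                "sequence_number": seq,
                "offset": off,
            }
            for pid, (seq, off) in sorted(rows.items())
        ]
```

Using a structural `Protocol` rather than a base class means the existing SnowflakeCheckpointManager can satisfy the interface without being edited, as long as its method signatures line up.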

  4. EventHub consumer integration
    • Update EventHubAsyncConsumer to accept control_table_backend and optional Postgres config.
    • Instantiate the correct manager/store based on the backend.
    • Update logs to reflect the chosen backend.
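Backend selection in the consumer could be isolated in a small factory so EventHubAsyncConsumer only needs one call. In this sketch both manager classes are placeholders whose constructor arguments are assumptions, and `build_checkpoint_manager` is a hypothetical name; the point is the dispatch, the explicit error for a missing Postgres config, and the log line naming the backend.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


class SnowflakeCheckpointManager:
    """Placeholder for the existing manager; real constructor arguments may differ."""

    def __init__(self, settings: Any) -> None:
        self.settings = settings


class PostgresCheckpointManager:
    """Placeholder for the planned Postgres manager."""

    def __init__(self, settings: Any, pg_config: Any) -> None:
        self.settings = settings
        self.pg_config = pg_config


def build_checkpoint_manager(
    backend: str, settings: Any, pg_config: Optional[Any] = None
):
    """Pick the checkpoint manager for control_table_backend."""
    if backend == "snowflake":
        manager = SnowflakeCheckpointManager(settings)
    elif backend == "postgres":
        if pg_config is None:
            raise ValueError(
                "Postgres config is required when control_table_backend=postgres"
            )
        manager = PostgresCheckpointManager(settings, pg_config)
    else:
        raise ValueError(f"unknown control_table_backend: {backend!r}")
    # Record which backend holds the checkpoints, to ease debugging.
    logger.info("Checkpoint control table backend: %s", backend)
    return manager
```

Failing fast on an unknown backend or a missing Postgres config means a misconfiguration surfaces at consumer start-up rather than at the first checkpoint write.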

  5. CLI validation and setup
    • Update main.validate_config to create/verify the control table in the selected backend.
    • Add a Postgres setup helper (SQL or CLI) similar to the Snowflake setup docs.
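For the "verify" half of validate_config, one option is to compare the table's columns in `information_schema.columns` against an expected set after running `CREATE TABLE IF NOT EXISTS`, which would catch a pre-existing table with an older layout. This is a sketch: the function names are hypothetical, and the expected set includes the same assumed checkpoint columns (`sequence_number`, `checkpoint_offset`, `updated_at`) that would need to match the real schema.

```python
from typing import Any, List

# Mirrored primary key plus assumed checkpoint columns (adjust to the real schema).
EXPECTED_COLUMNS = frozenset({
    "eventhub_namespace", "eventhub", "target_db", "target_schema",
    "target_table", "partition_id", "sequence_number", "checkpoint_offset",
    "updated_at",
})


def missing_control_columns(cur: Any, schema: str, table: str) -> List[str]:
    """Return expected columns absent from schema.table (empty if the table is fine)."""
    # information_schema stores names exactly as created, so pass the
    # normalized schema/table names rather than the raw env values.
    cur.execute(
        "SELECT column_name FROM information_schema.columns"
        " WHERE table_schema = %s AND table_name = %s",
        (schema, table),
    )
    found = {row[0] for row in cur.fetchall()}
    return sorted(EXPECTED_COLUMNS - found)


def verify_postgres_control_table(cur: Any, schema: str, table: str) -> None:
    missing = missing_control_columns(cur, schema, table)
    if missing:
        raise RuntimeError(
            f"control table {schema}.{table} is missing columns: {', '.join(missing)}"
        )
```

A table that does not exist at all simply reports every expected column as missing, so a single check covers both "absent" and "wrong shape".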

  6. Documentation and examples
    • Update README.md and SNOWFLAKE_QUICKSTART.md with the new option.
    • Add a short Postgres setup snippet and note on checkpoint migration when switching backends.
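The migration note could include a one-off copy step. If the old backend's rows are not carried over, the consumer will start from whatever starting position it is configured with, so copying the latest checkpoint per partition avoids reprocessing or skipping events. This sketch assumes both managers expose the hypothetical `get_partition_checkpoints` / `insert_partition_checkpoint` methods used elsewhere in this plan, and it should be run with consumers stopped so no checkpoint is written mid-copy.

```python
from typing import Dict, Protocol, Tuple


class CheckpointSource(Protocol):
    def get_partition_checkpoints(self) -> Dict[str, Tuple[int, str]]: ...


class CheckpointSink(Protocol):
    def insert_partition_checkpoint(
        self, partition_id: str, sequence_number: int, offset: str
    ) -> None: ...


def migrate_checkpoints(source: CheckpointSource, dest: CheckpointSink) -> int:
    """Copy the latest checkpoint for every partition; return the partition count."""
    rows = source.get_partition_checkpoints()
    for partition_id, (sequence_number, offset) in sorted(rows.items()):
        # Upsert semantics on the destination make re-running the copy safe.
        dest.insert_partition_checkpoint(partition_id, sequence_number, offset)
    return len(rows)
```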

  7. Tests
    • Unit tests for Postgres utilities and checkpoint manager.
    • Update EventHub/main tests to cover control_table_backend=postgres.
    • Config validation tests for missing/invalid Postgres fields.
    • Validate with ruff and ty after code changes; run the full test suite and maintain coverage.
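The config validation tests could take a shape like the following. It is written with stdlib `unittest` so it runs standalone; the cases translate directly to pytest if that is what the repository uses. `parse_control_backend` is a hypothetical validator included only so the tests have something to exercise, and its case-insensitive parsing is an assumption.

```python
import unittest
from typing import Mapping

REQUIRED_PG_VARS = ("CONTROL_PG_HOST", "CONTROL_PG_USER", "CONTROL_PG_AUTH_MODE")


def parse_control_backend(env: Mapping[str, str]) -> str:
    """Hypothetical validator under test: returns the backend or raises ValueError."""
    backend = env.get("CONTROL_TABLE_BACKEND", "snowflake").lower()
    if backend not in ("snowflake", "postgres"):
        raise ValueError(f"invalid CONTROL_TABLE_BACKEND: {backend!r}")
    if backend == "postgres":
        missing = [name for name in REQUIRED_PG_VARS if not env.get(name)]
        if missing:
            raise ValueError(f"missing Postgres settings: {', '.join(missing)}")
    return backend


class ControlBackendConfigTests(unittest.TestCase):
    def test_defaults_to_snowflake(self):
        self.assertEqual(parse_control_backend({}), "snowflake")

    def test_postgres_requires_connection_vars(self):
        with self.assertRaisesRegex(ValueError, "CONTROL_PG_HOST"):
            parse_control_backend({"CONTROL_TABLE_BACKEND": "postgres"})

    def test_rejects_unknown_backend(self):
        with self.assertRaises(ValueError):
            parse_control_backend({"CONTROL_TABLE_BACKEND": "mysql"})

    def test_accepts_complete_postgres_config(self):
        env = {
            "CONTROL_TABLE_BACKEND": "postgres",
            "CONTROL_PG_HOST": "h",
            "CONTROL_PG_USER": "u",
            "CONTROL_PG_AUTH_MODE": "password",
        }
        self.assertEqual(parse_control_backend(env), "postgres")


if __name__ == "__main__":
    unittest.main()
```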

Open Questions

  • No open questions (auth mode is explicit via CONTROL_PG_AUTH_MODE).