CDC Sources

Skippr captures changes from five source systems: PostgreSQL, MySQL, MongoDB, DynamoDB, and Kafka. Each source captures inserts, updates, and deletes and emits them as CDC events tagged with a mutation kind and an order token.
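As a rough illustration of what such an event might carry, the sketch below models a CDC event in Python. The names `MutationKind`, `CdcEvent`, `kind`, `order_token`, `key`, and `row` are assumptions made for this example and are not taken from Skippr's actual schema.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class MutationKind(str, Enum):
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass(frozen=True)
class CdcEvent:
    """Hypothetical event shape; field names are illustrative only."""
    kind: MutationKind              # which mutation produced the event
    order_token: str                # source-specific position (LSN, binlog offset, ...)
    key: dict[str, Any]             # identifying fields of the row or document
    row: Optional[dict[str, Any]]   # after image; None when no image is available


# Example: an update observed at a PostgreSQL-style LSN.
event = CdcEvent(
    kind=MutationKind.UPDATE,
    order_token="16/B374D848",
    key={"id": 42},
    row={"id": 42, "email": "a@example.com"},
)
```

The order token is kept as an opaque string here because each source defines its own position format, as the sections below describe.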

PostgreSQL

PostgreSQL CDC uses WAL logical replication with the pgoutput output plugin. Skippr creates a replication slot and publication, then streams row-level changes in real time.

Prerequisites

  1. Set wal_level = logical in postgresql.conf (requires restart)
  2. The replication user must have the REPLICATION attribute or be a superuser
  3. max_replication_slots must be at least 1 (default is usually 10)

Configuration

yaml
source:
  kind: postgres
  host: localhost
  port: 5432
  user: replicator
  password: ${POSTGRES_PASSWORD}
  database: mydb
  cdc_enabled: true

| Field | Default | Description |
| --- | --- | --- |
| cdc_enabled | false | Enable CDC via logical replication |
| replication_slot_name | skippr_slot | Name of the replication slot |
| publication_name | skippr_pub | Name of the publication |

What gets captured

  • INSERT rows with mutation kind insert
  • UPDATE rows with mutation kind update (full row after image)
  • DELETE rows with mutation kind delete

Resume behavior

Skippr stores the committed LSN (Log Sequence Number) after each WAL segment is flushed to the destination. On restart, replication resumes from the stored LSN -- no data is re-read and no events are duplicated.

The replication slot is reused across restarts (not recreated), so PostgreSQL retains WAL segments only until Skippr has confirmed them.
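To make the LSN comparison concrete, here is a minimal sketch of how a stored LSN could be compared with an incoming one. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, so comparing the strings directly gives wrong answers. The helper names `parse_lsn`, `format_lsn`, and `already_committed` are hypothetical and not part of Skippr.

```python
def parse_lsn(text: str) -> int:
    """Convert PostgreSQL's textual LSN ('hi/lo' in hex) to a 64-bit integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def format_lsn(value: int) -> str:
    """Inverse of parse_lsn: render a 64-bit position as 'hi/lo' in hex."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def already_committed(event_lsn: str, stored_lsn: str) -> bool:
    """True if the event is at or before the stored position (assumed semantics)."""
    return parse_lsn(event_lsn) <= parse_lsn(stored_lsn)
```

For example, `already_committed("0/FF", "0/100")` is true, even though `"0/FF"` sorts after `"0/100"` as a string. On the server side, the position a slot has confirmed is visible in the `confirmed_flush_lsn` column of the `pg_replication_slots` view.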


MySQL

MySQL CDC uses binlog replication. Skippr connects as a replication client, reads row-level events from the binary log, and emits them as CDC mutations.

Prerequisites

  1. Set binlog_format = ROW in my.cnf
  2. Set binlog_row_image = FULL (ensures complete before/after images)
  3. The replication user must have REPLICATION SLAVE and REPLICATION CLIENT privileges

Configuration

yaml
source:
  kind: mysql
  connection_string: mysql://replicator:${MYSQL_PASSWORD}@host:3306/mydb
  cdc_enabled: true

| Field | Default | Description |
| --- | --- | --- |
| cdc_enabled | false | Enable CDC via binlog replication |
| server_id | auto-generated | MySQL server ID for the replication client |

What gets captured

  • WRITE_ROWS events (inserts)
  • UPDATE_ROWS events (updates with full row image)
  • DELETE_ROWS events (deletes)

Resume behavior

Skippr stores the binlog filename and position after each committed segment. On restart, the binlog stream resumes from the stored position. The initial snapshot is skipped when a stored position exists.
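A binlog position is a pair of file name and byte offset, so ordering two positions means comparing the file's numeric suffix first and the offset second. The sketch below shows one way to do that; `BinlogPosition` and `is_after` are hypothetical names used only for illustration.

```python
from typing import NamedTuple


class BinlogPosition(NamedTuple):
    filename: str   # e.g. "mysql-bin.000003"
    offset: int     # byte offset within that file

    def sort_key(self) -> tuple[int, int]:
        # Binlog files share a base name and carry an increasing numeric suffix.
        sequence = int(self.filename.rsplit(".", 1)[1])
        return (sequence, self.offset)


def is_after(candidate: BinlogPosition, stored: BinlogPosition) -> bool:
    """True if candidate lies strictly beyond the stored position."""
    return candidate.sort_key() > stored.sort_key()
```

Parsing the suffix as an integer, rather than comparing file names as text, keeps the ordering correct if the suffix ever grows past its usual six digits.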


MongoDB

MongoDB CDC uses change streams, which are backed by the oplog. Skippr opens a change stream on the target database and receives real-time notifications for document mutations.

Prerequisites

  1. MongoDB must be running as a replica set or sharded cluster (change streams require an oplog)
  2. The connection user must have read access on the target database

Configuration

yaml
source:
  kind: mongodb
  connection_string: mongodb://user:${MONGO_PASSWORD}@host:27017/mydb
  cdc_enabled: true

| Field | Default | Description |
| --- | --- | --- |
| cdc_enabled | false | Enable CDC via change streams |

What gets captured

  • insert operations
  • update operations (full document after image via fullDocument: updateLookup)
  • delete operations

Resume behavior

Skippr stores the MongoDB resume token after each committed segment. On restart, the change stream resumes from the stored token using resume_after. No events are re-processed.
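The resume logic can be sketched as building the options for a change stream: always request the looked-up full document, and pass the stored token only when one exists. The function name `change_stream_options` is hypothetical; the option names `full_document` and `resume_after` are the keyword arguments of PyMongo's `watch()`, used here as an assumption about which driver a reader might try this with.

```python
from typing import Any, Optional


def change_stream_options(stored_token: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Build keyword arguments for a change stream, resuming if a token is stored."""
    options: dict[str, Any] = {"full_document": "updateLookup"}
    if stored_token is not None:
        # Continue right after the event identified by the stored token.
        options["resume_after"] = stored_token
    return options


# With PyMongo this could be used roughly as:
#   with db.watch(**change_stream_options(token)) as stream:
#       for change in stream:
#           ...  # process the change, then persist stream.resume_token
```

Without a stored token the options contain only `full_document`, so the stream starts from the current point in time.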


DynamoDB

DynamoDB CDC uses DynamoDB Streams to capture item-level changes. Skippr reads records from each shard through a shard iterator and relies on the NEW_AND_OLD_IMAGES view type to get the full before and after state of each item.

Prerequisites

  1. Enable DynamoDB Streams on the table with StreamViewType = NEW_AND_OLD_IMAGES
  2. The IAM role must have dynamodb:DescribeStream, dynamodb:GetShardIterator, and dynamodb:GetRecords permissions

Configuration

yaml
source:
  kind: dynamodb
  table_name: my_table
  region: us-east-1
  cdc_enabled: true

| Field | Default | Description |
| --- | --- | --- |
| cdc_enabled | false | Enable CDC via DynamoDB Streams |

What gets captured

  • INSERT events (new items)
  • MODIFY events (updated items, full new image)
  • REMOVE events (deleted items)

Resume behavior

Skippr stores the sequence number of the last processed record per shard. On restart, each shard iterator starts from AT_SEQUENCE_NUMBER using the stored value. New shards start from TRIM_HORIZON.
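The per-shard choice described above can be sketched as a function that builds the parameters for a `GetShardIterator` call. The function name `shard_iterator_request` is hypothetical; the parameter names match the DynamoDB Streams API, and the result could be passed to boto3's `dynamodbstreams` client as `client.get_shard_iterator(**request)`.

```python
from typing import Any, Optional


def shard_iterator_request(
    stream_arn: str,
    shard_id: str,
    stored_sequence_number: Optional[str],
) -> dict[str, Any]:
    """Build GetShardIterator parameters for one shard."""
    request: dict[str, Any] = {"StreamArn": stream_arn, "ShardId": shard_id}
    if stored_sequence_number is None:
        # No checkpoint for this shard: read from its oldest available record.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    else:
        # Checkpoint exists: start at the stored sequence number.
        request["ShardIteratorType"] = "AT_SEQUENCE_NUMBER"
        request["SequenceNumber"] = stored_sequence_number
    return request
```

In the Streams API, AT_SEQUENCE_NUMBER returns the record at the given sequence number itself, while AFTER_SEQUENCE_NUMBER starts just past it, so a consumer using the former should expect to see the checkpointed record once more after a restart.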


Kafka

Kafka CDC consumes Debezium-formatted messages from Kafka topics. Skippr parses the Debezium envelope to extract mutation kind, key fields, and payload.

Prerequisites

  1. A Debezium connector must be running and publishing change events to the Kafka topic
  2. Messages must use the standard Debezium envelope format with op, before, and after fields

Configuration

yaml
source:
  kind: kafka
  brokers: "localhost:9092"
  topic: dbserver1.public.customers
  cdc_enabled: true

| Field | Default | Description |
| --- | --- | --- |
| cdc_enabled | false | Enable CDC via Debezium envelope parsing |
| debezium_format | true (when cdc_enabled) | Parse messages as Debezium envelopes |
| group_id | skippr-{project} | Kafka consumer group ID |

What gets captured

  • op: c (create / insert)
  • op: u (update)
  • op: d (delete)
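The mapping from Debezium `op` codes to mutation kinds can be sketched as follows. This is an illustrative parser, not Skippr's implementation: `OP_TO_KIND` and `parse_envelope` are hypothetical names, and the choice to skip other op codes and tombstones is an assumption of this sketch.

```python
import json
from typing import Any, Optional

# Debezium op codes covered in this sketch, mapped to mutation kinds.
OP_TO_KIND = {"c": "insert", "u": "update", "d": "delete"}


def parse_envelope(raw: Optional[bytes]) -> Optional[dict[str, Any]]:
    """Extract mutation kind and row image from a Debezium message value."""
    if raw is None:
        return None  # tombstone record (null value), nothing to parse
    message = json.loads(raw)
    # With the JSON converter and schemas enabled, the envelope sits under "payload".
    envelope = message.get("payload", message)
    kind = OP_TO_KIND.get(envelope.get("op"))
    if kind is None:
        return None  # other op codes (e.g. snapshot reads, "r") are skipped here
    # Deletes carry their data in "before"; inserts and updates in "after".
    row = envelope["before"] if kind == "delete" else envelope["after"]
    return {"kind": kind, "row": row}
```

For a delete, the `after` field is null, so the sketch takes the row image from `before` instead.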

Resume behavior

Skippr uses a stable group_id derived from the project name. Kafka's consumer group offset tracking provides durable resume -- on restart, consumption resumes from the last committed offset.