Consider a Postgres database initialized with:
    create schema people;
    create table people.person (
        id int NOT NULL,
        name varchar(80),
        PRIMARY KEY (id)
    );
    insert into people.person values (1, 'Casper');
    insert into people.person values (2, 'Kasper');
and create a Debezium Postgres connector with `snapshot.mode` set to `initial_only`:
    POST $connect_host/connectors
    {
      "name": "postgres-source",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "not_very_strong",
        "database.dbname": "user",
        "database.server.name": "user",
        "schema.whitelist": "people",
        "table.whitelist": "people.person",
        "snapshot.mode": "initial_only"
      }
    }
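As a minimal sketch, assuming Python 3 and only the standard library, the registration request above can be built programmatically. `build_create_request` is a hypothetical helper, and `http://localhost:8083` (the default Kafka Connect REST port) is an assumed stand-in for `$connect_host`. The function only constructs the request, so it can be inspected without a running Connect worker.

```python
import json
import urllib.request

# Connector name and configuration mirroring the POST body above.
CONNECTOR_NAME = "postgres-source"
CONNECTOR_CONFIG = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "not_very_strong",
    "database.dbname": "user",
    "database.server.name": "user",
    "schema.whitelist": "people",
    "table.whitelist": "people.person",
    "snapshot.mode": "initial_only",
}


def build_create_request(connect_host: str) -> urllib.request.Request:
    """Build (but do not send) the POST /connectors request.

    Hypothetical helper: connect_host plays the role of $connect_host.
    """
    body = json.dumps({"name": CONNECTOR_NAME, "config": CONNECTOR_CONFIG}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_host.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
```

To actually submit it, pass the request to `urllib.request.urlopen(...)`; Kafka Connect answers `201 Created` when the connector is registered.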
Then pause the connector:
    PUT $connect_host/connectors/postgres-source/pause
Update the configuration, for example by adding a snapshot select statement override:
    PUT $connect_host/connectors/postgres-source/config
    {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "user",
      "database.password": "not_very_strong",
      "database.dbname": "user",
      "database.server.name": "user",
      "schema.whitelist": "people",
      "table.whitelist": "people.person",
      "snapshot.mode": "initial_only",
      "snapshot.select.statement.overrides": "people.person",
      "snapshot.select.statement.overrides.people.person": "SELECT * FROM people.person WHERE id = 1"
    }
Resume the connector:
    PUT $connect_host/connectors/postgres-source/resume
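The pause, reconfigure and resume steps can likewise be scripted. The sketch below, again assuming Python 3's standard library, uses a hypothetical `build_reconfigure_requests` helper that builds the three `PUT` requests in order, and a hypothetical `send_all` that would issue them against a live worker. `http://localhost:8083` is an assumed value for `$connect_host`.

```python
import json
import urllib.request
from typing import Dict, List

CONNECTOR_NAME = "postgres-source"

# Updated configuration from the PUT body above: the original settings plus
# a snapshot select statement override for people.person.
UPDATED_CONFIG: Dict[str, str] = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "not_very_strong",
    "database.dbname": "user",
    "database.server.name": "user",
    "schema.whitelist": "people",
    "table.whitelist": "people.person",
    "snapshot.mode": "initial_only",
    "snapshot.select.statement.overrides": "people.person",
    "snapshot.select.statement.overrides.people.person": "SELECT * FROM people.person WHERE id = 1",
}


def build_reconfigure_requests(connect_host: str, name: str,
                               config: Dict[str, str]) -> List[urllib.request.Request]:
    """Build the pause -> config -> resume requests, in that order (not sent)."""
    base = f"{connect_host.rstrip('/')}/connectors/{name}"
    json_headers = {"Content-Type": "application/json", "Accept": "application/json"}
    return [
        urllib.request.Request(f"{base}/pause", method="PUT"),
        # The /config endpoint takes the flat config map, without a "name" wrapper.
        urllib.request.Request(f"{base}/config", data=json.dumps(config).encode("utf-8"),
                               headers=json_headers, method="PUT"),
        urllib.request.Request(f"{base}/resume", method="PUT"),
    ]


def send_all(requests: List[urllib.request.Request]) -> List[int]:
    """Send each request in order and collect the HTTP status codes."""
    statuses = []
    for req in requests:
        with urllib.request.urlopen(req) as resp:
            statuses.append(resp.status)
    return statuses
```

Note the design difference from connector creation: `POST /connectors` wraps the settings in `{"name": ..., "config": ...}`, whereas `PUT .../config` takes the configuration map directly, as in the request body above.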
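The connector then starts running in ongoing replication mode, streaming logical changes from the last known position instead of taking a new snapshot. The following log snippet is taken from a local setup executing these steps; in that setup the connector was named `resend-postgres-source`: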
kafka-connect-debezium-postgres_1 | INFO Starting PostgresConnectorTask with configuration: (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO connector.class = io.debezium.connector.postgresql.PostgresConnector (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.user = user (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.dbname = user (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO snapshot.select.statement.overrides = people.person (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.server.name = user (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.port = 5432 (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO schema.whitelist = people (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO table.whitelist = people.person (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO snapshot.select.statement.overrides.people.person = SELECT * FROM people.person WHERE id = 1 (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO task.class = io.debezium.connector.postgresql.PostgresConnectorTask (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.hostname = postgres (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO database.password = ******** (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO name = resend-postgres-source (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO snapshot.mode = initial_only (io.debezium.connector.common.BaseSourceTask)
kafka-connect-debezium-postgres_1 | INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
kafka-connect-debezium-postgres_1 | INFO user 'user' connected to database 'user' on PostgreSQL 9.6.10 on x86_64-pc-linux-gnu (Debian 9.6.10-1.pgdg90+1), compiled by gcc (Debian 6.3.0-18+deb9u1) 6.3.0 20170516, 64-bit with roles:
kafka-connect-debezium-postgres_1 | role 'user' [superuser: true , replication: true , inherit: true , create role: true , create db: true , can log in: true ]
kafka-connect-debezium-postgres_1 | role 'pg_signal_backend' [superuser: false , replication: false , inherit: true , create role: false , create db: false , can log in: false ] (io.debezium.connector.postgresql.PostgresConnectorTask)
kafka-connect-debezium-postgres_1 | INFO Found previous offset source_info[server= 'user' db= 'user' , lsn=0/169F0B8, txId=556, useconds=1538400288931000, snapshot= false ](io.debezium.connector.postgresql.PostgresConnectorTask)
kafka-connect-debezium-postgres_1 | INFO Previous snapshot has completed successfully, streaming logical changes from last known position (io.debezium.connector.postgresql.PostgresConnectorTask)
kafka-connect-debezium-postgres_1 | INFO Requested thread factory for connector PostgresConnector, id = user named = records-stream-producer (io.debezium.util.Threads)
postgres_1 | LOG: logical decoding found consistent point at 0/169F200
postgres_1 | DETAIL: There are no running transactions.
postgres_1 | LOG: exported logical decoding snapshot: "0000022D-1" with 0 transaction IDs
postgres_1 | LOG: starting logical decoding for slot "debezium"
postgres_1 | DETAIL: streaming transactions committing after 0/169F238, reading WAL from 0/169F200
postgres_1 | LOG: logical decoding found consistent point at 0/169F200
postgres_1 | DETAIL: There are no running transactions.
kafka-connect-debezium-postgres_1 | INFO REPLICA IDENTITY for 'people.person' is 'DEFAULT' ; UPDATE and DELETE events will contain previous values only for PK columns (io.debezium.connector.postgresql.PostgresSchema)
kafka-connect-debezium-postgres_1 | INFO Creating thread debezium-postgresconnector-user-records-stream-producer (io.debezium.util.Threads)
kafka-connect-debezium-postgres_1 | INFO WorkerSourceTask{id=resend-postgres-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
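To check for this behavior in an automated reproduction, the worker log can be scanned for the two `PostgresConnectorTask` messages shown above: the stored offset (with `snapshot=false`) and the notice that streaming resumes from the last known position. `summarize_task_start` is a hypothetical helper, and its regular expression is written against the exact line format in this snippet, which may differ in other Debezium versions.

```python
import re
from typing import Dict, Iterable, Optional, Union

# Message text copied from the PostgresConnectorTask log line above.
STREAMING_MSG = ("Previous snapshot has completed successfully, "
                 "streaming logical changes from last known position")

# Matches the "Found previous offset" line; tolerates the space in "snapshot= false".
OFFSET_RE = re.compile(
    r"Found previous offset source_info\[.*?lsn=([0-9A-Fa-f]+/[0-9A-Fa-f]+)"
    r".*?snapshot=\s*(true|false)"
)


def summarize_task_start(log_lines: Iterable[str]) -> Dict[str, Union[Optional[str], Optional[bool], bool]]:
    """Extract the resumed LSN, the stored snapshot flag, and whether streaming resumed."""
    lsn: Optional[str] = None
    snapshot_in_offset: Optional[bool] = None
    resumed_streaming = False
    for line in log_lines:
        match = OFFSET_RE.search(line)
        if match:
            lsn = match.group(1)
            snapshot_in_offset = match.group(2) == "true"
        if STREAMING_MSG in line:
            resumed_streaming = True
    return {"lsn": lsn, "snapshot_in_offset": snapshot_in_offset,
            "resumed_streaming": resumed_streaming}
```

Applied to the two offset/streaming lines above, this reports the LSN `0/169F0B8`, a stored snapshot flag of `False`, and that streaming resumed.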