Debezium / DBZ-6465

last_in_data_collection instead of last in the snapshot field


      Bug report

      For bug reports, please provide the following information:

      What Debezium connector do you use and what version?

      debezium-connector-postgres version 2.1.2.Final

      What is the connector configuration?

      {
        "name": "postgres-source-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "database.hostname": "postgresql",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname": "postgres",
          "topic.prefix": "test_postgres",
          "schema.name.adjustment.mode": "avro",
          "table.include.list": "public.test_table,public.test_empty_table",
          "snapshot.mode": "always",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
      }
      

      What is the captured database version and mode of deployment?

      PostgreSQL v.15.1, Kubernetes

      What behaviour do you expect?

      The last record in the snapshot contains:

      "snapshot": "last",
      

      What behaviour do you see?

      The last record in the snapshot contains:

      "snapshot": "last_in_data_collection",
      

      Do you see the same behaviour using the latest released Debezium version?

      No

      How to reproduce the issue using our tutorial deployment?

      1. Create tables:

      CREATE TABLE test_table (
        id BIGSERIAL NOT NULL,
        value INTEGER,
        PRIMARY KEY (id, value)
      );
      CREATE TABLE test_empty_table (
        id BIGSERIAL NOT NULL,
        value INTEGER,
        PRIMARY KEY (id, value)
      );
      

      2. Insert data:

      INSERT INTO test_table VALUES (1, 1);
      INSERT INTO test_table VALUES (2, 1);
      

      3. Create a new connector:

      curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d'
      {
        "name": "postgres-source-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "database.hostname": "postgresql",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname": "postgres",
          "topic.prefix": "test_postgres",
          "schema.name.adjustment.mode": "avro",
          "table.include.list": "public.test_table,public.test_empty_table",
          "snapshot.mode": "always",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
      }'
      

      4. Check the last record in topic test_postgres.public.test_table.
      Expected result (ER):

      {
        "payload": {
          "before": null,
          "after": {
            "id": 2,
            "value": 1
          },
          "source": {
            "version": "2.1.2.Final",
            "connector": "postgresql",
            "name": "test_postgres",
            "ts_ms": 1684139539622,
            "snapshot": "last",
            "db": "postgres",
            "sequence": "[null,\"26183120\"]",
            "schema": "public",
            "table": "test_table",
            "txId": 765,
            "lsn": 26183120,
            "xmin": null
          },
          "op": "r",
          "ts_ms": 1684139539766,
          "transaction": null
        }
      }
      

      Actual result (AR):

      {
        "payload": {
          "before": null,
          "after": {
            "id": 2,
            "value": 1
          },
          "source": {
            "version": "2.1.2.Final",
            "connector": "postgresql",
            "name": "test_postgres",
            "ts_ms": 1684139539622,
            "snapshot": "last_in_data_collection",
            "db": "postgres",
            "sequence": "[null,\"26183120\"]",
            "schema": "public",
            "table": "test_table",
            "txId": 765,
            "lsn": 26183120,
            "xmin": null
          },
          "op": "r",
          "ts_ms": 1684139539766,
          "transaction": null
        }
      }
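      To compare ER and AR without reading the whole message, the snapshot marker can be pulled out of a consumed record value. This is a minimal Python sketch, assuming the record values from test_postgres.public.test_table have already been read as JSON strings (for example with a console consumer) and have the shape shown in the ER/AR examples. The helper name snapshot_marker is hypothetical and not part of Debezium or Kafka.

```python
import json


def snapshot_marker(record_value: str) -> str:
    """Return the source.snapshot marker of one JSON change-event value.

    Hypothetical helper: assumes the value was produced by JsonConverter and
    looks like the ER/AR examples. With schemas.enable=true the event sits
    under "payload"; with schemas.enable=false the envelope is top-level.
    """
    event = json.loads(record_value)
    envelope = event.get("payload", event)  # unwrap the JsonConverter payload if present
    return envelope["source"]["snapshot"]


# Trimmed copy of the AR record above.
actual = (
    '{"payload": {"before": null, "after": {"id": 2, "value": 1},'
    ' "source": {"snapshot": "last_in_data_collection", "table": "test_table"},'
    ' "op": "r"}}'
)
marker = snapshot_marker(actual)
print(marker)            # last_in_data_collection
print(marker == "last")  # False, so the bug reproduces
```

      Falling back to the top-level object keeps the helper usable whether or not the converter embeds schemas.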
      

      Implementation ideas (optional)

      I suggest removing the snapshot record check in https://github.com/debezium/debezium/blob/2.1/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L485 so that the behaviour matches the main branch: https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java#L568

      PR: https://github.com/debezium/debezium/pull/4543
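      The reproduction depends on the last included table (test_empty_table) being empty, so the final emitted record comes from test_table. The following is a simplified, hypothetical Python model of that situation, not Debezium's code. It assumes that the last row of the last table is marked last directly, that the last row of any other table is marked last_in_data_collection, and that the final buffered event is revisited when the snapshot completes. The "guarded" branch assumes the 2.1 check only rewrites a record still marked true. That condition is a guess made for illustration, and the actual condition is in the linked EventDispatcher lines. The function emit_snapshot is hypothetical, and the first/first_in_data_collection markers are ignored.

```python
from typing import List, Tuple

Event = Tuple[str, int, str]  # (table, row number, snapshot marker)


def emit_snapshot(tables: List[Tuple[str, int]], unconditional_fix: bool) -> List[Event]:
    """Hypothetical model of snapshot marker assignment.

    tables: (table_name, row_count) pairs in snapshot order.
    unconditional_fix: True models the main-branch behaviour (always mark the
    final event "last"); False models an assumed guarded check that only
    rewrites a final event still marked "true".
    """
    events: List[Event] = []
    for t_index, (name, rows) in enumerate(tables):
        is_last_table = t_index == len(tables) - 1
        for row in range(1, rows + 1):
            if row < rows:
                marker = "true"
            elif is_last_table:
                marker = "last"  # end of the whole snapshot is known here
            else:
                marker = "last_in_data_collection"
            events.append((name, row, marker))

    # On completion, revisit the final buffered event (e.g. when the last table was empty).
    if events:
        name, row, marker = events[-1]
        if unconditional_fix or marker == "true":
            marker = "last"
        events[-1] = (name, row, marker)
    return events


tables = [("public.test_table", 2), ("public.test_empty_table", 0)]
print(emit_snapshot(tables, unconditional_fix=False)[-1])
# ('public.test_table', 2, 'last_in_data_collection')
print(emit_snapshot(tables, unconditional_fix=True)[-1])
# ('public.test_table', 2, 'last')
```

      In this model the guarded branch leaves last_in_data_collection in place when the last table is empty (the AR), while the unconditional rewrite yields last (the ER).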

            Assignee: Unassigned
            Reporter: Andrey Pustovetov (andrey.pustovetov@gmail.com)