Debezium / DBZ-1024

Postgres plugin does not signal the end of snapshot properly


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 0.8.3.Final
    • Fix Version/s: 0.9.0.Beta2
    • Component/s: postgresql-connector
    • Labels:
      None
    • Environment:

      Linux RHEL7, postgres 10.3

    • Steps to Reproduce:
      • Create a Postgres table with a primary key and two rows.
      • Use the embedded engine (as in https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/).
      • In the code, replace the Kinesis sender with code that simply logs the content of each event.
      • Start the embedded engine and collect the log output. You will get something similar to:
        {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}}
        {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}}
        

      The correct output should be (the difference is in the second record, field value.source.last_snapshot_record):

      {"value":{"before":null,"after":{"tablekey":0,"value":100},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042374043000,"txId":559,"lsn":23350176,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1544042374044},"key":{"tablekey":0}}
      {"value":{"before":null,"after":{"tablekey":1,"value":1001},"source":{"version":"${project.version}","name":"localhost:46642/postgres","db":"postgres","ts_usec":1544042382218023,"txId":560,"lsn":23350312,"schema":"public","table":"testtable","snapshot":true,"last_snapshot_record":true},"op":"c","ts_ms":1544042382283},"key":{"tablekey":1}}
      

      Description

      When streaming data from a Postgres database with a table that already contains data, the last record of the snapshot has the wrong value for the "last_snapshot_record" field: it is false instead of true.
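
To make the expected behaviour concrete, here is a minimal consumer-side check, written as a sketch in plain Java. It is not part of Debezium: the class `SnapshotFlagCheck` and its methods are hypothetical, and each event's `source` block is assumed to have been reduced to a `Map` holding only the `snapshot` and `last_snapshot_record` fields.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Debezium.
class SnapshotFlagCheck {

    // Builds a reduced "source" block with only the two fields of interest.
    static Map<String, Object> source(boolean snapshot, boolean lastSnapshotRecord) {
        Map<String, Object> s = new HashMap<>();
        s.put("snapshot", snapshot);
        s.put("last_snapshot_record", lastSnapshotRecord);
        return s;
    }

    // Returns true if the final snapshot event (in arrival order) carries
    // last_snapshot_record=true. Returns false if there is no snapshot event.
    static boolean lastSnapshotRecordIsFlagged(List<Map<String, Object>> sources) {
        Map<String, Object> lastSnapshotSource = null;
        for (Map<String, Object> s : sources) {
            if (Boolean.TRUE.equals(s.get("snapshot"))) {
                lastSnapshotSource = s;
            }
        }
        return lastSnapshotSource != null
                && Boolean.TRUE.equals(lastSnapshotSource.get("last_snapshot_record"));
    }

    public static void main(String[] args) {
        // Flags as seen in the reported output: both records say false.
        List<Map<String, Object>> observed = Arrays.asList(source(true, false), source(true, false));
        // Flags as they should be: the second record says true.
        List<Map<String, Object>> expected = Arrays.asList(source(true, false), source(true, true));

        System.out.println("observed: " + lastSnapshotRecordIsFlagged(observed)); // observed: false
        System.out.println("expected: " + lastSnapshotRecordIsFlagged(expected)); // expected: true
    }
}
```

A check like this could run in the logging consumer from the reproduction steps, once the snapshot phase is known to be over.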

      I believe the issue is in io/debezium/connector/postgresql/RecordsSnapshotProducer.java. I have a small patch that solves it, but it is not very nice.

      --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
      +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java
      @@ -206,11 +206,21 @@ public class RecordsSnapshotProducer extends RecordsProducer {
                       // process and send the last record after marking it as such
                       logger.info("Step 5: sending the last snapshot record");
                       sourceInfo.markLastSnapshotRecord();
      
      +                final Struct oldValue = (Struct)currentRecord.value();
      +                final Struct newValue = new Struct(oldValue.schema());
      +                newValue.put(Envelope.FieldName.OPERATION, oldValue.get( Envelope.FieldName.OPERATION ));
      +                newValue.put(Envelope.FieldName.AFTER, oldValue.get( Envelope.FieldName.AFTER ));
      +                newValue.put(Envelope.FieldName.SOURCE, sourceInfo.source());
      +                newValue.put(Envelope.FieldName.TIMESTAMP, oldValue.get(Envelope.FieldName.TIMESTAMP));
      +
                       this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(),
                                                               currentRecord.topic(), currentRecord.kafkaPartition(),
                                                               currentRecord.keySchema(), currentRecord.key(),
      -                                                        currentRecord.valueSchema(), currentRecord.value()));
      -
      +                                                        currentRecord.valueSchema(), newValue));
                       sendCurrentRecord(consumer);
                   }
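
Judging from the patch alone, the problem appears to be that the record's value was built before `markLastSnapshotRecord()` was called, so its embedded source block still carries the old flag. The following is a toy model of that situation in plain Java, not the connector's code: `ToySourceInfo`, `ToyValue` and both methods are hypothetical, and it is an assumption here that `source()` returns a fresh copy of the state on each call.

```java
import java.util.HashMap;
import java.util.Map;

class StaleSourceSketch {

    // Simplified stand-in for the connector's mutable source info (hypothetical).
    static class ToySourceInfo {
        private boolean lastSnapshotRecord = false;

        void markLastSnapshotRecord() {
            lastSnapshotRecord = true;
        }

        // Assumption: each call returns a new copy of the current state.
        Map<String, Object> source() {
            Map<String, Object> s = new HashMap<>();
            s.put("snapshot", true);
            s.put("last_snapshot_record", lastSnapshotRecord);
            return s;
        }
    }

    // Simplified stand-in for the event value (the envelope).
    static class ToyValue {
        final Object after;
        final Map<String, Object> source;

        ToyValue(Object after, Map<String, Object> source) {
            this.after = after;
            this.source = source;
        }
    }

    // Reuses the value built before the flag was set, as the unpatched code appears to.
    static boolean flagWithoutRebuild() {
        ToySourceInfo info = new ToySourceInfo();
        ToyValue value = new ToyValue("row", info.source());
        info.markLastSnapshotRecord();
        return Boolean.TRUE.equals(value.source.get("last_snapshot_record"));
    }

    // Rebuilds the value with a fresh source block after marking, as the patch does.
    static boolean flagWithRebuild() {
        ToySourceInfo info = new ToySourceInfo();
        ToyValue value = new ToyValue("row", info.source());
        info.markLastSnapshotRecord();
        ToyValue rebuilt = new ToyValue(value.after, info.source());
        return Boolean.TRUE.equals(rebuilt.source.get("last_snapshot_record"));
    }

    public static void main(String[] args) {
        System.out.println("without rebuild: " + flagWithoutRebuild()); // without rebuild: false
        System.out.println("with rebuild: " + flagWithRebuild());       // with rebuild: true
    }
}
```

In this model the stale copy is the whole story. Whether the real connector behaves the same way depends on how its source info builds the source struct, which the report does not show.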
      


                People

                • Assignee:
                  jpechanec Jiri Pechanec
                  Reporter:
                  daniel.fredouille Daniel Fredouille