Debezium / DBZ-1171

RecordsStreamProducer fails on DataException for null value


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Explained
    • Affects Version/s: 0.9.2.Final
    • Fix Version/s: None
    • Component/s: postgresql-connector
    • Labels: None

      Description

      Environment

      Debezium Version: 0.9.2.Final
      Plugin: wal2json
      Connector: Postgresql
      Environment: Amazon RDS

      Summary

      We are encountering a DataException when Debezium processes the changes produced by a data migration.

      Details

      When Debezium is restarted after the data migration, it fails with the following stack trace:

      org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "certificate_id", schema type: BYTES
          at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
          at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
          at io.debezium.relational.TableSchemaBuilder.lambda$createKeyGenerator$1(TableSchemaBuilder.java:174)
          at io.debezium.relational.TableSchema.keyFromColumnData(TableSchema.java:124)
          at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:325)
          at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:264)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:133)
          at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
          at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:269)
          at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:254)
          at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:133)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:119)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      2019-03-05 11:47:38,558 ERROR Postgres|audit|records-stream-producer unexpected exception while streaming logical changes [io.debezium.connector.postgresql.RecordsStreamProducer]
      java.lang.InterruptedException
          at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:124)
          at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:352)
          at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:264)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:133)
          at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
          at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:269)
          at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:254)
          at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:133)
          at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:119)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      2019-03-05 11:47:38,833 DEBUG || [Consumer clientId=consumer-2, groupId=debezium-kafka-connect-acc] Node 1 sent a full fetch response with 1 response partition(s) [org.apache.kafka.clients.FetchSessionHandler]

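      The DataException originates in Kafka Connect's schema validation: ConnectSchema.validateValue rejects a null value for any field whose schema is not marked optional. A minimal Python sketch of that rule, as an illustration only (this is not Connect's actual code):

      ```python
      class DataException(Exception):
          """Stand-in for org.apache.kafka.connect.errors.DataException."""

      def validate_value(field_name, schema_type, optional, value):
          # Mirrors the check performed in ConnectSchema.validateValue:
          # a null value is only acceptable when the field is optional.
          if value is None:
              if optional:
                  return value
              raise DataException(
                  f'Invalid value: null used for required field: "{field_name}", '
                  f'schema type: {schema_type}')
          return value

      # The key column added by the migration is NOT NULL, so its Connect
      # schema is non-optional; a row that lacks the value fails validation.
      ```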
      Surrounding logs:

      2019-03-05 11:47:38,545 DEBUG  Postgres|audit|records-stream-producer  Mapping table 'platform_felix.certificate_eligiblecountries' to schemas under 'audit.platform_felix.certificate_eligiblecountries'   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,546 DEBUG  Postgres|audit|records-stream-producer  - field 'eligible_country' (STRING) from column eligible_country varchar(2, 0) NOT NULL   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,546 DEBUG  Postgres|audit|records-stream-producer  - field 'eligible_country' (STRING) from column eligible_country varchar(2, 0) NOT NULL   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,548 DEBUG  Postgres|audit|records-stream-producer  - field 'certificate_id' (BYTES) from column certificate_id numeric(19, 0) NOT NULL   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,548 DEBUG  Postgres|audit|records-stream-producer  - field 'certificate_id' (BYTES) from column certificate_id numeric(19, 0) NOT NULL   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,548 DEBUG  Postgres|audit|records-stream-producer  Mapped primary key for table 'platform_felix.certificate_eligiblecountries' to schema: {"name" : "audit.platform_felix.certificate_eligiblecountries.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "eligible_country", "index" : "0", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "certificate_id", "index" : "1", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]}   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,549 DEBUG  Postgres|audit|records-stream-producer  Mapped columns for table 'platform_felix.certificate_eligiblecountries' to schema: {"name" : "audit.platform_felix.certificate_eligiblecountries.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "eligible_country", "index" : "0", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "certificate_id", "index" : "1", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]}   [io.debezium.relational.TableSchemaBuilder]
      2019-03-05 11:47:38,557 ERROR  Postgres|audit|records-stream-producer  Failed to properly convert key value for 'platform_felix.certificate_eligiblecountries.certificate_id' of type numeric for row [AL, null]:   
      

      Specifically, the following data migration appears to trigger the failure:
      1. Add the column 'certificate_id'
      2. Fill column 'certificate_id'
      3. Include 'certificate_id' in the primary key

      The data migration is performed while Debezium is stopped, to make sure we don't run into the known limitation described at https://debezium.io/docs/connectors/postgresql/#streaming-changes.
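      One plausible reading of the failure (an assumption on our part, not a confirmed root cause): the connector builds record keys from the table's current primary-key definition, while the replayed WAL still contains row images from before step 2, in which the newly added key column is null. A Python sketch of that mismatch, using the column names from the logs:

      ```python
      # Primary key as it exists AFTER the migration (step 3).
      current_pk_columns = ["eligible_country", "certificate_id"]

      # Row image captured in the WAL BEFORE the column was filled (step 2);
      # this matches the "row [AL, null]" seen in the error log above.
      old_wal_row = {"eligible_country": "AL", "certificate_id": None}

      # Building the key against the current PK yields a null in a column the
      # key schema declares as required -- which is what the DataException reports.
      key = {column: old_wal_row[column] for column in current_pk_columns}
      print(key)  # {'eligible_country': 'AL', 'certificate_id': None}
      ```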

      Our deployment process includes the following steps:
      1. Place the application in read-only mode
      2. Stop Debezium
      3. Deploy a new version of the application (which includes automatic data migration)
      4. Start Debezium

      People

      • Assignee: Jiri Pechanec (jpechanec)
      • Reporter: Wouter Bancken (wouter.bancken.aca)
      • Votes: 0
      • Watchers: 2
