Details
Type: Bug
Resolution: Not a Bug
Priority: Major
Affects Version/s: 0.9.2.Final
Description
Environment
Debezium Version: 0.9.2.Final
Plugin: wal2json
Connector: PostgreSQL
Environment: Amazon RDS
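For reference, a connector configuration matching this environment might look roughly like the sketch below. It is an assumption, not the reporter's actual configuration. The property names are standard Debezium PostgreSQL connector options. The logical server name `audit` is inferred from the `audit.platform_felix...` schema names in the logs. The connector name and RDS hostname are hypothetical placeholders.

```java
import java.util.Properties;

// Sketch of a Debezium PostgreSQL connector configuration for the environment above.
// The connector name and hostname are hypothetical placeholders, not from the report.
final class ConnectorConfigSketch {

    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical connector name; the report does not give one.
        props.setProperty("name", "audit-connector");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // Logical decoding plugin named in the report.
        props.setProperty("plugin.name", "wal2json");
        // Logical server name inferred from the "audit." prefix of the logged schema names.
        props.setProperty("database.server.name", "audit");
        // Hypothetical Amazon RDS endpoint and the default PostgreSQL port.
        props.setProperty("database.hostname", "example-db.rds.amazonaws.com");
        props.setProperty("database.port", "5432");
        return props;
    }

    public static void main(String[] args) {
        // Print the configuration as key=value lines.
        buildConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```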
Summary
We encounter a DataException when Debezium processes the changes produced by a data migration.
Details
When Debezium is restarted after a data migration, we get the following stack trace:
org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "certificate_id", schema type: BYTES
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at io.debezium.relational.TableSchemaBuilder.lambda$createKeyGenerator$1(TableSchemaBuilder.java:174)
    at io.debezium.relational.TableSchema.keyFromColumnData(TableSchema.java:124)
    at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:325)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:264)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:133)
    at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:269)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:254)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:133)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:119)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-03-05 11:47:38,558 ERROR Postgres|audit|records-stream-producer unexpected exception while streaming logical changes [io.debezium.connector.postgresql.RecordsStreamProducer]
java.lang.InterruptedException
    at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:124)
    at io.debezium.connector.postgresql.RecordsStreamProducer.generateUpdateRecord(RecordsStreamProducer.java:352)
    at io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:264)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:133)
    at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processMessage(NonStreamingWal2JsonMessageDecoder.java:62)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:269)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:254)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:133)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:119)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-03-05 11:47:38,833 DEBUG || [Consumer clientId=consumer-2, groupId=debezium-kafka-connect-acc] Node 1 sent a full fetch response with 1 response partition(s) [org.apache.kafka.clients.FetchSessionHandler]
Surrounding logs:
2019-03-05 11:47:38,545 DEBUG Postgres|audit|records-stream-producer Mapping table 'platform_felix.certificate_eligiblecountries' to schemas under 'audit.platform_felix.certificate_eligiblecountries' [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,546 DEBUG Postgres|audit|records-stream-producer - field 'eligible_country' (STRING) from column eligible_country varchar(2, 0) NOT NULL [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,546 DEBUG Postgres|audit|records-stream-producer - field 'eligible_country' (STRING) from column eligible_country varchar(2, 0) NOT NULL [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,548 DEBUG Postgres|audit|records-stream-producer - field 'certificate_id' (BYTES) from column certificate_id numeric(19, 0) NOT NULL [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,548 DEBUG Postgres|audit|records-stream-producer - field 'certificate_id' (BYTES) from column certificate_id numeric(19, 0) NOT NULL [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,548 DEBUG Postgres|audit|records-stream-producer Mapped primary key for table 'platform_felix.certificate_eligiblecountries' to schema: {"name" : "audit.platform_felix.certificate_eligiblecountries.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "eligible_country", "index" : "0", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "certificate_id", "index" : "1", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]} [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,549 DEBUG Postgres|audit|records-stream-producer Mapped columns for table 'platform_felix.certificate_eligiblecountries' to schema: {"name" : "audit.platform_felix.certificate_eligiblecountries.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "eligible_country", "index" : "0", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "certificate_id", "index" : "1", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "false", "version" : "1"}}]} [io.debezium.relational.TableSchemaBuilder]
2019-03-05 11:47:38,557 ERROR Postgres|audit|records-stream-producer Failed to properly convert key value for 'platform_felix.certificate_eligiblecountries.certificate_id' of type numeric for row [AL, null]:
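The logs show the mismatch directly: the key schema for `platform_felix.certificate_eligiblecountries` marks both `eligible_country` and `certificate_id` as required (`"optional" : "false"`), while the row being converted is `[AL, null]`. Kafka Connect validates each value put into a `Struct` against its field schema (`ConnectSchema.validateValue` in the trace), and a null for a required field raises the `DataException`. The stdlib-only sketch below is a simplified model of that check, written for illustration. It is not Kafka Connect's implementation; the class and method names and the value 42 are hypothetical.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of key construction: every key field is required,
// so a null column value is rejected, similar in spirit to Kafka Connect's
// ConnectSchema.validateValue. This is not the Kafka Connect implementation.
final class KeyValidationSketch {

    // Key fields of platform_felix.certificate_eligiblecountries as logged,
    // both with "optional" : "false".
    static final List<String> KEY_FIELDS = Arrays.asList("eligible_country", "certificate_id");

    /** Builds a key from column values in KEY_FIELDS order, rejecting nulls. */
    static Map<String, Object> buildKey(List<Object> row) {
        if (row.size() != KEY_FIELDS.size()) {
            throw new IllegalArgumentException("expected " + KEY_FIELDS.size() + " key values");
        }
        Map<String, Object> key = new LinkedHashMap<>();
        for (int i = 0; i < KEY_FIELDS.size(); i++) {
            String field = KEY_FIELDS.get(i);
            Object value = row.get(i);
            if (value == null) {
                // Mirrors the start of the DataException message in the report.
                throw new IllegalStateException(
                        "Invalid value: null used for required field: \"" + field + "\"");
            }
            key.put(field, value);
        }
        return key;
    }

    public static void main(String[] args) {
        // A fully populated row; 42 is a hypothetical certificate_id.
        System.out.println(buildKey(Arrays.<Object>asList("AL", new BigDecimal("42"))));
        try {
            // The row from the log: certificate_id is null.
            buildKey(Arrays.<Object>asList("AL", null));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```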
Specifically, the following data migration seems to cause the issue:
1. Add the column 'certificate_id'
2. Fill column 'certificate_id'
3. Include 'certificate_id' in the primary key
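A toy model of these three steps shows one way a key like the logged `[AL, null]` can arise. It assumes, without confirmation from the logs, that change events written during step 2 are decoded after the restart against the table definition produced by step 3. In the pre-update image of a step 2 UPDATE, `certificate_id` is still null. The class and method names and the fill value are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the migration on one row of
// platform_felix.certificate_eligiblecountries. It keeps the pre-update image of
// the row captured by the step 2 UPDATE, then reads that image using the key
// columns that exist after step 3.
final class MigrationSketch {

    static final List<String> KEY_AFTER_MIGRATION =
            Arrays.asList("eligible_country", "certificate_id");

    /** Runs steps 1-3 and returns the pre-update row images written during step 2. */
    static List<Map<String, Object>> runMigration(String country, long fillValue) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("eligible_country", country);
        List<Map<String, Object>> updateOldImages = new ArrayList<>();

        // Step 1: add the column; existing rows get NULL.
        row.put("certificate_id", null);

        // Step 2: fill the column. The change event for this UPDATE is written now,
        // while certificate_id is still NULL in the pre-update image.
        updateOldImages.add(new LinkedHashMap<>(row));
        row.put("certificate_id", fillValue);

        // Step 3: certificate_id joins the primary key. This changes the table
        // definition but does not rewrite events already written to the WAL.
        return updateOldImages;
    }

    /** Extracts key values from a row image using the given key columns. */
    static List<Object> keyValues(Map<String, Object> image, List<String> keyColumns) {
        List<Object> values = new ArrayList<>();
        for (String column : keyColumns) {
            values.add(image.get(column));
        }
        return values;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> images = runMigration("AL", 1L);
        // The old image read with the post-migration key: same shape as the logged row.
        System.out.println(keyValues(images.get(0), KEY_AFTER_MIGRATION));
    }
}
```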
The data migration is performed while Debezium is stopped, so that we don't run into the known limitation described at https://debezium.io/docs/connectors/postgresql/#streaming-changes
Our deployment process includes the following steps:
1. Place the application in read-only mode
2. Stop Debezium
3. Deploy a new version of the application (which includes automatic data migration)
4. Start Debezium
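If Debezium runs under Kafka Connect, steps 2 and 4 could be scripted against the Kafka Connect REST API, for example with its `PUT /connectors/{name}/pause` and `PUT /connectors/{name}/resume` endpoints. The sketch below only builds those requests with `java.net.http` (Java 11+). The worker URL uses Kafka Connect's default REST port 8083, and the connector name `audit-connector` is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch: builds Kafka Connect REST requests for steps 2 and 4 of the deployment.
// The worker URL and connector name are hypothetical placeholders.
final class DeploymentSketch {

    static final String WORKER_URL = "http://localhost:8083";   // default Kafka Connect REST port
    static final String CONNECTOR = "audit-connector";           // hypothetical connector name

    /** Step 2: stop streaming by pausing the connector. */
    static HttpRequest pause() {
        return put("/connectors/" + CONNECTOR + "/pause");
    }

    /** Step 4: start streaming again by resuming the connector. */
    static HttpRequest resume() {
        return put("/connectors/" + CONNECTOR + "/resume");
    }

    private static HttpRequest put(String path) {
        return HttpRequest.newBuilder(URI.create(WORKER_URL + path))
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Step 1 (application read-only) and step 3 (deploy and migrate) happen outside this sketch.
        System.out.println(pause().method() + " " + pause().uri());
        System.out.println(resume().method() + " " + resume().uri());
        // To send a request: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

Pausing through the REST API is only one way to stop the connector; the report does not say which mechanism is used. In either case, Debezium keeps its logical replication slot by default. Changes written while it is stopped, including those made by the migration, are therefore read when it starts again.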