Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5654

Outbox pattern nested payload leads to connector crash

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 1.9.7.Final, 2.0.0.CR1
    • 1.9.6.Final, 2.0.0.Beta2
    • core-library
    • None
    • False
    • None
    • False

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-postgres/2.0.0.Beta2
      debezium-connector-postgres/1.9.6.Final

      What is the connector configuration?

      {
      "name": "postgres-outbox",
      "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.dbname": "postgres",
      "database.history.kafka.topic": "schema-changes.postgres_outbox",
      "database.hostname": "postgres",
      "database.password": "postgres",
      "database.port": "5432",
      "database.server.name": "postgres_example",
      "database.serverTimezone": "Europe/Paris",
      "database.user": "postgres",
      "decimal.handling.mode": "double",
      "errors.log.enable": "true",
      "errors.log.include.message": "true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "plugin.name": "pgoutput",
      "slot.name": "postgres",
      "snapshot.locking.mode": "none",
      "snapshot.mode": "initial",
      "snapshot.select.statement.overrides": "public.outbox",
      "snapshot.select.statement.overrides.public.outbox": "select id, aggregatetype, aggregateid, payload, type, created_at from(select *, rank() over(partition by aggregatetype, aggregateid order by created_at desc) as rnk from public.outbox)a where rnk=1;",
      "table.include.list": "public.outbox",
      "tasks.max": "1",
      "topic.creation.default.partitions": "1",
      "topic.creation.default.replication.factor": "-1",
      "topic.creation.enabled": "true",
      "transforms": "outbox",
      "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
      "transforms.outbox.table.expand.json.payload": "true",
      "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
      "transforms.outbox.table.fields.additional.placement": "type:header:type,type:envelope:type",
      "transforms.outbox.table.field.event.key": "aggregateid",
      "value.converter.delegate.converter.type": "io.debezium.converters.ByteBufferConverter",
      "value.converter.delegate.converter.type.schema.registry.url": "http://schema-registry:8081",
      "value.converter.delegate.converter.type.schemas.enable": "false"

      }

      }

      What is the captured database version and mode of deployment?

       

      root@postgres:/# postgres -V
      postgres (PostgreSQL) 14.5 (Debian 14.5-1.pgdg110+1)

      Used through docker-compose

      Base image postgres latest : https://hub.docker.com/layers/library/postgres/latest/images/sha256-ea382336c7382562e0e2fe8d21c42ad9a2852631e95ee6da14962955ba01cfba?context=explore

      What behaviour do you expect?

      If I insert a nested JSONB content (the payload contains at least a value being a dict, or a list of dict) in the payload column, the connector should not fail and it should produce a parsed Avro schema.

      What behaviour do you see?

      If I insert a nested JSONB content (the payload contains at least a value being a dict, or a list of dict) in the payload column, the connector fails.

      Do you see the same behaviour using the latest released Debezium version?

      Yes, as per specified above

      Do you have the connector logs, ideally from start till finish?

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic outbox.event.user :
          at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
          at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
          ... 11 more
      Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
      Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
          at org.apache.avro.Schema$Names.put(Schema.java:1542)
          at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:805)
          at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:967)
          at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
          at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1129)
          at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
          at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:995)
          at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:979)
          at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
          at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:995)
          at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:979)
          at org.apache.avro.Schema.toString(Schema.java:419)
          at org.apache.avro.Schema.toString(Schema.java:410)
          at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:151)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:213)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:275)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:251)
          at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:103)
          at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
          at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
          at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)

      How to reproduce the issue?

      Create the following outbox table in the public postgresql schema :

      CREATE TABLE IF NOT EXISTS outbox (
      id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
      aggregatetype VARCHAR(255) NOT NULL,
      aggregateid VARCHAR(255) NOT NULL,
      payload JSONB,
      type VARCHAR(255) NOT NULL,
      created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT(NOW() AT TIME ZONE('utc'))
      );

      Try to insert nested data in the payload field :

      INSERT INTO outbox(aggregatetype, aggregateid, type, payload)
      SELECT 
         'user' AS aggregatetype
         , a AS aggregateid
         , 'pending' AS type
         , JSONB_BUILD_OBJECT('item', JSONB_BUILD_OBJECT('price', b)) AS payload
      FROM(
      SELECT(
      SELECT (random()+f/1e39)::int) AS a,
      (SELECT random()+f/1e39) AS b 
      FROM generate_series(1,3) f(f)
      ) foo;

      Deploy the connector with the config above

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      Leverage outbox pattern with nested payload with Avro serialization and delegation of converter to have a detailed Avro schema registered (ie : not a global io.debezium.data.Json field)

            Unassigned Unassigned
            hugo.epicier Hugo Epicier (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

              Created:
              Updated:
              Resolved: