Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1017

Connector crashes with java.lang.NullPointerException when using multiple sinks to consume the messages

    XMLWordPrintable

Details

    • Hide

      1. setup the sql server source connector with multiple whitelisted tables
      2. setup two mysql sink connectors
      3. see the source connector crash

      Show
      1. setup the sql server source connector with multiple whitelisted tables 2. setup two mysql sink connectors 3. see the source connector crash

    Description

      I'm trying to get the SQL Server connector to work but there seems to be some problems when connecting sinks to the topics produced by the source connector. The source connector crashes with the error message below.

      The error:

      [2018-11-29 15:57:33,288] ERROR Unexpected exception while processing record 'ConsumerRecord(topic = SQLSERVER01.history, partition = 0, offset = 25, CreateTime = 1543506974896, serialized key size = -1, serialized value size = 203, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {
        "source" : {
          "server" : "SQLSERVER01"
        },
        "position" : {
          "snapshot" : true,
          "snapshot_completed" : true
        },
        "databaseName" : "DATABASE01",
        "schemaName" : "dbo",
        "ddl" : "N/A"
      })' (io.debezium.relational.history.KafkaDatabaseHistory)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
      java.lang.NullPointerException
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.commit(SqlServerConnectorTask.java:167)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:506)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:447)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
      

      My configuration

      My source connector:

      {
        "name": "sqlserver-source-connector",
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "database.hostname": "host.docker.internal",
        "database.port": "1433",
        "database.user": "dbusername",
        "database.password": "dbpassword",
        "database.dbname": "dbname",
        "database.history.kafka.bootstrap.servers": "kafka1:19092",
        "database.history.kafka.topic": "SQLSERVER01.history",
        "database.server.name": "SQLSERVER01",
        "table.whitelist": "^dbo\\.table1$,^dbo\\.table2$,^dbo\\.table3$"
      }
      

      My sink connectors:

      {
        "name": "table1-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table1",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table1"
      }
      {
        "name": "table2-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table2",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table2"
      }
      

      Attachments

        Activity

          People

            jpechane Jiri Pechanec
            edpenglund Victor Englund (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: