Debezium / DBZ-1017

Connector crashes with java.lang.NullPointerException when using multiple sinks to consume the messages

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 0.9.0.Beta1
    • Fix Version/s: 0.9.0.Beta2
    • Component/s: sqlserver-connector
    • Labels:
      None
    • Environment:

      OS: Windows 10 Enterprise
      SQL Server edition: Microsoft SQL Server Developer 14.0.1000.169
      Sink connector: mysql-connector-java-8.0.13.jar

    • Steps to Reproduce:

      1. Set up the SQL Server source connector with multiple whitelisted tables.
      2. Set up two MySQL sink connectors.
      3. Observe that the source connector crashes.

      Description

      I'm trying to get the SQL Server connector to work, but there seem to be problems when sinks are connected to the topics produced by the source connector. The source connector crashes with the error message below.

      The error:

      [2018-11-29 15:57:33,288] ERROR Unexpected exception while processing record 'ConsumerRecord(topic = SQLSERVER01.history, partition = 0, offset = 25, CreateTime = 1543506974896, serialized key size = -1, serialized value size = 203, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {
        "source" : {
          "server" : "SQLSERVER01"
        },
        "position" : {
          "snapshot" : true,
          "snapshot_completed" : true
        },
        "databaseName" : "DATABASE01",
        "schemaName" : "dbo",
        "ddl" : "N/A"
      })' (io.debezium.relational.history.KafkaDatabaseHistory)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,291] INFO WorkerSourceTask{id=sqlserver-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Exception thrown while calling task.commit() (org.apache.kafka.connect.runtime.WorkerSourceTask)
      java.lang.NullPointerException
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.commit(SqlServerConnectorTask.java:167)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:506)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:447)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      java.lang.NullPointerException
              at io.debezium.relational.history.AbstractDatabaseHistory.lambda$recover$1(AbstractDatabaseHistory.java:92)
              at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:238)
              at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:73)
              at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:43)
              at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:106)
              at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      [2018-11-29 15:57:33,292] ERROR WorkerSourceTask{id=sqlserver-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
      

      My configuration

      My source connector:

      {
        "name": "sqlserver-source-connector",
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "database.hostname": "host.docker.internal",
        "database.port": "1433",
        "database.user": "dbusername",
        "database.password": "dbpassword",
        "database.dbname": "dbname",
        "database.history.kafka.bootstrap.servers": "kafka1:19092",
        "database.history.kafka.topic": "SQLSERVER01.history",
        "database.server.name": "SQLSERVER01",
        "table.whitelist": "^dbo\\.table1$,^dbo\\.table2$,^dbo\\.table3$"
      }
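If this configuration is submitted through the Kafka Connect REST API, note that `POST /connectors` expects the settings nested under a `config` key, while the flat form shown above is the shape accepted by `PUT /connectors/{name}/config`. The report does not say which route was used, so the following is only a minimal sketch of wrapping the flat form. The worker address `http://localhost:8083` and the helper names `to_create_payload` and `build_create_request` are assumptions for illustration, and only a few of the settings are repeated here.

```python
import json
import urllib.request

# Assumption: address of the Kafka Connect worker; not stated in the report.
CONNECT_URL = "http://localhost:8083"


def to_create_payload(flat_config):
    """Wrap a flat connector config into the {"name", "config"} body for POST /connectors."""
    config = dict(flat_config)      # copy so the caller's dict is not modified
    name = config.pop("name")       # the connector name moves to the top level
    return {"name": name, "config": config}


def build_create_request(flat_config, base_url=CONNECT_URL):
    """Build (but do not send) the HTTP request that would register the connector."""
    body = json.dumps(to_create_payload(flat_config)).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated copy of the source connector settings from the report.
source_config = {
    "name": "sqlserver-source-connector",
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.history.kafka.topic": "SQLSERVER01.history",
    "database.server.name": "SQLSERVER01",
}

request = build_create_request(source_config)
# Sending is left out on purpose; urllib.request.urlopen(request) would perform the call.
```

Building the request separately from sending it makes the payload easy to inspect before anything reaches the worker.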
      

      My sink connectors:

      {
        "name": "table1-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table1",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table1"
      }
      {
        "name": "table2-sink-connector",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
        "transforms": "route,unwrap",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "tasks.max": "1",
        "topics": "SQLSERVER01.dbo.table2",
        "auto.create": "true",
        "connection.url": "jdbc:mysql://dbservice.db/dbname?user=dbuser&password=r4ndom",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "PK_table2"
      }
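Both sinks use the same `RegexRouter` settings to reduce the three-part topic name to the bare table name. A rough Python emulation of that rewrite follows, assuming the transform applies the pattern to the whole topic name and leaves non-matching names unchanged. The function name `route` is illustrative only; the real transform is implemented in Java, where the replacement is written `$3` rather than `\3`.

```python
import re

# "transforms.route.regex" after JSON unescaping: three dot-separated parts.
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
# Python spelling of the Java-style replacement "$3" (third capture group).
ROUTE_REPLACEMENT = r"\3"


def route(topic):
    """Return the rewritten topic name, or the original name if the pattern does not match it entirely."""
    if re.fullmatch(ROUTE_REGEX, topic) is None:
        return topic
    return re.sub(ROUTE_REGEX, ROUTE_REPLACEMENT, topic, count=1)


for topic in ("SQLSERVER01.dbo.table1", "SQLSERVER01.dbo.table2", "SQLSERVER01.history"):
    print(topic, "->", route(topic))
```

Under these assumptions the two table topics map to `table1` and `table2`, while `SQLSERVER01.history` has only two parts and is left as it is.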
      

    People

    • Assignee: Jiri Pechanec (jpechanec)
    • Reporter: Victor Englund (edpenglund)