Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-519

NPE happened for PAUSED task

    XMLWordPrintable

    Details

    • Steps to Reproduce:
      Hide
      1. Pause the connector
      2. Restart Kafka Connect application
      3. Check logs

      AR: java.lang.NullPointerException happened

      Show
      Pause the connector Restart Kafka Connect application Check logs AR: java.lang.NullPointerException happened

      Description

      I have java.lang.NullPointerException for paused connector after the restart of Kafka Connect application:

      [2017-12-19T08:41:38,237][ERROR][category=org.apache.kafka.connect.runtime.WorkerTask] Task is being killed and will not recover until manually restarted
      [2017-12-19T08:41:38,699][ERROR][category=org.apache.kafka.connect.runtime.WorkerSourceTask] Exception thrown while calling task.commit()
      java.lang.NullPointerException
              at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:154)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:383)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:330)
              at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
              at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
              at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
              at java.lang.Thread.run(Thread.java:745)
      

      Root cause:
      WorkerTask instantiates the task, but it doesn’t start the task if the task has PAUSED state:

      org.apache.kafka.connect.runtime.WorkerTask.java

      private void doRun() throws InterruptedException {
          try {
              synchronized (this) {
                  if (stopping)
                      return;
       
                  if (targetState == TargetState.PAUSED) {
                      onPause();
                      if (!awaitUnpause()) return;
                  }
       
                  statusListener.onStartup(id);
              }
       
              execute(); // here is performed the task starting
          } catch (Throwable t) {
              log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
              log.error("Task is being killed and will not recover until manually restarted");
              throw t;
          } finally {
              doClose();
          }
      }
      

      So PostgresConnectorTask doesn’t initialize producer variable and commit() method invocation can throw java.lang.NullPointerException:

      io.debezium.connector.postgresql.PostgresConnectorTask.java

      @Override
      public void commit() throws InterruptedException {
         producer.commit();
      }
      

        Gliffy Diagrams

          Attachments

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                jchipmunk Andrey Pustovetov
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: