Debezium / DBZ-516

Task is still running after connector is paused

Details

    Description

      The task is still running after the connector has been paused.

      Kafka Connect's WorkerSourceTask runs the task in the following loop:

      @Override
      public void execute() {
          ...
      
              while (!isStopping()) {
                  if (shouldPause()) {
                      onPause();
                      if (awaitUnpause()) {
                          onResume();
                      }
                      continue;
                  }
      
                  if (toSend == null) {
                      log.trace("Nothing to send to Kafka. Polling source for additional records");
                      toSend = task.poll();
                  }
                  if (toSend == null)
                      continue;
                  log.debug("About to send " + toSend.size() + " records to Kafka");
                  if (!sendRecords())
                      stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
              }
      		
          ...
      }
      

      The task is still inside the poll() method even after the connector has been paused. A thread dump confirms this:

      "pool-1-thread-1" #41 prio=5 os_prio=0 tid=0x00007fc78005e550 nid=0x223 waiting on condition [0x00007fc74b6f9000]
         java.lang.Thread.State: TIMED_WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
              at io.debezium.util.Metronome$2.pause(Metronome.java:89)
              at io.debezium.connector.mysql.AbstractReader.poll(AbstractReader.java:214)
              at io.debezium.connector.mysql.ChainedReader.poll(ChainedReader.java:126)
              at io.debezium.connector.mysql.MySqlConnectorTask.poll(MySqlConnectorTask.java:209)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:163)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      

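      WorkerSourceTask checks shouldPause() only at the top of its loop, so it does not learn that the connector is paused until task.poll() returns. By design, however, poll() should block if no data is currently available, so the pause does not take effect while the task is waiting inside poll().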
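
One possible way to let the worker notice a pause sooner, sketched here and not taken from Debezium's code, is for poll() to wait only a bounded time and return null when nothing arrived. In the loop quoted above, a null result makes the worker `continue` and re-check shouldPause(). The class `BoundedPollSketch`, the `maxWaitMs` parameter, and the `enqueue` helper are hypothetical names used only for illustration; the sketch uses plain `java.util.concurrent` types and no Kafka Connect or Debezium APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source task; not the Debezium implementation.
public class BoundedPollSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final long maxWaitMs; // assumed upper bound on how long poll() may block

    public BoundedPollSketch(long maxWaitMs) {
        this.maxWaitMs = maxWaitMs;
    }

    // Called by whatever produces records (hypothetical helper).
    public void enqueue(String record) {
        queue.add(record);
    }

    // Waits at most maxWaitMs for a first record. Returns null when nothing
    // arrived, which hands control back to the caller's loop.
    public List<String> poll() {
        String first;
        try {
            first = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (first == null) {
            return null;
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        queue.drainTo(batch); // take whatever else is already queued
        return batch;
    }

    public static void main(String[] args) {
        BoundedPollSketch task = new BoundedPollSketch(50);
        System.out.println(task.poll()); // prints "null" after about 50 ms
        task.enqueue("a");
        task.enqueue("b");
        System.out.println(task.poll()); // prints "[a, b]"
    }
}
```

With a bound like this, the delay between a pause request and the task actually pausing is at most roughly `maxWaitMs` plus the time needed to send any batch already in hand. The trade-off is that a smaller bound means more frequent empty wake-ups.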

      See also KAFKA-2370 (Kafka Connect pause/resume API).

          People

            jpechane Jiri Pechanec
            andrey.pustovetov@gmail.com Andrey Pustovetov