Debezium / DBZ-7547

ExtractNewRecordState NPE for heartbeats

Details

    • Type: Task
    • Resolution: Unresolved
    • Priority: Major
    • Fix Version: 2.7.0.Alpha2
    • Affects Version: 2.5.1.Final
    • Component: core-library

    Description

      I've got a Debezium PostgreSQL connector, version 2.5.1.Final (I assume this can happen with any connector), that throws an NPE when both of the following are true:

      • The ExtractNewRecordState SMT is configured, and db or source.db is added as a header (via transforms.*.add.headers)
      • Heartbeats are enabled

      Log output:

      2024-02-21 15:28:09 [2024-02-21 15:28:09,969] ERROR Error encountered in task STR2606_Sr_1-0. Executing stage 'TRANSFORMATION' with class 'io.debezium.transforms.ExtractNewRecordState', where source record is = SourceRecord{sourcePartition={server=STR2606_SR_1}, sourceOffset={transaction_id=null, lsn=26969176, txId=751, ts_usec=1708527334156439}} ConnectRecord{topic='__debezium-heartbeat.STR2606_SR_1', kafkaPartition=0, key=Struct{serverName=STR2606_SR_1}, keySchema=Schema{io.debezium.connector.common.ServerNameKey:STRUCT}, value=Struct{ts_ms=1708529289699}, valueSchema=Schema{io.debezium.connector.common.Heartbeat:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter)
      java.lang.NullPointerException
          at io.debezium.transforms.AbstractExtractNewRecordState$FieldReference.getField(AbstractExtractNewRecordState.java:273)
          at io.debezium.transforms.AbstractExtractNewRecordState$FieldReference.getSchema(AbstractExtractNewRecordState.java:266)
          at io.debezium.transforms.AbstractExtractNewRecordState.makeHeaders(AbstractExtractNewRecordState.java:152)
          at io.debezium.transforms.ExtractNewRecordState.doApply(ExtractNewRecordState.java:107)
          at io.debezium.transforms.AbstractExtractNewRecordState.apply(AbstractExtractNewRecordState.java:109)
          at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)

      Docs for transforms.*.add.headers say:

      If you specify a field that is not in the change event record, the SMT does not add the field to the header.

      That's true for fields like op or source.ts_ms, but I can consistently reproduce the NPE by adding db or source.db to add.headers.
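
      To illustrate the kind of failure I suspect, here is a minimal sketch using plain Java maps instead of Kafka Connect Structs. It is not the Debezium implementation; the class and method names (HeaderLookupSketch, lookupUnsafe, lookupSafe) are hypothetical, and the assumption that db is resolved through a parent source struct is mine. The heartbeat value only carries ts_ms, so dereferencing a missing parent struct throws, while a null-safe lookup would simply skip the header as the docs describe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; not Debezium code.
final class HeaderLookupSketch {

    // Assumes the parent struct (e.g. "source") always exists in the record value.
    @SuppressWarnings("unchecked")
    static Object lookupUnsafe(Map<String, Object> value, String parent, String field) {
        Map<String, Object> struct = (Map<String, Object>) value.get(parent);
        return struct.get(field); // throws NullPointerException when the parent is absent
    }

    // Returns an empty Optional when the parent struct or the field is missing.
    @SuppressWarnings("unchecked")
    static Optional<Object> lookupSafe(Map<String, Object> value, String parent, String field) {
        Object struct = value.get(parent);
        if (!(struct instanceof Map)) {
            return Optional.empty();
        }
        return Optional.ofNullable(((Map<String, Object>) struct).get(field));
    }

    public static void main(String[] args) {
        // A heartbeat-like value: only ts_ms, no "source" struct.
        Map<String, Object> heartbeat = new HashMap<>();
        heartbeat.put("ts_ms", 1708529289699L);

        System.out.println("safe lookup found db: " + lookupSafe(heartbeat, "source", "db").isPresent());
        try {
            lookupUnsafe(heartbeat, "source", "db");
        } catch (NullPointerException e) {
            System.out.println("unsafe lookup threw NullPointerException");
        }
    }
}
```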

      A workaround is to apply a Predicate to the ExtractNewRecordState SMT that ignores Heartbeat topic messages. Example:

      ...
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.add.headers": "op,db",
      ...
      "transforms.unwrap.predicate": "IgnoreHeartbeatStateExtraction",
      "transforms.unwrap.negate": "true",
      ...
      "predicates": "IgnoreHeartbeatStateExtraction",
      "predicates.IgnoreHeartbeatStateExtraction.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.IgnoreHeartbeatStateExtraction.pattern": ".*-heartbeat.*",
      ...
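
      For anyone checking the pattern before deploying, the sketch below mimics what the predicate is expected to do with plain java.util.regex: the topic name must match the whole pattern, and because negate is true the SMT only runs when the predicate does not match. The class and method names are hypothetical, and the data topic name STR2606_SR_1.public.orders is a made-up example; only the heartbeat topic name comes from the log above.

```java
import java.util.regex.Pattern;

// Hypothetical illustration of the workaround's matching logic; not Kafka Connect code.
final class HeartbeatPredicateSketch {

    // Same pattern as predicates.IgnoreHeartbeatStateExtraction.pattern
    static final Pattern HEARTBEAT = Pattern.compile(".*-heartbeat.*");

    // True when the whole topic name matches the heartbeat pattern.
    static boolean isHeartbeatTopic(String topic) {
        return topic != null && HEARTBEAT.matcher(topic).matches();
    }

    // With negate=true, the SMT is applied only when the predicate is false.
    static boolean shouldUnwrap(String topic) {
        return !isHeartbeatTopic(topic);
    }

    public static void main(String[] args) {
        String heartbeatTopic = "__debezium-heartbeat.STR2606_SR_1";
        String dataTopic = "STR2606_SR_1.public.orders"; // made-up data topic name

        System.out.println(heartbeatTopic + " -> unwrap? " + shouldUnwrap(heartbeatTopic));
        System.out.println(dataTopic + " -> unwrap? " + shouldUnwrap(dataTopic));
    }
}
```

      With this pattern the heartbeat topic is skipped and the data topic still goes through the SMT.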

          People

            Assignee: Unassigned
            Reporter: Jonathon Ogden (jonathono)