Debezium / DBZ-6105

Cassandra Map Appends are not captured correctly


      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Debezium connector for Cassandra, standalone version, built from the main branch:

      `debezium-connector-cassandra-3-2.2.0-SNAPSHOT-jar-with-dependencies.jar`

      What is the connector configuration?

      Default configuration:

```properties
connector.name=test_connector
commit.log.relocation.dir=/home/USER/debezium-connector-cassandra/test_dir/relocation/
http.port=8000

cassandra.config=/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042

kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

offset.backing.store.dir=/home/USER/debezium-connector-cassandra/test_dir/

snapshot.consistency=ONE
snapshot.mode=ALWAYS

latest.commit.log.only=true
```

      What is the captured database version and mode of deployment?

      Cassandra on Google Compute Engine

      Operating system: Debian (10.12)
      Software: Cassandra (4.0.5), Google-Fluentd (1.9.8), OpenJDK (11.0.16), Stackdriver-Agent (6.3.0)

      What behaviour do you expect?

      I updated the cassandra-3 integration test to update a map column.

      I performed a map append and also a map overwrite (a SET on the whole map column). I expected the overwrite to produce a delete+set message.

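```java
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update;

import java.util.HashMap;
import java.util.Map;

// Fragment of the cassandra-3 integration test: `context`, `runCql`, `keyspaceTable`,
// TEST_KEYSPACE_NAME and the loop index `i` come from the surrounding test class.

// Create a CDC-enabled table with a map column
context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table")
        + "(key text, key2 int, des text, distance double, conf map<text, text>, primary key(key, key2)) with cdc = true;");

// Insert a row whose map has a single entry
Map<String, String> conf = new HashMap<>();
conf.put("name" + i, "verma");
runCql(insertInto(TEST_KEYSPACE_NAME, "cdc_table")
        .value("key", literal("key1"))
        .value("key2", literal(i))
        .value("conf", literal(conf))
        .build());

// Append one entry to the existing map
runCql(update(TEST_KEYSPACE_NAME, "cdc_table")
        .appendMapEntry("conf", literal("state"), literal("nj"))
        .whereColumn("key").isEqualTo(literal("key1"))
        .whereColumn("key2").isEqualTo(literal(i))
        .build());

// Overwrite the whole map with a new single-entry map
Map<String, String> conf2 = new HashMap<>();
conf2.put("overwrite" + i, "replaceMap");
runCql(update(TEST_KEYSPACE_NAME, "cdc_table")
        .setColumn("conf", literal(conf2))
        .whereColumn("key").isEqualTo(literal("key1"))
        .whereColumn("key2").isEqualTo(literal(i))
        .build());
```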

      I verified that the Cassandra state is as expected.

      After the insert, the map contained just the `name` entry; after the append it contained `[name, state]`; after the overwrite it contained only the `overwrite` entry.
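      To make the expected states concrete, here is a minimal in-memory model of the three writes. It assumes what the row dumps later in this report show: the INSERT and the overwrite both write a collection tombstone followed by the new entries, while the append only adds an entry. `MapColumnModel` is a hypothetical name; this is not Cassandra or Debezium code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the three map writes; not Cassandra's implementation.
final class MapColumnModel {
    private final Map<String, String> cells = new LinkedHashMap<>();

    // INSERT with a map literal, or UPDATE ... SET conf = {...}:
    // the whole collection is deleted (tombstone), then the new entries are written.
    void overwrite(Map<String, String> entries) {
        cells.clear();
        cells.putAll(entries);
    }

    // Append (QueryBuilder appendMapEntry): the entry is merged, no tombstone.
    void append(String key, String value) {
        cells.put(key, value);
    }

    Map<String, String> snapshot() {
        return new LinkedHashMap<>(cells);
    }

    public static void main(String[] args) {
        MapColumnModel conf = new MapColumnModel();
        conf.overwrite(Map.of("name0", "verma"));
        System.out.println(conf.snapshot().keySet()); // [name0]
        conf.append("state", "nj");
        System.out.println(conf.snapshot().keySet()); // [name0, state]
        conf.overwrite(Map.of("overwrite0", "replaceMap"));
        System.out.println(conf.snapshot().keySet()); // [overwrite0]
    }
}
```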

       

      What behaviour do you see?

      Both the map append and the map overwrite result in the same message from Debezium, so the information that the overwrite is a delete+set is lost. If I were to re-create the database state downstream, it would diverge: Cassandra performs a delete+set, while the downstream database would just perform an append.
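      The divergence can be sketched as follows, assuming a hypothetical downstream consumer that only sees the `op` and `after.conf.value` fields of each message. Because the messages carry no deletion marker for the map (`deletion_ts` is null on the `conf` cell in the messages pasted here), the consumer can only merge every `conf` value into its copy of the map. `ChangeEvent` and `NaiveReplay` are hypothetical names, not Debezium types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified view of one change event's `conf` cell.
final class ChangeEvent {
    final String op;                 // "i" or "u", as in the payload's "op" field
    final Map<String, String> conf;  // the "after.conf.value" map

    ChangeEvent(String op, Map<String, String> conf) {
        this.op = op;
        this.conf = conf;
    }
}

final class NaiveReplay {
    // With no deletion marker, every event can only be merged into the existing map.
    static Map<String, String> replay(List<ChangeEvent> events) {
        Map<String, String> downstream = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            downstream.putAll(e.conf);
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = List.of(
                new ChangeEvent("i", Map.of("name0", "verma")),
                new ChangeEvent("u", Map.of("state", "nj")),               // append
                new ChangeEvent("u", Map.of("overwrite0", "replaceMap"))); // overwrite
        // Cassandra ends up with only overwrite0; the replica keeps all three keys.
        System.out.println(replay(events).keySet()); // [name0, state, overwrite0]
    }
}
```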

      Map append:
      {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,
"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675457905454,"op":"u","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675457903156,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675457865052.log","pos":315963,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"state":"nj"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}}
      Map insert:
      {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,
"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675721893371,"op":"i","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675721886933,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675721847082.log","pos":315270,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"name2":"verma"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}}
      Map append:
      {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,
"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675721893372,"op":"u","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675721888994,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675721847082.log","pos":315348,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"state":"nj"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}} 

      I was able to verify that the row data Cassandra produces does differ between the three operations:

      Initial insert has "del(conf)":
      Row[info=[ts=1676047321827355] ]: key2=0 | del(conf)=deletedAt=1676047321827354, localDeletion=1676047321, [conf[name0]=verma ts=1676047321827355]
      Map append does not have "del(conf)":
      Row[info=[ts=-9223372036854775808] ]: key2=0 | , [conf[state]=nj ts=1676047323904228]]
      Overwrite has "del(conf)":
      Row[info=[ts=-9223372036854775808] ]: key2=0 | del(conf)=deletedAt=1676047325996875, localDeletion=1676047325, [conf[overwrite0]=replaceMap ts=1676047325996876]
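      As a rough check of this observation, the three row dumps can be classified by whether they carry a `del(conf)` collection tombstone. This hypothetical helper only pattern-matches the debug strings pasted above; a real fix would inspect the mutation objects rather than their `toString()` output.

```java
// Hypothetical helper: classifies the row dumps above by the presence of a collection tombstone.
final class RowDumpClassifier {
    enum MapWrite { REPLACE, APPEND }

    static MapWrite classify(String rowDump, String column) {
        // In the dumps above, the insert and the overwrite log "del(<column>)=deletedAt=..."
        // before the new cells; the append logs only the new cell.
        return rowDump.contains("del(" + column + ")=") ? MapWrite.REPLACE : MapWrite.APPEND;
    }

    public static void main(String[] args) {
        String insert = "Row[info=[ts=1676047321827355] ]: key2=0 | del(conf)=deletedAt=1676047321827354, "
                + "localDeletion=1676047321, [conf[name0]=verma ts=1676047321827355]";
        String append = "Row[info=[ts=-9223372036854775808] ]: key2=0 | , [conf[state]=nj ts=1676047323904228]]";
        String overwrite = "Row[info=[ts=-9223372036854775808] ]: key2=0 | del(conf)=deletedAt=1676047325996875, "
                + "localDeletion=1676047325, [conf[overwrite0]=replaceMap ts=1676047325996876]";
        System.out.println(classify(insert, "conf"));    // REPLACE
        System.out.println(classify(append, "conf"));    // APPEND
        System.out.println(classify(overwrite, "conf")); // REPLACE
    }
}
```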
        

      Do you see the same behaviour using the latest released Debezium version?

      I am building from the latest version on GitHub.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      <Your answer>

      How to reproduce the issue using our tutorial deployment?

      Use the code above in an integration test and verify the messages. The overwrite should produce a delete+set message.

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      <Your answer>

      Implementation ideas (optional)

      Not sure. Cassandra does indicate the delete+set (the `del(conf)` tombstone in the overwrite row above); the connector just needs to handle it properly.
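      One possible direction, sketched under explicit assumptions: the connector could copy the collection tombstone's timestamp (the `deletedAt` value in the overwrite row dump above) into the `conf` cell's existing `deletion_ts` field, and a consumer would then clear the map before applying `value` whenever `deletion_ts` is non-null. This is not how the connector currently behaves, and reusing `deletion_ts` this way is an assumption. On the Cassandra side the tombstone should be reachable through the row's complex column data (`ComplexColumnData.complexDeletion()` in Cassandra 3.x/4.x), but how the connector's mutation processing would reach it is not verified here. `MapCell` and `TombstoneAwareReplay` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplification of the "cell_value" struct for the conf column.
final class MapCell {
    final Map<String, String> value; // null for a pure delete
    final Long deletionTs;           // assumption: set when the mutation carried a collection tombstone

    MapCell(Map<String, String> value, Long deletionTs) {
        this.value = value;
        this.deletionTs = deletionTs;
    }
}

final class TombstoneAwareReplay {
    // A tombstone means "replace the whole map"; otherwise the entries are merged.
    static void apply(Map<String, String> downstream, MapCell cell) {
        if (cell.deletionTs != null) {
            downstream.clear();
        }
        if (cell.value != null) {
            downstream.putAll(cell.value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> replica = new LinkedHashMap<>();
        apply(replica, new MapCell(Map.of("name0", "verma"), 1676047321827354L));          // insert
        apply(replica, new MapCell(Map.of("state", "nj"), null));                           // append
        apply(replica, new MapCell(Map.of("overwrite0", "replaceMap"), 1676047325996875L)); // overwrite
        System.out.println(replica); // {overwrite0=replaceMap}
    }
}
```

      With the tombstone carried through, the replica ends in the same state Cassandra reports after the overwrite.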

      Assignee: Unassigned
      Reporter: shitanshu verma (vermas2012, Inactive)
      Votes: 0
      Watchers: 2