-
Bug
-
Resolution: Unresolved
-
Major
-
None
-
None
-
False
-
None
-
False
-
Important
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Debezium connector for Cassandra, stand alone version, building from main branch.
`debezium-connector-cassandra-3-2.2.0-SNAPSHOT-jar-with-dependencies.jar`
What is the connector configuration?
default configurations,
connector.name=test_connector
commit.log.relocation.dir=//home/USER/debezium-connector-cassandra/test_dir/relocation/
http.port=8000
cassandra.config=/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042
kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.backing.store.dir=/home/USER/debezium-connector-cassandra/test_dir/
snapshot.consistency=ONE
snapshot.mode=ALWAYS
latest.commit.log.only=true
What is the captured database version and mode of depoyment?
Cassandra on Google Compute Engine
Software
Operating SystemDebian (10.12)
SoftwareCassandra (4.0.5)Google-Fluentd (1.9.8)OpenJDK (11.0.16)Stackdriver-Agent (6.3.0)
What behaviour do you expect?
I updated the cassandra-3 integration test to update map column.
I did a map append and also a map overwrite (set on the map column). I expected the overwrite to be a delete+set message.
// Create table with a map context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + "(key text, key2 int, des text, distance double, conf map<text, text>, primary key(key, key2)) with cdc = true;"); // Insert into map Map<String, String> conf = new HashMap<>(); conf.put("name" + i, "verma"); runCql(insertInto(TEST_KEYSPACE_NAME, "cdc_table") .value("key", literal("key1")) .value("key2", literal(i)) .value("conf", literal(conf)) .build()); // Append into MAP runCql(update(TEST_KEYSPACE_NAME, "cdc_table").appendMapEntry("conf", literal("state"), literal("nj")) .whereColumn("key").isEqualTo(literal("key1")) .whereColumn("key2").isEqualTo(literal(i)).build()); // overwrite the map Map<String, String> conf2 = new HashMap<>(); conf2.put("overwrite" + i, "replaceMap"); runCql(update(TEST_KEYSPACE_NAME, "cdc_table").setColumn("conf", literal(conf2)) .whereColumn("key").isEqualTo(literal("key1")) .whereColumn("key2").isEqualTo(literal(i)).build());
I verified that Cassandra state is predictable.
After insert map contained just the key name, after append it had [name, state], after overwrite it only had column "overwrite"
What behaviour do you see?
Both the map append and map overwrite resulted in the same message from DBZ. Essentially, loosing the information that overwrite is Delete+set. If I were to re-create the database state downstream, there will be divergence. Since, Cassandra will do delete+set and downstream database will just do an append.
MAP append {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675457905454,"op":"u","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675457903156,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675457865052.log","pos":315963,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"state":"nj"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}} Map Insert {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675721893371,"op":"i","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675721886933,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675721847082.log","pos":315270,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"name2":"verma"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}}Map Append {"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":false,"field":"op"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"cluster"},{"type":"string","optional":false,"field":"file"},{"type":"int32","optional":false,"field":"pos"},{"type":"string","optional":false,"field":"keyspace"},{"type":"string","optional":false,"field":"table"}],"optional":false,"name":"source","field":"source"},{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"key2"},{"type":"struct","fields":[{"type":"map","keys":{"type":"string","optional":true},"values":{"type":"string","optional":true},"optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"conf"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"des"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"value"},{"type":"int64","optional":true,"field":"deletion_ts"},{"type":"boolean","optional":false,"field":"set"}],"optional":true,"name":"cell_value","version":1,"field":"distance"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_start","version":1,"field":"_range_start"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"method"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"value"},{"type":"string","optional":false,"field":"type"}],"optional":false,"name":"clustering_value","version":1},"optional":false,"name":"clustering_values","version":1,"field":"values"}],"optional":true,"name":"_range_end","version":1,"field":"_range_end"}],"optional":false,"name":"after","version":1,"field":"after"}],"optional":false,"name":"io.debezium.connector.cassandra.test_topic.test_keyspace.cdc_table.Envelope"},"payload":{"ts_ms":1675721893372,"op":"u","source":{"version":"2.2.0-SNAPSHOT","connector":"cassandra","name":"test_topic","ts_ms":1675721888994,"snapshot":"false","db":"NULL","sequence":null,"cluster":"Test Cluster","file":"CommitLog-6-1675721847082.log","pos":315348,"keyspace":"test_keyspace","table":"cdc_table"},"after":{"key":{"value":"key1","deletion_ts":null,"set":true},"key2":{"value":2,"deletion_ts":null,"set":true},"conf":{"value":{"state":"nj"},"deletion_ts":null,"set":true},"des":null,"distance":null,"_range_start":null,"_range_end":null}}}
I was able to verify that Cassandra does send different messages.
Initial insert has "del(conf)" Row[info=[ts=1676047321827355] ]: key2=0 | del(conf)=deletedAt=1676047321827354, localDeletion=1676047321, [conf[name0]=verma ts=1676047321827355] Append MAP does not have "del(conf)" Row[info=[ts=-9223372036854775808] ]: key2=0 | , [conf[state]=nj ts=1676047323904228]] Overwrite has "del(conf)" Row[info=[ts=-9223372036854775808] ]: key2=0 | del(conf)=deletedAt=1676047325996875, localDeletion=1676047325, [conf[overwrite0]=replaceMap ts=1676047325996876]
Do you see the same behaviour using the latest relesead Debezium version?
Building from github latest version.
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
<Your answer>
How to reproduce the issue using our tutorial deployment?
Use the code I pasted in a integ test and verify messages. Overwrite should be a delete+set message.
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
<Your answer>
Implementation ideas (optional)
NOt sure. Cassandra is giving hints on delete+set, we just need to handle it properly.