Debezium / DBZ-1118

Data from Postgres partitioned table written to wrong topic during snapshot


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 0.9.0.CR1
    • Fix Version/s: 0.9.0.Final
    • Component/s: postgresql-connector
    • Labels:
      None
    • Environment:

      AWS RDS Postgres 10.6
      Confluent Platform 5.1.0 (Running on 2 AWS m5a.xlarge instances, with 3 kafka brokers & 2gb heaps, and a connect instance with a 2gb heap)

    • Steps to Reproduce:

      DB Schema

      create table public.dbz_repo
      (
        user_id       bigint           not null,
        sequence_id   bigint           not null,
        longitude     double precision not null,
        latitude      double precision not null,
        created_by_id bigint           not null,
        client_ts     timestamp        not null,
        server_ts     timestamp        not null,
        type          varchar(50)      not null
      )
        partition by RANGE (user_id);
       
      create table public.dbz_repo_1_500
        partition of public.dbz_repo
          (
          constraint dbz_repo_1_500_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1') TO ('501');
       
      create table public.dbz_repo_501_1000
        partition of public.dbz_repo
          (
          constraint dbz_repo_501_1000_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('501') TO ('1001');
       
      create table public.dbz_repo_1001_1500
        partition of public.dbz_repo
          (
          constraint dbz_repo_1001_1500_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1001') TO ('1501');
       
       
      create table public.dbz_repo_1501_2000
        partition of public.dbz_repo
          (
          constraint dbz_repo_1501_2000_pkey
            primary key (user_id, sequence_id)
          )
          FOR VALUES FROM ('1501') TO ('2001');
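
      As a sanity check (plain Postgres, independent of Debezium), the partitions attached to the parent can be listed from the catalog before starting the connector:

      -- list the partitions attached to public.dbz_repo
      select inhrelid::regclass as partition
      from pg_inherits
      where inhparent = 'public.dbz_repo'::regclass;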
      

      DB Inserts

       
      insert into dbz_repo
      (user_id, sequence_id, longitude, latitude, created_by_id, client_ts, server_ts, type)
      select random() * 1999 + 1, random() * 1540277658018 + 1, random() * 180, random() * 180, random() * 1999 + 1, now(), now(), 'abcdefg'
      from generate_series(1, 2000000);
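
      To see how the inserted rows were spread across the partitions (optional, purely for verification), each row's tableoid identifies the partition that physically stores it:

      -- rows per partition after the insert above
      select tableoid::regclass as partition, count(*)
      from public.dbz_repo
      group by 1
      order by 1;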
      
      

      Debezium config

       
      {
          "name": "debezium_repo",
          "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "wal2json_rds_streaming",
            "database.hostname": "<HOST>.rds.amazonaws.com",
            "database.port": "5432",
            "database.user": "debezium",
            "database.password": "<PASSWORD>",
            "database.dbname" : "nextgeo",
            "database.server.name": "debezium,
            "decimal.handling.mode": "string",
            "hstore.handling.mode": "json",
            "tombstones.on.delete": false,
            "table.whitelist": ".*dbz.*",
            "slot.name": "debezium_repo"
          }
      }
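
      Once the connector is registered it should create the logical replication slot named by slot.name above, which can be confirmed on the database with:

      -- confirm the wal2json slot used by the connector
      select slot_name, plugin, slot_type, active
      from pg_replication_slots
      where slot_name = 'debezium_repo';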
      
      

      Python comparison script

       
      import psycopg2
      from kafka import KafkaConsumer
       
      psql_host = "<HOST>.rds.amazonaws.com"
      psql_user = "debezium"
      psql_pass = "<PASSWORD>"
      psql_db = "debezium"
       
      db = psycopg2.connect(host=psql_host, database=psql_db, user=psql_user, password=psql_pass)
       
      cur = db.cursor()
       
      tables = []
       
      cur.execute("""SELECT table_name
      FROM information_schema.tables
      WHERE table_type='BASE TABLE'
      AND table_schema='public'
      AND table_name LIKE '%dbz%'
      ORDER BY table_name;""")
       
      for row in cur.fetchall():
          tables.append(row[0])
       
      print("table | rows_in_db | records_in_topic | diff")
       
      for table in tables:
       
          cur.execute("select count(*) from " + table)
          rows = cur.fetchone()[0]
       
          consumer = KafkaConsumer("debezium.public." + table,
                                   bootstrap_servers=["kafka:9092"],
                                   enable_auto_commit=False,
                                   consumer_timeout_ms=10000,
                                   auto_offset_reset="earliest",
                                   group_id=None)
       
          count = 0
          for _ in consumer:
              count += 1
       
          diff = rows - count
       
          print(table + " | " + str(rows) + " | " + str(count) + " | " + str(diff))
       
      db.close()
      
      


      Description

      Overview

      During the snapshot phase, when reading from a partitioned Postgres table, rows belonging to the parent partitioned table are written to the topic of one of its child partitions.

      As described in the steps to reproduce above, suppose we create a parent partitioned table named `dbz_repo` with two partitions, `dbz_repo_1_500` and `dbz_repo_501_1000`, insert random records (which land in the partitions), and kick off a Debezium snapshot.

      After the snapshot completes, the `dbz_repo` topic contains 0 records, while the `dbz_repo_1_500` topic contains the N records of the `dbz_repo_1_500` table PLUS the M records read from the parent `dbz_repo` table (i.e. an additional copy of `dbz_repo_1_500` and everything from `dbz_repo_501_1000`). The `dbz_repo_501_1000` topic is the only correct one, containing exactly one copy of each row of its source table.
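
      This is consistent with how Postgres resolves queries against a partitioned parent: a plain SELECT on `dbz_repo` (which is presumably what the snapshot issues for each whitelisted table) returns the rows of every partition, while the parent itself stores no rows directly. A minimal illustration against the schema above (counts assume the 2,000,000-row insert from the steps to reproduce):

      select count(*) from public.dbz_repo;       -- 2000000, i.e. every partition's rows
      select count(*) from only public.dbz_repo;  -- 0, the parent holds nothing itself

      That 2,000,000 matches the number of extra records observed in the `dbz_repo_1_500` topic in the comparison output below.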

      Logs

      During snapshotting there are errors related to flush failures, but the logs otherwise look clean:

      [2019-01-31 18:40:59,085] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to flush, timed out while waiting for producer to flush outstanding 29175 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
      [2019-01-31 18:40:59,091] ERROR WorkerSourceTask{id=debezium_repo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)

      Comparison output sample

      A small Python script (included in the steps to reproduce above) compares the number of records in each database table to the number of records in the corresponding topic. Here's sample output from a run:

      table | rows_in_db | records_in_topic | diff
      dbz_repo | 2000000 | 0 | 2000000
      dbz_repo_1001_1500 | 500077 | 500077 | 0
      dbz_repo_1_500 | 499596 | 2499596 | -2000000
      dbz_repo_1501_2000 | 499529 | 499529 | 0
      dbz_repo_501_1000 | 500798 | 500798 | 0


              People

              • Assignee: Unassigned
              • Reporter: Kevin Pullin (kppullin)
              • Votes: 0
              • Watchers: 3
