connect_1 | 2021-10-18 13:42:46,155 WARN MySQL|dbserver|snapshot Ignoring unparseable DDL statement 'CREATE TABLE `cached_sales` ( connect_1 | `id` bigint unsigned NOT NULL AUTO_INCREMENT, connect_1 | `sale_id` bigint unsigned NOT NULL, connect_1 | `sale_item_id` bigint unsigned NOT NULL, connect_1 | `retailer_id` bigint unsigned DEFAULT NULL, connect_1 | `retailer_branch_id` bigint unsigned DEFAULT NULL, connect_1 | `retailer_branch_location_id` bigint unsigned NOT NULL, connect_1 | `sales_area_id` bigint unsigned DEFAULT NULL, connect_1 | `product_id` bigint unsigned DEFAULT NULL, connect_1 | `product_variation_id` bigint unsigned NOT NULL, connect_1 | `season_ids` json DEFAULT NULL, connect_1 | `category_ids` json DEFAULT NULL, connect_1 | `color_ids` json DEFAULT NULL, connect_1 | `size_ids` json DEFAULT NULL, connect_1 | `gender_ids` json DEFAULT NULL, connect_1 | `city` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `country_code` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `location_type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `edi_enabled` tinyint(1) DEFAULT NULL, connect_1 | `quantity` int DEFAULT NULL, connect_1 | `brand_normalized_purchase_price_net` int DEFAULT NULL, connect_1 | `brand_normalized_selling_price_gross` int DEFAULT NULL, connect_1 | `item_updated_at` datetime DEFAULT NULL, connect_1 | `sold_at` datetime DEFAULT NULL, connect_1 | `created_at` timestamp NULL DEFAULT NULL, connect_1 | `updated_at` timestamp NULL DEFAULT NULL, connect_1 | `tenant_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | PRIMARY KEY (`id`), connect_1 | UNIQUE KEY `cached_sales_sale_item_id_unique` (`sale_item_id`), connect_1 | KEY `cached_sales_sale_id_foreign` (`sale_id`), connect_1 | KEY `cached_sales_retailer_id_foreign` (`retailer_id`), connect_1 | KEY `cached_sales_retailer_branch_id_foreign` (`retailer_branch_id`), connect_1 | KEY `cached_sales_retailer_branch_location_id_foreign` (`retailer_branch_location_id`), connect_1 | KEY `cached_sales_sales_area_id_foreign` (`sales_area_id`), connect_1 | KEY `cached_sales_product_id_foreign` (`product_id`), connect_1 | KEY `cached_sales_product_variation_id_foreign` (`product_variation_id`), connect_1 | KEY `cached_sales_city_index` (`city`), connect_1 | KEY `cached_sales_country_code_index` (`country_code`), connect_1 | KEY `cached_sales_location_type_index` (`location_type`), connect_1 | KEY `cached_sales_sold_at_index` (`sold_at`), connect_1 | KEY `cached_sales_season_ids_index` ((cast(json_extract(`season_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_category_ids_index` ((cast(json_extract(`category_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_color_ids_index` ((cast(json_extract(`color_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_size_ids_index` ((cast(json_extract(`size_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_gender_ids_index` ((cast(json_extract(`gender_ids`,_utf8mb4'$') as unsigned array))), connect_1 | CONSTRAINT `cached_sales_product_id_foreign` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`), connect_1 | CONSTRAINT `cached_sales_product_variation_id_foreign` FOREIGN KEY (`product_variation_id`) REFERENCES `product_variations` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_branch_id_foreign` FOREIGN KEY (`retailer_branch_id`) REFERENCES `retailer_branches` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_branch_location_id_foreign` FOREIGN KEY (`retailer_branch_location_id`) REFERENCES `retailer_branch_locations` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_id_foreign` FOREIGN KEY (`retailer_id`) REFERENCES `retailers` (`id`), connect_1 | CONSTRAINT `cached_sales_sale_id_foreign` FOREIGN KEY (`sale_id`) REFERENCES `sales` (`id`), connect_1 | CONSTRAINT `cached_sales_sale_item_id_foreign` FOREIGN KEY (`sale_item_id`) REFERENCES `sale_items` (`id`) ON DELETE CASCADE, connect_1 | CONSTRAINT `cached_sales_sales_area_id_foreign` FOREIGN KEY (`sales_area_id`) REFERENCES `sales_areas` (`id`) connect_1 | ) ENGINE=InnoDB AUTO_INCREMENT=13594436 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci': {} [io.debezium.connector.mysql.MySqlDatabaseSchema] connect_1 | io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'CREATE TABLE `cached_sales` ( connect_1 | `id` bigint unsigned NOT NULL AUTO_INCREMENT, connect_1 | `sale_id` bigint unsigned NOT NULL, connect_1 | `sale_item_id` bigint unsigned NOT NULL, connect_1 | `retailer_id` bigint unsigned DEFAULT NULL, connect_1 | `retailer_branch_id` bigint unsigned DEFAULT NULL, connect_1 | `retailer_branch_location_id` bigint unsigned NOT NULL, connect_1 | `sales_area_id` bigint unsigned DEFAULT NULL, connect_1 | `product_id` bigint unsigned DEFAULT NULL, connect_1 | `product_variation_id` bigint unsigned NOT NULL, connect_1 | `season_ids` json DEFAULT NULL, connect_1 | `category_ids` json DEFAULT NULL, connect_1 | `color_ids` json DEFAULT NULL, connect_1 | `size_ids` json DEFAULT NULL, connect_1 | `gender_ids` json DEFAULT NULL, connect_1 | `city` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `country_code` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `location_type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | `edi_enabled` tinyint(1) DEFAULT NULL, connect_1 | `quantity` int DEFAULT NULL, connect_1 | `brand_normalized_purchase_price_net` int DEFAULT NULL, connect_1 | `brand_normalized_selling_price_gross` int DEFAULT NULL, connect_1 | `item_updated_at` datetime DEFAULT NULL, connect_1 | `sold_at` datetime DEFAULT NULL, connect_1 | `created_at` timestamp NULL DEFAULT NULL, connect_1 | `updated_at` timestamp NULL DEFAULT NULL, connect_1 | `tenant_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, connect_1 | PRIMARY KEY (`id`), connect_1 | UNIQUE KEY `cached_sales_sale_item_id_unique` (`sale_item_id`), connect_1 | KEY `cached_sales_sale_id_foreign` (`sale_id`), connect_1 | KEY `cached_sales_retailer_id_foreign` (`retailer_id`), connect_1 | KEY `cached_sales_retailer_branch_id_foreign` (`retailer_branch_id`), connect_1 | KEY `cached_sales_retailer_branch_location_id_foreign` (`retailer_branch_location_id`), connect_1 | KEY `cached_sales_sales_area_id_foreign` (`sales_area_id`), connect_1 | KEY `cached_sales_product_id_foreign` (`product_id`), connect_1 | KEY `cached_sales_product_variation_id_foreign` (`product_variation_id`), connect_1 | KEY `cached_sales_city_index` (`city`), connect_1 | KEY `cached_sales_country_code_index` (`country_code`), connect_1 | KEY `cached_sales_location_type_index` (`location_type`), connect_1 | KEY `cached_sales_sold_at_index` (`sold_at`), connect_1 | KEY `cached_sales_season_ids_index` ((cast(json_extract(`season_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_category_ids_index` ((cast(json_extract(`category_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_color_ids_index` ((cast(json_extract(`color_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_size_ids_index` ((cast(json_extract(`size_ids`,_utf8mb4'$') as unsigned array))), connect_1 | KEY `cached_sales_gender_ids_index` ((cast(json_extract(`gender_ids`,_utf8mb4'$') as unsigned array))), connect_1 | CONSTRAINT `cached_sales_product_id_foreign` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`), connect_1 | CONSTRAINT `cached_sales_product_variation_id_foreign` FOREIGN KEY (`product_variation_id`) REFERENCES `product_variations` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_branch_id_foreign` FOREIGN KEY (`retailer_branch_id`) REFERENCES `retailer_branches` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_branch_location_id_foreign` FOREIGN KEY (`retailer_branch_location_id`) REFERENCES `retailer_branch_locations` (`id`), connect_1 | CONSTRAINT `cached_sales_retailer_id_foreign` FOREIGN KEY (`retailer_id`) REFERENCES `retailers` (`id`), connect_1 | CONSTRAINT `cached_sales_sale_id_foreign` FOREIGN KEY (`sale_id`) REFERENCES `sales` (`id`), connect_1 | CONSTRAINT `cached_sales_sale_item_id_foreign` FOREIGN KEY (`sale_item_id`) REFERENCES `sale_items` (`id`) ON DELETE CASCADE, connect_1 | CONSTRAINT `cached_sales_sales_area_id_foreign` FOREIGN KEY (`sales_area_id`) REFERENCES `sales_areas` (`id`) connect_1 | ) ENGINE=InnoDB AUTO_INCREMENT=13594436 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci' connect_1 | no viable alternative at input 'CREATE TABLE `cached_sales` (\n `id` bigint unsigned NOT NULL AUTO_INCREMENT,\n `sale_id` bigint unsigned NOT NULL,\n `sale_item_id` bigint unsigned NOT NULL,\n `retailer_id` bigint unsigned DEFAULT NULL,\n `retailer_branch_id` bigint unsigned DEFAULT NULL,\n `retailer_branch_location_id` bigint unsigned NOT NULL,\n `sales_area_id` bigint unsigned DEFAULT NULL,\n `product_id` bigint unsigned DEFAULT NULL,\n `product_variation_id` bigint unsigned NOT NULL,\n `season_ids` json DEFAULT NULL,\n `category_ids` json DEFAULT NULL,\n `color_ids` json DEFAULT NULL,\n `size_ids` json DEFAULT NULL,\n `gender_ids` json DEFAULT NULL,\n `city` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,\n `country_code` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,\n `location_type` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,\n `edi_enabled` tinyint(1) DEFAULT NULL,\n `quantity` int DEFAULT NULL,\n `brand_normalized_purchase_price_net` int DEFAULT NULL,\n `brand_normalized_selling_price_gross` int DEFAULT NULL,\n `item_updated_at` datetime DEFAULT NULL,\n `sold_at` datetime DEFAULT NULL,\n `created_at` timestamp NULL DEFAULT NULL,\n `updated_at` timestamp NULL DEFAULT NULL,\n `tenant_id` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL,\n PRIMARY KEY (`id`),\n UNIQUE KEY `cached_sales_sale_item_id_unique` (`sale_item_id`),\n KEY `cached_sales_sale_id_foreign` (`sale_id`),\n KEY `cached_sales_retailer_id_foreign` (`retailer_id`),\n KEY `cached_sales_retailer_branch_id_foreign` (`retailer_branch_id`),\n KEY `cached_sales_retailer_branch_location_id_foreign` (`retailer_branch_location_id`),\n KEY `cached_sales_sales_area_id_foreign` (`sales_area_id`),\n KEY `cached_sales_product_id_foreign` (`product_id`),\n KEY `cached_sales_product_variation_id_foreign` (`product_variation_id`),\n KEY `cached_sales_city_index` (`city`),\n KEY `cached_sales_country_code_index` (`country_code`),\n KEY `cached_sales_location_type_index` (`location_type`),\n KEY `cached_sales_sold_at_index` (`sold_at`),\n KEY `cached_sales_season_ids_index` ((' connect_1 | at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) connect_1 | at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) connect_1 | at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544) connect_1 | at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310) connect_1 | at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136) connect_1 | at io.debezium.ddl.parser.mysql.generated.MySqlParser.sqlStatements(MySqlParser.java:1194) connect_1 | at io.debezium.ddl.parser.mysql.generated.MySqlParser.root(MySqlParser.java:922) connect_1 | at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:72) connect_1 | at io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser.parseTree(MySqlAntlrDdlParser.java:45) connect_1 | at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:82) connect_1 | at io.debezium.connector.mysql.MySqlDatabaseSchema.parseDdl(MySqlDatabaseSchema.java:216) connect_1 | at io.debezium.connector.mysql.MySqlDatabaseSchema.parseSnapshotDdl(MySqlDatabaseSchema.java:196) connect_1 | at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.addSchemaEvent(MySqlSnapshotChangeEventSource.java:303) connect_1 | at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.lambda$createSchemaEventsForTables$5(MySqlSnapshotChangeEventSource.java:373) connect_1 | at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:558) connect_1 | at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:499) connect_1 | at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.createSchemaEventsForTables(MySqlSnapshotChangeEventSource.java:371) connect_1 | at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:360) connect_1 | at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.readTableStructure(MySqlSnapshotChangeEventSource.java:47) connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:119) connect_1 | at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70) connect_1 | at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118) connect_1 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) connect_1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) connect_1 | at java.base/java.lang.Thread.run(Thread.java:829) connect_1 | Caused by: org.antlr.v4.runtime.NoViableAltException connect_1 | at org.antlr.v4.runtime.atn.ParserATNSimulator.noViableAlt(ParserATNSimulator.java:2026) connect_1 | at org.antlr.v4.runtime.atn.ParserATNSimulator.execATN(ParserATNSimulator.java:467) connect_1 | at org.antlr.v4.runtime.atn.ParserATNSimulator.adaptivePredict(ParserATNSimulator.java:393) connect_1 | at io.debezium.ddl.parser.mysql.generated.MySqlParser.sqlStatements(MySqlParser.java:996) connect_1 | ... 21 more connect_1 | 2021-10-18 13:42:47,713 INFO MySQL|dbserver|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource] connect_1 | 2021-10-18 13:42:48,448 INFO || 12 records sent during previous 00:00:06.635, last recorded offset: {ts_sec=1634564565, file=mysql-bin-changelog.183102, pos=533, snapshot=true} [io.debezium.connector.common.BaseSourceTask] connect_1 | 2021-10-18 13:42:48,875 INFO || The task will send records to topic 'dbserver' for the first time. Checking whether topic exists [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:42:49,158 INFO || Creating topic 'dbserver' [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:42:49,476 INFO || Created topic (name=dbserver, numPartitions=1, replicationFactor=-1, replicasAssignments=null, configs={}) on brokers at pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092 [org.apache.kafka.connect.util.TopicAdmin] connect_1 | 2021-10-18 13:42:49,477 INFO || Created topic '(name=dbserver, numPartitions=1, replicationFactor=-1, replicasAssignments=null, configs={})' using creation group TopicCreationGroup{name='default', inclusionPattern=.*, exclusionPattern=, numPartitions=1, replicationFactor=-1, otherConfigs={}} [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:42:51,959 INFO MySQL|dbserver|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource] connect_1 | 2021-10-18 13:42:51,961 INFO MySQL|dbserver|snapshot Snapshotting contents of 1 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource] connect_1 | 2021-10-18 13:42:51,981 INFO MySQL|dbserver|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource] connect_1 | 2021-10-18 13:42:51,982 ERROR MySQL|dbserver|snapshot Producer failure [io.debezium.pipeline.ErrorHandler] connect_1 | io.debezium.DebeziumException: java.lang.NullPointerException connect_1 | at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79) connect_1 | at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118) connect_1 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) connect_1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) connect_1 | at java.base/java.lang.Thread.run(Thread.java:829) connect_1 | Caused by: java.lang.NullPointerException connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:340) connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315) connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135) connect_1 | at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70) connect_1 | ... 6 more connect_1 | 2021-10-18 13:42:52,211 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 13 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:42:52,828 ERROR || WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask] connect_1 | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. connect_1 | at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) connect_1 | at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:135) connect_1 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) connect_1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) connect_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) connect_1 | at java.base/java.lang.Thread.run(Thread.java:829) connect_1 | Caused by: io.debezium.DebeziumException: java.lang.NullPointerException connect_1 | at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79) connect_1 | at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118) connect_1 | ... 5 more connect_1 | Caused by: java.lang.NullPointerException connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:340) connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:315) connect_1 | at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:135) connect_1 | at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70) connect_1 | ... 6 more connect_1 | 2021-10-18 13:42:52,830 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask] connect_1 | 2021-10-18 13:42:52,854 INFO || Connection gracefully closed [io.debezium.jdbc.JdbcConnection] connect_1 | 2021-10-18 13:42:52,860 INFO || [Producer clientId=dbserver-dbhistory] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer] connect_1 | 2021-10-18 13:42:52,869 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,869 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,869 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,871 INFO || App info kafka.producer for dbserver-dbhistory unregistered [org.apache.kafka.common.utils.AppInfoParser] connect_1 | 2021-10-18 13:42:52,871 INFO || [Producer clientId=connector-producer-inventory-connector-0] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer] connect_1 | 2021-10-18 13:42:52,879 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,881 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,883 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,884 INFO || App info kafka.producer for connector-producer-inventory-connector-0 unregistered [org.apache.kafka.common.utils.AppInfoParser] connect_1 | 2021-10-18 13:42:52,886 INFO || App info kafka.admin.client for connector-adminclient-inventory-connector-0 unregistered [org.apache.kafka.common.utils.AppInfoParser] connect_1 | 2021-10-18 13:42:52,896 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,896 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:42:52,896 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics] connect_1 | 2021-10-18 13:43:41,974 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:44:41,908 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:45:41,841 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] connect_1 | 2021-10-18 13:46:41,772 INFO || WorkerSourceTask{id=inventory-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]