Debezium / DBZ-7573

Incorrect config is used when creating the internal schema history connector with AWS MSK with a SASL/SCRAM setup


Details


    Description

      Bug report

      For bug reports, provide this information, please:

       

      We have an instance of Kafka Connect running on a self-hosted node that connects to AWS MSK using the SASL_SSL mechanism. I am running into an issue where the SQL Server connector runs out of memory, and the underlying error appears to be an auth failure for the internal schema history client, because in another environment where Kafka is deployed with no auth this works fine.

      We have added the settings below to connect-distributed.properties, but they are not being picked up, and the config values in the log still show PLAINTEXT. The only way to get the connector to work is to pass these values in the connector create API call, after which the connector works fine, prints the correct values in the consumer and producer configs, and is able to establish the schema history connection (a sketch of that workaround is included below).
      Auth is successful for the topic the request comes in for (verified in the config logs that the SASL-related properties are set properly) but fails for the internal schema history client.
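      For reference, the working connector create request looks roughly like the following. This is only a sketch: the connector name, hosts, database details, topic names, and credentials are placeholders, not our actual values.

      curl -X POST http://localhost:8083/connectors \
        -H "Content-Type: application/json" \
        -d '{
          "name": "sqlserver-connector",
          "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "database.hostname": "sqlserver.example.com",
            "database.port": "1433",
            "database.user": "dbz_user",
            "database.password": "dbz_pass",
            "database.names": "testdb",
            "topic.prefix": "xxxxx",
            "schema.history.internal.kafka.bootstrap.servers": "aaaa.kafka.us-east-1.amazonaws.com:9096",
            "schema.history.internal.kafka.topic": "schemahistory.testdb",
            "schema.history.internal.producer.security.protocol": "SASL_SSL",
            "schema.history.internal.producer.sasl.mechanism": "SCRAM-SHA-512",
            "schema.history.internal.producer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pass\";",
            "schema.history.internal.consumer.security.protocol": "SASL_SSL",
            "schema.history.internal.consumer.sasl.mechanism": "SCRAM-SHA-512",
            "schema.history.internal.consumer.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pass\";"
          }
        }'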

       

      What Debezium connector do you use and what version?

      SQL Server connector 2.3.0.Final

      What is the connector configuration?

       

      # existing config
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
      producer.security.protocol=SASL_SSL
      producer.sasl.mechanism=SCRAM-SHA-512
      producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
      ssl.truststore.type=PEM
      producer.ssl.truststore.type=PEM
      ssl.truststore.location=/etc/pki/tls/certs/ca-bundle.crt
      producer.ssl.truststore.location=/etc/pki/tls/certs/ca-bundle.crt
      # Newly added config
      schema.history.internal.producer.security.protocol=SASL_SSL
      schema.history.internal.producer.sasl.mechanism=SCRAM-SHA-512
      schema.history.internal.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
      schema.history.internal.consumer.security.protocol=SASL_SSL
      schema.history.internal.consumer.sasl.mechanism=SCRAM-SHA-512
      schema.history.internal.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='pass';
      

      What is the captured database version and mode of deployment?

      DB - SQL Server 2019

      Kafka - AWS MSK with SASL/SCRAM auth mechanism

      What behaviour do you expect?

      Auth-related configuration defined in `connect-distributed.properties` should also be applied to the internal schema history producer and consumer.

      What behaviour do you see?

      The configuration is ignored and the internal schema history clients default to PLAINTEXT.

      Do you see the same behaviour using the latest released Debezium version?

      NA

      Do you have the connector logs, ideally from start till finish?

       

      INFO KafkaSchemaHistory Consumer config: {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, enable.auto.commit=false, group.id=xxxxx-config-schemahistory, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, fetch.min.bytes=1, session.timeout.ms=10000, auto.offset.reset=earliest, client.id=xxxxx-config-schemahistory} (io.debezium.storage.kafka.history.KafkaSchemaHistory:245)
      INFO KafkaSchemaHistory Producer config: {retries=1, value.serializer=org.apache.kafka.common.serialization.StringSerializer, acks=1, batch.size=32768, max.block.ms=10000, bootstrap.servers=aaaa.kafka.us-east-1.amazonaws.com:9096,bbbb.kafka.us-east-1.amazonaws.com:9096,cccc.kafka.us-east-1.amazonaws.com:9096, buffer.memory=1048576, key.serializer=org.apache.kafka.common.serialization.StringSerializer, client.id=xxxxx-config-schemahistory, linger.ms=0} (io.debezium.storage.kafka.history.KafkaSchemaHistory:246)
      ....
      INFO ProducerConfig values: 
          acks = 1
          auto.include.jmx.reporter = true
          batch.size = 32768
          bootstrap.servers = [aaaa.kafka.us-east-1.amazonaws.com:9096, bbbb.kafka.us-east-1.amazonaws.com:9096, cccc.kafka.us-east-1.amazonaws.com:9096]
          buffer.memory = 1048576
          client.dns.lookup = use_all_dns_ips
          client.id = xxxxx-config-schemahistory
          ***removed***
          sasl.jaas.config = null
          ***removed***
          sasl.mechanism = GSSAPI
          ***removed***
          security.protocol = PLAINTEXT
          security.providers = null
              ***removed***
       (org.apache.kafka.clients.producer.ProducerConfig:370)
      ...
      INFO ConsumerConfig values: 
          ***removed***
          bootstrap.servers = [aaaa.kafka.us-east-1.amazonaws.com:9096, bbbb.kafka.us-east-1.amazonaws.com:9096, cccc.kafka.us-east-1.amazonaws.com:9096]
          check.crcs = true
          client.dns.lookup = use_all_dns_ips
          client.id = xxxxx-config-schemahistory
          ***removed***
          fetch.min.bytes = 1
          group.id = xxxxx-config-schemahistory
          ***removed***
          sasl.jaas.config = null
              ***removed***
          sasl.mechanism = GSSAPI
              ***removed***
          security.protocol = PLAINTEXT
          security.providers = null
              ***removed***
       (org.apache.kafka.clients.consumer.ConsumerConfig:370)
      ....
      INFO [Producer clientId=xxxxx-config-schemahistory] Node -2 disconnected. (org.apache.kafka.clients.NetworkClient:977)
      INFO [Producer clientId=xxxxx-config-schemahistory] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -2 being disconnected (elapsed time since creation: 296ms, elapsed time since send: 296ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:344)
      WARN [Producer clientId=xxxxx-config-schemahistory] Bootstrap broker bbbb.kafka.us-east-1.amazonaws.com:9096 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1105)
      INFO App info kafka.consumer for xxxxx-config-schemahistory unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
      ERROR WorkerSourceTask{id=xxxxx-config-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
      

      How to reproduce the issue using our tutorial deployment?

      Deploy the connector against AWS MSK with the SASL/SCRAM authentication mechanism.

       

      TLDR

      1. The schema history client does not work against a SASL-secured managed Kafka instance (AWS MSK).
      2. When the auth config is passed in the connector create API call, everything works.
      3. It does not work when setting the same values in connect-distributed.properties.
      4. Is there no config file where these settings can be added? The apprehension about passing them in the create call is to avoid persisting the Kafka credentials in the client (a possible approach is sketched after this list).
      5. The connection to the connector's topic works fine; the only issue is the internal schema history client, which does not pick up the right auth-related config.
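      Regarding point 4: one possible way to keep the raw credentials out of the connector create call is the standard Kafka Connect config provider mechanism (FileConfigProvider), so the connector config only carries a placeholder. The sketch below assumes that approach; the secrets file path and key name are made up.

      # connect-distributed.properties: register the file config provider
      config.providers=file
      config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

      # /etc/kafka/secrets/msk.properties (readable only by the Connect worker) would contain:
      # sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";

      # connector configuration (create API): reference the secret instead of embedding it
      schema.history.internal.producer.sasl.jaas.config=${file:/etc/kafka/secrets/msk.properties:sasl.jaas.config}
      schema.history.internal.consumer.sasl.jaas.config=${file:/etc/kafka/secrets/msk.properties:sasl.jaas.config}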

          People

            Assignee: Unassigned
            Reporter: Aadil Siddiqui