Red Hat Fuse / ENTESB-17040

Kamelet elasticsearch-index-sink can't use exchangeId as indexId

Details

    • Bug
    • Resolution: Done
    • Major
    • Camel-K-GA
    • Camel-K-GA
    • Camel-K

    Description

      NOTE: I'm using the latest version of the kamelet from GitHub.

      One option is to not pass indexId/indexName to the kamelet, in which case the exchangeId is used for both:

        flow:
          from:
            steps:
            - choice:
                otherwise:
                  steps:
                  - set-header:
                      name: indexId
                      simple: ${exchangeId}
                when:
                - simple: ${header[indexId]}
                  steps:
                  - set-header:
                      name: indexId
                      simple: ${header[indexId]}
                - simple: ${header[ce-indexId]}
                  steps:
                  - set-header:
                      name: indexId
                      simple: ${header[ce-indexId]}
            - choice:
                otherwise:
                  steps:
                  - set-header:
                      name: indexName
                      simple: ${exchangeId}
                when:
                - simple: ${header[indexName]}
                  steps:
                  - set-header:
                      name: indexName
                      simple: ${header[indexName]}
                - simple: ${header[ce-indexName]}
                  steps:
                  - set-header:
                      name: indexName
                      simple: ${header[ce-indexName]}
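
      The fallback order in each choice block above can be sketched in plain Java. This is only an illustration of the logic, not the kamelet's implementation: the class HeaderFallbackSketch and its resolve method are hypothetical names, and the truthiness of the simple predicates is approximated here as "non-null and non-empty".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Camel or the kamelet.
final class HeaderFallbackSketch {

    // Mirrors one choice block: plain header first, then the ce- prefixed
    // header, otherwise the exchange ID.
    static String resolve(Map<String, String> headers, String name, String exchangeId) {
        String direct = headers.get(name);
        if (direct != null && !direct.isEmpty()) {
            return direct;
        }
        String cloudEvent = headers.get("ce-" + name);
        if (cloudEvent != null && !cloudEvent.isEmpty()) {
            return cloudEvent;
        }
        return exchangeId;
    }

    public static void main(String[] args) {
        String exchangeId = "8494CDA82FA3229-0000000000000000";
        Map<String, String> headers = new HashMap<>();

        // No headers set: falls through to the exchange ID.
        System.out.println(resolve(headers, "indexName", exchangeId));

        // Header set by the caller: the header value wins.
        headers.put("indexName", "my-index");
        System.out.println(resolve(headers, "indexName", exchangeId));
    }
}
```

      With no headers set, both indexId and indexName resolve to the exchange ID.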
      

      This does not work, because Elasticsearch requires index names to be lowercase and the exchangeId contains uppercase characters:

      2021-07-21 12:29:13,521 WARN  [org.apa.cam.com.tim.TimerConsumer] (Camel (camel-1) thread #0 - timer://x) Error processing exchange. Exchange[8494CDA82FA3229-0000000000000000]. Caused by: [org.elasticsearch.ElasticsearchStatusException - Elasticsearch exception [type=invalid_index_name_exception, reason=Invalid index name [8494CDA82FA3229-0000000000000000], must be lowercase]]: [8494CDA82FA3229-0000000000000000] ElasticsearchStatusException[Elasticsearch exception [type=invalid_index_name_exception, reason=Invalid index name [8494CDA82FA3229-0000000000000000], must be lowercase]]
          at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:187)
          at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1907)
          at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1884)
          at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1641)
          at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1598)
          at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1568)
          at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:985)
          at org.apache.camel.component.elasticsearch.ElasticsearchProducer.process(ElasticsearchProducer.java:172)
          at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
          at org.apache.camel.component.kameletreify.KameletReifyEndpoint$KameletProducer.process(KameletReifyEndpoint.java:143)
          at org.apache.camel.processor.SendProcessor.lambda$process$2(SendProcessor.java:191)
          at org.apache.camel.support.cache.DefaultProducerCache.doInAsyncProducer(DefaultProducerCache.java:318)
          at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:190)
          at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
          at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
          at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
          at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
          at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:209)
          at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:76)
          at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
          at java.base/java.util.TimerThread.run(Timer.java:506)
          Suppressed: org.elasticsearch.client.ResponseException: method [PUT], host [http://tnb-elasticsearch-es-http:9200], URI [/8494CDA82FA3229-0000000000000000/_doc/8494CDA82FA3229-0000000000000000?wait_for_active_shards=1&timeout=1m], status line [HTTP/1.1 400 Bad Request]                                                                                                                                                       
      {"error":{"root_cause":[{"type":"invalid_index_name_exception","reason":"Invalid index name [8494CDA82FA3229-0000000000000000], must be lowercase","index_uuid":"_na_","index":"8494CDA82FA3229-0000000000000000"}],"type":"invalid_index_name_exception","reason":"Invalid index name [8494CDA82FA3229-0000000000000000], must be lowercase","index_uuid":"_na_","index":"8494CDA82FA3229-0000000000000000"},"status":400}
              at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:318)
              at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
              at org.elasticsearch.client.RestClient.performRequest(RestClient.java:262)
              at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1628)
              ... 18 more
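
      As a rough illustration of the rule named in the error message, the following plain-Java sketch checks an exchange ID for uppercase characters and lowercases it before it would be used as an index name. The class IndexNameSketch and its methods are hypothetical, this is not necessarily how the kamelet was or should be fixed, and any other naming rules Elasticsearch may enforce are not checked.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of Camel or Elasticsearch.
final class IndexNameSketch {

    // Covers only the rule from the error message: no uppercase characters.
    static boolean hasUppercase(String value) {
        return value.chars().anyMatch(c -> Character.isUpperCase(c));
    }

    // Locale.ROOT keeps the result independent of the JVM's default locale.
    static String toIndexName(String exchangeId) {
        return exchangeId.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String exchangeId = "8494CDA82FA3229-0000000000000000";
        System.out.println(hasUppercase(exchangeId));                // true
        System.out.println(toIndexName(exchangeId));                 // 8494cda82fa3229-0000000000000000
        System.out.println(hasUppercase(toIndexName(exchangeId)));   // false
    }
}
```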
      

      Example integration:

      // camel-k: language=java
      package com.test;
      
      import org.apache.camel.builder.RouteBuilder;
      
      public class MyRouteBuilder extends RouteBuilder {
          private final String body = "{\"message\": \"Hello Elasticsearch c330b\"}";
      
          @Override
          public void configure() throws Exception {
              from("timer:x?repeatCount=1").setBody(constant(body)).to("kamelet:elasticsearch-index-sink?enableSSL=false&password=<pw>&clusterName=tnb-elasticsearch&user=elastic&hostAddresses=<address>");
          }
      }
      
      

          People

            acosenti Andrea Cosentino
            avano@redhat.com Andrej Vano
            Andrej Vano Andrej Vano