  1. FUSE Message Broker
  2. MB-853

Under heavy load, some connections are rejected when the KahaDB ThreadPoolExecutor cannot accept more tasks for execution.

    Details

    • Type: Enhancement
    • Status: Resolved
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 5.4.2-fuse-02-00
    • Fix Version/s: 5.5.0-fuse-00-00
    • Component/s: broker
    • Labels:
      None

      Description

      Hi,

      When this scenario occurs, my broker throws the following error:

      2011-03-04 09:46:55,295 [127.0.0.1:30197] WARN ProtocolConverter - Exception occured processing:
      SEND
      receipt:xxx
      destination:/queue/loadq-9
      persistent:true
       
      Message from producer 9
      yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghi
      jklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrst
      uvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcd
      efghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmno
      pqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxy
      zabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij
      klmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstu
      vwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcde
      fghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghij
       
      java.util.concurrent.RejectedExecutionException
      at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
      at org.apache.activemq.store.kahadb.KahaDBStore.addQueueTask(KahaDBStore.java:262)
      at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBMessageStore.asyncAddQueueMessage(KahaDBStore.java:318)
      at org.apache.activemq.store.kahadb.KahaDBTransactionStore.asyncAddQueueMessage(KahaDBTransactionStore.java:374)
      at org.apache.activemq.store.kahadb.KahaDBTransactionStore$1.asyncAddQueueMessage(KahaDBTransactionStore.java:161)
      at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:671)
      at org.apache.activemq.broker.region.Queue.send(Queue.java:644)
      at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:365)
      at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:520)
      at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
      at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
      at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:227)
      at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
      at org.apache.activemq.broker.UserIDBroker.send(UserIDBroker.java:56)
      at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
      at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:130)
      at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:192)
      at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
      at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:461)
      at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:677)
      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:310)
      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
      at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
      at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
      at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
      at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:253)
      at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:178)
      at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
      at java.lang.Thread.run(Thread.java:619)

      Unfortunately, I cannot reproduce this error. However, based on internal conversations, we'd like to add a rejected execution handler that blocks
      when there is no space in the pool (instead of rejecting the task), so that the broker slows down in this scenario rather than failing the send.
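
      The requested behaviour can be illustrated with a minimal sketch using only java.util.concurrent. This is not the change that went into the broker. The class name BlockingRejectionSketch, the handler name BlockWhenFullHandler, and the pool and queue sizes are assumptions chosen for illustration. The handler replaces the default AbortPolicy seen in the stack trace and makes the submitting thread wait for queue space instead of throwing RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; names and sizes are hypothetical, not taken from KahaDBStore.
final class BlockingRejectionSketch {

    /** Hypothetical handler: waits for queue space instead of throwing. */
    static final class BlockWhenFullHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            // After shutdown no worker may be left to drain the queue, so still reject.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                // Blocks the submitting thread until the bounded queue has room.
                executor.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    /** Submits taskCount short tasks to a deliberately tiny pool; returns how many completed. */
    static int runDemo(int taskCount) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), // one worker, one queue slot
                new BlockWhenFullHandler());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulate a slow store write
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed tasks: " + runDemo(20));
    }
}
```

      With the default AbortPolicy, the same loop would throw RejectedExecutionException as soon as the single worker and the single queue slot are both occupied. With the blocking handler the producer is throttled to the speed of the pool, which is the slow-down described above. One caveat of this sketch: a task queued in the narrow window between the isShutdown() check and a concurrent shutdown may never run, so a production version would need to handle that case.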

              People

              • Assignee:
                dbosanac Dejan Bosanac
                Reporter:
                sjavurek Susan Javurek
