  1. JBoss A-MQ
  2. ENTMQ-2196

stuck messages in broker network when using replayDelay>0 and replayWhenNoConsumers=true


    • Type: Bug
    • Resolution: Obsolete
    • Priority: Major

      In a full-mesh broker cluster where consumers connect to arbitrary broker instances and replayWhenNoConsumers=true is set, it may be necessary to configure a replayDelay on the network connector so that messages which get replayed quickly are not treated as duplicates by the broker. See ENTMQ-444 for more details.

      However, this configuration (replayWhenNoConsumers=true && replayDelay > 0) can lead to messages not being dispatched to a networked consumer if the consumer reconnects to a different broker before replayDelay has elapsed.

      Here is an example:
      When broker A has a consumer connected and messages on the queue, it starts to dispatch the messages to the registered consumer (based on prefetch).
      If the real consumer is on a different broker B, broker A creates a demand subscription for broker B. From broker A's point of view this is just a regular consumer.

      So broker A dispatches the messages to its registered consumer, which happens to be the DemandSubscription. It is the DemandSubscription that enforces the configured replayDelay.

      // org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory$ConditionalNetworkBridgeFilter.matchesForwardingFilter()
      @Override
      protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
        boolean match = true;
        if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
          // potential replay back to origin
          match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
          if (match) {
            LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer", message.getMessageId(), message.getDestination());
          } else {
            LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}", new Object[]{ message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath())} );
          }
          ...
      }
      

      The line

       
        match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
      

      and in particular its call to hasNotJustArrived() is important. This method is implemented as follows:

      private boolean hasNotJustArrived(Message message) {
        return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
      }
      

      In effect, if broker A starts dispatching the message before replayDelay has elapsed, the filter rejects the message, i.e. it is not passed on to the target broker B.
      This is generally okay but causes a problem in this use case.
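
      To make the timing concrete, here is a minimal standalone sketch of the same comparison with the clock passed in as a parameter. The class name ReplayDelayCheck and the timestamps are made up for illustration; only the comparison itself is taken from the method quoted above.

```java
// Illustrative only: not an ActiveMQ class.
final class ReplayDelayCheck {

    // Same comparison as hasNotJustArrived(), with "now" passed in explicitly
    // instead of calling System.currentTimeMillis().
    static boolean hasNotJustArrived(long brokerInTime, long replayDelay, long now) {
        return replayDelay == 0 || (brokerInTime + replayDelay < now);
    }

    public static void main(String[] args) {
        long brokerInTime = 10_000; // hypothetical arrival time on broker A, in ms
        long replayDelay = 1_000;   // hypothetical replayDelay, in ms

        // Dispatch attempted 200 ms after arrival: the filter rejects the message.
        System.out.println(hasNotJustArrived(brokerInTime, replayDelay, 10_200)); // false
        // Dispatch attempted just after the delay has elapsed: the filter accepts it.
        System.out.println(hasNotJustArrived(brokerInTime, replayDelay, 11_001)); // true
        // With replayDelay=0 the check is effectively disabled.
        System.out.println(hasNotJustArrived(brokerInTime, 0, 10_000));           // true
    }
}
```

      Note that the comparison is strict, so a dispatch attempted exactly at brokerInTime + replayDelay is still rejected.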

      Because

      • the network bridge suppresses the message while replayDelay has not yet elapsed,
      • the real consumer on broker B has not received a single message yet, and hence has nothing to ack and won't ask for further messages,
      • no other consumer comes along that would trigger a new dispatch of the message, and
      • broker A does not attempt a second dispatch of the message since there is no additional demand,

      the message remains stuck on broker A and does not get forwarded over the network bridge.
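
      The sequence above can be reproduced with a small toy model. This is only a sketch under simplifying assumptions: the types StuckMessageSketch, Msg and BrokerA are hypothetical and not ActiveMQ classes, dispatch is modelled as an explicit call that happens only when demand arrives, and only the replayDelay check of the filter is modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only: none of these types are ActiveMQ classes.
final class StuckMessageSketch {

    static final class Msg {
        final String id;
        final long brokerInTime; // when broker A received the message, in ms

        Msg(String id, long brokerInTime) {
            this.id = id;
            this.brokerInTime = brokerInTime;
        }
    }

    // Broker A with one queue and one demand subscription for broker B.
    static final class BrokerA {
        final long replayDelay;
        final List<Msg> queue = new ArrayList<>();
        final List<String> forwardedToB = new ArrayList<>();

        BrokerA(long replayDelay) {
            this.replayDelay = replayDelay;
        }

        void enqueue(String id, long now) {
            queue.add(new Msg(id, now));
        }

        // Same comparison as hasNotJustArrived() in the report.
        boolean hasNotJustArrived(Msg m, long now) {
            return replayDelay == 0 || (m.brokerInTime + replayDelay < now);
        }

        // One dispatch attempt towards the demand subscription. A message
        // rejected by the filter stays queued and is NOT retried on its own.
        void dispatch(long now) {
            Iterator<Msg> it = queue.iterator();
            while (it.hasNext()) {
                Msg m = it.next();
                if (hasNotJustArrived(m, now)) {
                    forwardedToB.add(m.id);
                    it.remove();
                }
            }
        }

        int pending() {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        BrokerA a = new BrokerA(1_000);
        a.enqueue("msg-1", 0); // message is replayed back to broker A at t=0
        a.dispatch(200);       // consumer on broker B creates demand at t=200 ms
        System.out.println("pending after first dispatch: " + a.pending()); // 1

        // No ack from B and no new demand: nothing calls dispatch() again,
        // so the message stays on broker A although replayDelay elapses.

        a.dispatch(5_000);     // a restarted or new consumer triggers a new dispatch
        System.out.println("forwarded after second dispatch: " + a.forwardedToB); // [msg-1]
    }
}
```

      In this model the message only moves when something calls dispatch() again after the delay has elapsed. In the scenario described here, that happens only when a consumer is restarted or another consumer connects.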
      Those messages can still be browsed. They can also be consumed by another consumer, or by restarting the consumer, as either triggers a new dispatch. Because that dispatch happens after replayDelay has elapsed, the messages can even be consumed by the original consumer if it reconnects.

            Assignee: Unassigned
            Reporter: Torsten Mielke (rhn-support-tmielke)