-
Bug
-
Resolution: Obsolete
-
Major
In a full-mesh broker cluster where consumers connect to arbitrary broker instances and replayWhenNoConsumers=true is set, it may be necessary to configure a replayDelay on the network connector so that messages which are replayed quickly are not treated as duplicates by the broker. See ENTMQ-444 for more details.
This configuration (replayWhenNoConsumers=true && replayDelay > 0), however, can lead to messages not being dispatched to a networked consumer if the consumer reconnects to a different broker before replayDelay has elapsed.
Here is an example:
When a broker A has a consumer connected and messages on the queue, it starts to dispatch the messages to the registered consumer (based on prefetch).
If the real consumer is on a different broker B, there is a demand subscription created for broker B in broker A. From broker A's point of view this is just a regular consumer.
So broker A dispatches the messages to its registered consumer, which happens to be the DemandSubscription. It is the DemandSubscription that enforces the configured replayDelay.
// org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory$ConditionalNetworkBridgeFilter.matchesForwardingFilter()
@Override
protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
    boolean match = true;
    if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
        // potential replay back to origin
        match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
        if (match) {
            LOG.trace("Replaying [{}] for [{}] back to origin in the absence of a local consumer",
                    message.getMessageId(), message.getDestination());
        } else {
            LOG.trace("Suppressing replay of [{}] for [{}] back to origin {}",
                    new Object[]{ message.getMessageId(), message.getDestination(), Arrays.asList(message.getBrokerPath())} );
        }
    ...
}
The line
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
and in particular its call to hasNotJustArrived() is what matters here. This method is implemented as follows:
private boolean hasNotJustArrived(Message message) {
    return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
}
In other words, if broker A starts dispatching the message before replayDelay has elapsed since the message arrived (its getBrokerInTime()), the filter rejects the message, i.e. it is not passed on to the target broker B.
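To make the timing rule concrete, here is a minimal, self-contained sketch of the same comparison. It is not the ActiveMQ class: the name ReplayDelayCheck is hypothetical, and the current time is passed in as a parameter (an assumption made here instead of calling System.currentTimeMillis()) so that the outcome is deterministic.

```java
/** Toy model of the replayDelay check; hypothetical class, not ActiveMQ code. */
final class ReplayDelayCheck {
    private final long replayDelayMillis;

    ReplayDelayCheck(long replayDelayMillis) {
        this.replayDelayMillis = replayDelayMillis;
    }

    /**
     * Same comparison as hasNotJustArrived() in the report, but with the
     * clock supplied by the caller (an assumption for testability).
     */
    boolean hasNotJustArrived(long brokerInTimeMillis, long nowMillis) {
        return replayDelayMillis == 0
                || (brokerInTimeMillis + replayDelayMillis < nowMillis);
    }

    public static void main(String[] args) {
        ReplayDelayCheck check = new ReplayDelayCheck(1000);
        long brokerInTime = 50_000;

        // Dispatch attempted 200 ms after arrival: still inside replayDelay.
        System.out.println(check.hasNotJustArrived(brokerInTime, brokerInTime + 200));  // false

        // Dispatch attempted 1500 ms after arrival: replayDelay has elapsed.
        System.out.println(check.hasNotJustArrived(brokerInTime, brokerInTime + 1500)); // true
    }
}
```

Note that the comparison is strict, so a dispatch attempted exactly replayDelay milliseconds after arrival is still rejected.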
This is generally okay but causes a problem in this use case.
Because
- the network bridge suppresses the message since replayDelay has not yet elapsed,
- the real consumer on broker B has not received a single message yet, so it has nothing to ack and won't ask for further messages,
- no other consumer comes along that would trigger a new dispatch of the message, and
- broker A does not attempt a second dispatch of the message since there is no additional demand,
the message remains stuck on broker A and does not get forwarded over the network bridge.
Those messages can still be browsed. They can also be consumed by another consumer, or after restarting the consumer, because either event triggers a new dispatch. That dispatch happens at a time when replayDelay has elapsed, so the message can even be consumed by the same original consumer if it reconnects.
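The sequence described above can be sketched with a small toy model. This is not ActiveMQ code: the class StuckReplaySimulation and its methods are hypothetical, and the model assumes that a dispatch attempt only happens when something creates demand (a consumer registering or an ack), never on a timer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of broker A's queue and its bridge to broker B; not ActiveMQ code. */
final class StuckReplaySimulation {
    private final long replayDelayMillis;
    // Arrival times (brokerInTime) of messages still waiting on broker A.
    private final List<Long> pendingOnBrokerA = new ArrayList<>();
    private int forwardedToBrokerB = 0;

    StuckReplaySimulation(long replayDelayMillis) {
        this.replayDelayMillis = replayDelayMillis;
    }

    void messageArrives(long brokerInTimeMillis) {
        pendingOnBrokerA.add(brokerInTimeMillis);
    }

    /**
     * One dispatch attempt. In this model it is only called when demand
     * appears (assumption); nothing retries a suppressed message later.
     */
    void dispatch(long nowMillis) {
        Iterator<Long> it = pendingOnBrokerA.iterator();
        while (it.hasNext()) {
            long brokerInTime = it.next();
            boolean delayElapsed = replayDelayMillis == 0
                    || brokerInTime + replayDelayMillis < nowMillis;
            if (delayElapsed) {
                it.remove();          // message leaves broker A
                forwardedToBrokerB++; // and crosses the bridge
            }
            // Otherwise the message is suppressed and simply stays pending.
        }
    }

    int pending() { return pendingOnBrokerA.size(); }

    int forwarded() { return forwardedToBrokerB; }

    public static void main(String[] args) {
        StuckReplaySimulation sim = new StuckReplaySimulation(1000);
        sim.messageArrives(0);

        // Consumer reconnects to broker B 200 ms later: the only dispatch
        // attempt falls inside replayDelay and is suppressed.
        sim.dispatch(200);
        System.out.println("after reconnect: pending=" + sim.pending()
                + " forwarded=" + sim.forwarded());

        // No ack and no new consumer follow, so nothing calls dispatch()
        // again until the consumer is restarted at t=5000 ms.
        sim.dispatch(5000);
        System.out.println("after restart: pending=" + sim.pending()
                + " forwarded=" + sim.forwarded());
    }
}
```

In this model the message stays pending after the first attempt and is only forwarded once a later event (here, the restart) triggers another dispatch after replayDelay has elapsed.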