Uploaded image for project: 'AMQ Broker'
  1. AMQ Broker
  2. ENTMQBR-1476

Messages are still getting delivered to address-full-policy BLOCKed address

    XMLWordPrintable

Details

    Description

               <address-setting match="#">
                  <max-size-bytes-reject-threshold>1048576</max-size-bytes-reject-threshold>
                  <max-size-bytes>1048576</max-size-bytes>
                  <address-full-policy>BLOCK</address-full-policy>
                 [...]
      

      Even after the broker blocks an address due to the BLOCK address-full policy, messages are still getting through. In the case of OpenWire, the first message of every new connection gets onto the queue. In the case of AMQP, all messages from the very first producer get through (while there was still credit?). In the case of Core, it is the same messages as in the AMQP case plus one message from the next producer.

      /**
       * Reproducer test for producer flow control on an address whose full policy is BLOCK.
       *
       * <p>The test is parameterized over {@link Protocol} and therefore runs once per
       * protocol (CORE, OPENWIRE, AMQP). Each run starts a broker with a 1024-byte
       * max-size limit on the test address, lets three successive producers try to send
       * 2000-byte messages, and finally counts how many messages a consumer can read back.
       * The test passes only if fewer than 3 messages reached the queue.
       */
      @RunWith(Parameterized.class)
      public class MultiprotocolProducerFlowControlTest extends ActiveMQTestBase {
          // Broker under test; created and started inside the test method.
          private ActiveMQServer server;
      
          // Client protocols the test is parameterized over.
          enum Protocol {CORE, OPENWIRE, AMQP}
      
          /**
           * Supplies the parameter sets for the Parameterized runner: one single-element
           * row per {@link Protocol} value. The "{0}" name pattern labels each run with
           * the protocol.
           */
          @Parameterized.Parameters(name = "{0}")
          public static Collection<Object[]> data() {
              return Arrays.asList(new Object[][]{
                      {Protocol.CORE},
                      {Protocol.OPENWIRE},
                      {Protocol.AMQP},
              });
          }
      
          // Injected by the Parameterized runner with the protocol for the current run.
          @Parameterized.Parameter
          public Protocol protocol;
      
          /**
           * Delegates to the base class set-up; no additional preparation is done here.
           */
          @Override
          @Before
          public void setUp() throws Exception {
              super.setUp();
          }
      
          /**
           * Creates a JMS connection factory for the protocol of the current run.
           * All three factories target localhost port 61618, the port of the acceptor
           * that the test method adds to the broker configuration.
           *
           * NOTE(review): the three factory classes presumably come from three different
           * client libraries (AMQP, Artemis Core, OpenWire) -- confirm against the imports.
           *
           * @return a connection factory matching {@link #protocol}
           * @throws RuntimeException if {@link #protocol} matches none of the handled cases
           */
          ConnectionFactory createConnectionFactory() {
              switch (protocol) {
                  case AMQP:
                      return new JmsConnectionFactory("amqp://localhost:61618");
                  case CORE:
                      return new ActiveMQJMSConnectionFactory("tcp://localhost:61618");
                  case OPENWIRE:
                      return new ActiveMQConnectionFactory("tcp://localhost:61618");
              }
              throw new RuntimeException("Unexpected value of protocol");
          }
      
          /**
           * Runs three rounds in which a fresh connection tries to send seven 2000-byte
           * messages to an address limited to 1024 bytes with the BLOCK policy, while a
           * helper thread closes that connection after 5 seconds. Afterwards a consumer
           * drains the queue and the test asserts that fewer than 3 messages were received.
           *
           * @throws Exception on any broker or JMS failure outside the send loop
           */
          @Test
          public void testBlockedProducerIsUnableToGetMessagesOntoAQueue() throws Exception {
              SimpleString address = SimpleString.toSimpleString("testaddress");
      
              // NOTE(review): the meaning of the two boolean arguments is not visible here --
              // confirm against ActiveMQTestBase.createServer.
              server = createServer(true, true);
      
              // Single acceptor on port 61618 that accepts every protocol used by this test
              // (the same port all client factories connect to).
              server.getConfiguration().addAcceptorConfiguration("artemis",
                      "tcp://0.0.0.0:61618?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300");
      
              // Limit the test address to 1024 bytes and select the BLOCK full policy.
              // Each message sent below carries 2000 bytes, i.e. more than the limit.
              AddressSettings addressSettings = new AddressSettings()
                      .setMaxSizeBytes(1024)
                      .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
              HierarchicalRepository<AddressSettings> addressSettingsHierarchicalRepository = server.getAddressSettingsRepository();
              addressSettingsHierarchicalRepository.addMatch(address.toString(), addressSettings);
      
              server.start();
              waitForServerToStart(server);
              // Anycast queue with the same name as the address.
              // NOTE(review): trailing arguments are presumably filter=null, durable=true,
              // temporary=false -- confirm against ActiveMQServer.createQueue.
              server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
      
              ConnectionFactory connectionFactory = createConnectionFactory();
      
              // Three rounds, each with its own connection, session and producer.
              for (int round = 0; round < 3; round++) {
                  Connection connection = connectionFactory.createConnection();
                  connection.start();
                  Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
                  MessageProducer producer = jmsSession.createProducer(jmsSession.createQueue(address.toString()));
                  // 2000-byte payload; the same message object is re-sent on every loop iteration.
                  byte[] bytes = new byte[2000];
                  BytesMessage message = jmsSession.createBytesMessage();
                  message.writeBytes(bytes);
      
                  // Helper thread: waits 5 seconds, then closes the connection from outside the
                  // sending thread. Presumably this is what releases a send() that is blocked
                  // by flow control -- the JMSException caught below is named "expected".
                  Thread t = new Thread(() -> {
                      try {
                          System.out.println("in closing thread");
                          Thread.sleep(5000);
                          System.out.println("going to close things");
      //                    producer.close();  // amqp client will get stuck in second round (round 1) of sending if I uncomment these two lines
      //                    jmsSession.close();  // that may be ok, jms session and objects in session are not thread safe
                          connection.close();
                          System.out.println("closed session");
                      } catch (Exception ignored) {
                          // Any failure while sleeping or closing is deliberately ignored.
                      }
                  });
      
                  t.start();
      
                  try {
                      // This will block
                      for (int i = 0; i < 7; i++) {
                          System.out.println("round " + round + " sending message " + i);
                          producer.send(message);
                      }
                  } catch (JMSException expected) {
                      // A failing send is the anticipated outcome; nothing to do.
                  }
                  // Wait for the helper thread so the connection is closed before the next round.
                  t.join();
              }
      
              // Verification: open a fresh connection and count everything that reached the queue.
              connectionFactory = createConnectionFactory();
              try (Connection connection2 = connectionFactory.createConnection();
                   Session jmsSession = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   MessageConsumer consumer = jmsSession.createConsumer(jmsSession.createQueue(address.toString()))) {
                  connection2.start();
                  int received = 0;
                  // Drain the queue; stop once no message arrives within one second.
                  while (consumer.receive(1000) != null) {
                      received++;
                  }
                  // Fewer than 3 messages in total across the three rounds counts as success.
                  Assert.assertTrue("received is " + received, received < 3);
              }
          }
      }
      

      Core

      round 0 sending message 0
      in closing thread
      round 0 sending message 1
      [Thread-13 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@2f490758)] 21:30:04,649 WARN  [org.apache.activemq.artemis.core.server] AMQ222183: Blocking message production on address 'testaddress'; size is currently: 2,790 bytes; max-size-bytes on address: 1,024, global-max-size is 2,790
      round 0 sending message 2
      round 0 sending message 3
      round 0 sending message 4
      round 0 sending message 5
      round 0 sending message 6
      going to close things
      closed session
      round 1 sending message 0
      in closing thread
      going to close things
      round 1 sending message 1
      closed session
      round 2 sending message 0
      in closing thread
      going to close things
      closed session
      [org.apache.commons.beanutils.ConvertUtils] : Convert string 'localhost' to class 'java.lang.String'
      [org.apache.commons.beanutils.converters.StringConverter] : Converting 'String' value 'localhost' to type 'String'
      [org.apache.commons.beanutils.converters.IntegerConverter] : Converting 'Integer' value '61618' to type 'Integer'
      [org.apache.commons.beanutils.converters.IntegerConverter] :     No conversion required, value is already a Integer
      [org.apache.commons.beanutils.BeanUtils] : BeanUtils.populate(ConnectionOptions{ha=false}, {})
      [org.apache.commons.beanutils.BeanUtils] : BeanUtils.populate(ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61618&host=localhost], discoveryGroupConfiguration=null], {})
      [org.apache.commons.beanutils.BeanUtils] : BeanUtils.populate(ActiveMQConnectionFactory [serverLocator=ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61618&host=localhost], discoveryGroupConfiguration=null], clientID=null, consumerWindowSize = 1048576, dupsOKBatchSize=1048576, transactionBatchSize=1048576, readOnly=false], {})
      [Thread-25 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@2f490758)] 21:30:19,795 INFO  [org.apache.activemq.artemis.core.server] AMQ221046: Unblocking message production on address 'testaddress'; size is currently: 0 bytes; max-size-bytes: 1,024
      [main] 21:30:20,831 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.6.0-SNAPSHOT [0d2d0e0b-48bf-11e8-a1fb-daf62bfd3641] stopped, uptime 17.711 seconds
      [main] 21:30:21,023 INFO  [org.apache.activemq.artemis.core.server] **** end #test testBlockedProducerIsUnableToGetMessagesOntoAQueue[CORE]() ***
      
      java.lang.AssertionError: received is 8
      

      AMQP

      round 0 sending message 0
      in closing thread
      [Thread-1 (activemq-netty-threads)] 07:17:37,824 WARN  [org.apache.activemq.artemis.core.server] AMQ222183: Blocking message production on address 'testaddress'; size is currently: 2,592 bytes; max-size-bytes on address: 1,024, global-max-size is 2,592
      round 0 sending message 1
      round 0 sending message 2
      round 0 sending message 3
      round 0 sending message 4
      round 0 sending message 5
      round 0 sending message 6
      going to close things
      closed session
      round 1 sending message 0
      in closing thread
      going to close things
      closed session
      round 2 sending message 0
      in closing thread
      going to close things
      closed session
      [Thread-3 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@41005828)] 07:17:52,921 INFO  [org.apache.activemq.artemis.core.server] AMQ221046: Unblocking message production on address 'testaddress'; size is currently: 0 bytes; max-size-bytes: 1,024
      [main] 07:17:53,959 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.6.0-SNAPSHOT [222f8003-4911-11e8-88d1-daf62bfd3641] stopped, uptime 16.818 seconds
      [main] 07:17:53,960 INFO  [org.apache.activemq.artemis.core.server] **** end #test testBlockedProducerIsUnableToGetMessagesOntoAQueue[AMQP]() ***
      
      java.lang.AssertionError: received is 7
      

      OpenWire

      round 0 sending message 0
      in closing thread
      round 0 sending message 1
      [Thread-1 (activemq-netty-threads)] 07:17:20,887 WARN  [org.apache.activemq.artemis.core.server] AMQ222183: Blocking message production on address 'testaddress'; size is currently: 4,092 bytes; max-size-bytes on address: 1,024, global-max-size is 4,092
      going to close things
      [Thread-1 (activemq-netty-threads)] 07:17:25,866 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 186872b7-4911-11e8-88d1-daf62bfd3641
      [Thread-1 (activemq-netty-threads)] 07:17:25,867 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 186872b7-4911-11e8-88d1-daf62bfd3641
      [Thread-1 (activemq-netty-threads)] 07:17:25,867 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:1:-1
      [Thread-1 (activemq-netty-threads)] 07:17:25,867 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:1:-1
      [Thread-1 (activemq-netty-threads)] 07:17:25,894 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:1:1
      [Thread-1 (activemq-netty-threads)] 07:17:25,895 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:1:1
      closed session
      round 1 sending message 0
      in closing thread
      going to close things
      [Thread-2 (activemq-netty-threads)] 07:17:30,924 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 1b7b09ea-4911-11e8-88d1-daf62bfd3641
      [Thread-2 (activemq-netty-threads)] 07:17:30,924 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 1b7b09ea-4911-11e8-88d1-daf62bfd3641
      [Thread-2 (activemq-netty-threads)] 07:17:30,925 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:2:-1
      [Thread-2 (activemq-netty-threads)] 07:17:30,925 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:2:-1
      [Thread-2 (activemq-netty-threads)] 07:17:30,930 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:2:1
      [Thread-2 (activemq-netty-threads)] 07:17:30,930 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:2:1
      closed session
      round 2 sending message 0
      in closing thread
      going to close things
      [Thread-3 (activemq-netty-threads)] 07:17:35,964 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 1e7cd83d-4911-11e8-88d1-daf62bfd3641
      [Thread-3 (activemq-netty-threads)] 07:17:35,964 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 1e7cd83d-4911-11e8-88d1-daf62bfd3641
      [Thread-3 (activemq-netty-threads)] 07:17:35,964 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:3:-1
      [Thread-3 (activemq-netty-threads)] 07:17:35,965 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:3:-1
      [Thread-3 (activemq-netty-threads)] 07:17:35,968 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-3:3:1
      [Thread-3 (activemq-netty-threads)] 07:17:35,969 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-3:3:1
      closed session
      [Thread-27 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$5@19b89d4)] 07:17:35,996 INFO  [org.apache.activemq.artemis.core.server] AMQ221046: Unblocking message production on address 'testaddress'; size is currently: 0 bytes; max-size-bytes: 1,024
      [Thread-4 (activemq-netty-threads)] 07:17:37,007 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 217ad600-4911-11e8-88d1-daf62bfd3641
      [Thread-4 (activemq-netty-threads)] 07:17:37,007 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 217ad600-4911-11e8-88d1-daf62bfd3641
      [Thread-4 (activemq-netty-threads)] 07:17:37,007 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session ID:localhost-37287-1524719822932-5:1:-1
      [Thread-4 (activemq-netty-threads)] 07:17:37,007 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session ID:localhost-37287-1524719822932-5:1:-1
      [main] 07:17:37,136 INFO  [org.apache.activemq.artemis.core.server] AMQ221002: Apache ActiveMQ Artemis Message Broker version 2.6.0-SNAPSHOT [184f4565-4911-11e8-88d1-daf62bfd3641] stopped, uptime 16.558 seconds
      [main] 07:17:37,138 INFO  [org.apache.activemq.artemis.core.server] **** end #test testBlockedProducerIsUnableToGetMessagesOntoAQueue[OPENWIRE]() ***
      
      java.lang.AssertionError: received is 4
      

      A log from a dTests test (for OpenWire) may look like this

      [...]
      
      [...]bcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659370394, 'content-type': None, 'id': 'nixos-43937-1524659370032-1:1:1:1:4', 'reply-to': None, 'properties': {}}
      5111 / 0
      
      # sender 5111 filled the address with messages, the very next message will be blocked, OK
      
      5136 + java -jar cli-activemq/target/cli-activemq-1.2.2-SNAPSHOT-LATEST.jar sender --log-msgs dict --msg-content-from-file /tmp/10368200510833594280.tmp --address unknowntestNW8BGQo
      5136 / 143
      
      # sender 5136 was killed on timeout, because it was blocked, OK
      
      5697 + java -jar cli-activemq/target/cli-activemq-1.2.2-SNAPSHOT-LATEST.jar receiver --log-msgs dict --count 0 --timeout 15 --address unknowntestNW8BGQo
      5699 + java -jar cli-activemq/target/cli-activemq-1.2.2-SNAPSHOT-LATEST.jar sender --log-msgs dict --msg-content-from-file /tmp/2674667166147590523.tmp --address unknowntestNW8BGQo
      
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659370324, 'content-type': None, 'id': 'nixos-43937-1524659370032-1:1:1:1:1', 'reply-to': None, 'properties': {}}
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659370356, 'content-type': None, 'id': 'nixos-43937-1524659370032-1:1:1:1:2', 'reply-to': None, 'properties': {}}
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659370379, 'content-type': None, 'id': 'nixos-43937-1524659370032-1:1:1:1:3', 'reply-to': None, 'properties': {}}
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659370394, 'content-type': None, 'id': 'nixos-43937-1524659370032-1:1:1:1:4', 'reply-to': None, 'properties': {}}
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659555122, 'content-type': None, 'id': 'nixos-44013-1524659554940-1:1:1:1:1', 'reply-to': None, 'properties': {}}
      5697   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659617039, 'content-type': None, 'id': 'nixos-38941-1524659616714-1:1:1:1:1', 'reply-to': None, 'properties': {}}
      
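      # receiver 5697 received message 'id': 'nixos-44013-1524659554940-1:1:1:1:1', that came from a blocked producer (apparently 5136; its id prefix differs from the nixos-38941-... id that sender 5699 logs for its own message below), BUG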
      # message nixos-38941-1524659616714-1:1:1:1:1 is from sender 5699 which got unblocked by receiving from previously full address, OK
      
      5699   {'address': 'unknowntestNW8BGQo', 'group-id': None, 'subject': None, 'user-id': None, 'correlation-id': None, 'content-encoding': None, 'priority': 4, 'type': None, 'ttl': 0, 'absolute-expiry-time': 0, 'content': 'abcdefghijklmnopqrstuvwxyzpayloadABCDEFGHIJKL', 'redelivered': False, 'reply-to-group-id': None, 'durable': True, 'group-sequence': 0, 'creation-time': 1524659617039, 'content-type': None, 'id': 'nixos-38941-1524659616714-1:1:1:1:1', 'reply-to': None, 'properties': {}}
      
      # sender 5699 prints the message it sent (this is multithreaded/multiprocess, so some reordering is possible, like here: received first, sent later)
      
      5699 / 0
      5697 / 0
      
      # remaining two clis exited
      

      Attachments

        Issue Links

          Activity

            People

              rh-ee-ataylor Andy Taylor
              jdanek@redhat.com Jiri Daněk
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: