  1. FUSE Message Broker
  2. MB-381

Consumed messages re-appearing in the broker

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Done
    • Affects Version/s: 5.1.0.0-fuse
    • Fix Version/s: 5.0.0.20-fuse
    • Component/s: broker
    • Labels:
      None
    • Environment:

      Tested on Windows, using JDK 1.6.0_07, but probably a problem on all platforms since it is a clear logic problem.

      Description

      This is probably more of a problem with how the broker recovers from failures in conjunction with the database rather than a core broker failure.

      The attached project clearly shows the problem where messages which have been consumed may re-appear after the broker restarts from a kill -9, forced kill in Windows, or a stop in the eclipse environment.

      To run the test case in eclipse:

      1. Run the JUnit test: SpringActiveMQTest.java (Explanations are given in comments of the test case) and let it run about 1 minute. It will shut-down automatically for a graceful shut-down.
      2. Run the Java main class: SpringActiveMQ.java. Start jconsole from jdk6 on url: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi. As you can see, there is only 1 message on the queue, as expected from the test. Kill the broker with the stop button of eclipse.
      3. Run the Java main class SpringActiveMQ.java again. This forces ActiveMQ to recover. Start jconsole from jdk6 on url: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi. There are now 2 messages on the queue: one consumed message has re-appeared. An explanation is given in the comments of the JUnit test.
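
The queue depth checked by hand in jconsole in steps 2 and 3 could also be read programmatically. The following is only a sketch under assumptions: the broker name momr2 is taken from the broker log in this issue, the queue name TEST.QUEUE is hypothetical, and the ObjectName layout (BrokerName/Type/Destination keys) is assumed to match what this broker version registers in JMX. Only the JMX URL comes from the steps above.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

class QueueSizeCheck {

    // JMX URL from the reproduction steps.
    static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi";

    // Builds the queue MBean name. The key layout is an assumption about how
    // the broker registers queues; names with special characters would need quoting.
    static ObjectName queueObjectName(String brokerName, String queueName) {
        try {
            return new ObjectName("org.apache.activemq:BrokerName=" + brokerName
                    + ",Type=Queue,Destination=" + queueName);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Connects to the broker's JMX endpoint and reads the QueueSize attribute.
    static long queueSize(String brokerName, String queueName) throws Exception {
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(JMX_URL))) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            Object size = connection.getAttribute(queueObjectName(brokerName, queueName), "QueueSize");
            return ((Number) size).longValue();
        }
    }

    public static void main(String[] args) throws Exception {
        // "momr2" appears in the broker log; "TEST.QUEUE" is a placeholder queue name.
        System.out.println("QueueSize = " + queueSize("momr2", "TEST.QUEUE"));
    }
}
```

Run against the broker after step 2 and again after step 3, this should report the same two values seen in jconsole (1, then 2 when the bug occurs), provided the assumed names match the test project.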

      This happens because the deletion of the message was recorded in a data file that has since been archived, while the original send is recorded in another data file that remains active. When the broker restarts, it therefore does not see that the message was already removed and adds it again.
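
The mechanism described above can be pictured with a toy replay model. This is not ActiveMQ code: the class, the Entry type and the message ids m1 and m2 are all hypothetical, and the model only assumes that recovery replays add and remove records from whichever data files are still present.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of journal replay; every name here is hypothetical. */
class JournalReplaySketch {

    /** One journal record: a message was added to or removed from the queue. */
    static final class Entry {
        final boolean add;
        final String messageId;

        Entry(boolean add, String messageId) {
            this.add = add;
            this.messageId = messageId;
        }
    }

    /** Data file that stays active: it holds the sends of m1 and m2. */
    static List<Entry> activeFile() {
        return Arrays.asList(new Entry(true, "m1"), new Entry(true, "m2"));
    }

    /** Data file that gets archived: it holds only the removal (consumption) of m1. */
    static List<Entry> archivedFile() {
        return Arrays.asList(new Entry(false, "m1"));
    }

    /** Replays the given data files in order and returns the ids left on the queue. */
    static Set<String> replay(List<List<Entry>> dataFiles) {
        Set<String> queue = new LinkedHashSet<>();
        for (List<Entry> file : dataFiles) {
            for (Entry entry : file) {
                if (entry.add) {
                    queue.add(entry.messageId);
                } else {
                    queue.remove(entry.messageId);
                }
            }
        }
        return queue;
    }

    public static void main(String[] args) {
        // With both files visible, the consumed message m1 stays removed.
        System.out.println(replay(Arrays.asList(activeFile(), archivedFile()))); // prints [m2]
        // With the removal archived away, replay re-adds the consumed message m1.
        System.out.println(replay(Arrays.asList(activeFile())));                 // prints [m1, m2]
    }
}
```

The second call corresponds to the restart in step 3: one message is expected, but two are found.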

          Activity

          martinmurphy Martin Murphy added a comment -

          Here's a test case for this problem.

          martinmurphy Martin Murphy added a comment - - edited

          I tried setting syncOnWrite to true as well, but that doesn't seem to help either. It looks like this is a problem with how the Kaha store is set up when a message send is in one data log and its consumption is in another.

          <persistenceAdapter>
            <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" 
          archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true"/>
          </persistenceAdapter>

          It might help to use JDBC instead of the Kaha store:

          <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
          </persistenceAdapter>
           
          <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
              <property name="serverName" value="localhost"/>
              <property name="databaseName" value="activemq"/>
              <property name="portNumber" value="0"/>
              <property name="user" value="activemq"/>
              <property name="password" value="activemq"/>
              <property name="dataSourceName" value="postgres"/>
              <property name="initialConnections" value="1"/>
              <property name="maxConnections" value="10"/>
          </bean>

          You can read more on this here

          rajdavies Rob Davies added a comment -

          There are currently two choices for the current message broker: make recovery of the reference store optional (see
          https://issues.apache.org/activemq/browse/AMQ-1901),

          or use a journal with JDBC.

          rob.davies@iona.com

          martinmurphy Martin Murphy added a comment -

          If I read AMQ-1901 correctly :

          recoverReferenceStore - default = true - will recover if not valid
          forceRecoverReferenceStore - default = false - will force a recovery

          Could setting these to false, or not forcing a recovery, result in messages that were persisted to the reference store being lost?
          In short, if you do not want to lose messages and do not want to encounter this bug (consumed messages re-appearing), will you have to use JDBC with the journal?

          rajdavies Rob Davies added a comment -

          The very, very worst that could happen is that the reference store is corrupted and fails on start - but if that is the case - restart the broker with recoverReferenceStore or forceRecoverReferenceStore set.

          garytully Gary Tully added a comment -

          The fix from https://issues.apache.org/activemq/browse/AMQ-1901 is on the fixes branch, r46943. It will be in .19.

          martinmurphy Martin Murphy added a comment -

          I think there may still be a problem with this fix. I compiled the trunk and tested it on my side. I would appreciate feedback on my findings.

          I repeated the steps of the demo as described above, but this time I had the persistence adapter configured not to recover the reference store.

              <persistenceAdapter>
                <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="false"/>
              </persistenceAdapter>

          I found that the consumed message that was re-appearing no longer re-appears, so all seems good. However, in step 3 (when the broker, SpringActiveMQ, restarts), I can see that the non-consumed message seems to be removed:

          20-Aug-2008 14:54:08 org.apache.activemq.broker.BrokerService start
          INFO: ActiveMQ JMS Message Broker (momr2, ID:csp690-2997-1219240447748-0:0) started
          20-Aug-2008 14:54:12 org.apache.activemq.kaha.impl.async.AsyncDataManager forceRemoveDataFile
          INFO: moved data file data-1 number = 1 , length = 4097882 refCount = 0 to C:\users\mmurphy\customers\OSS\DEV-1047\MB-381\.\target\activemq-data\amqstore\archive

          This leads to a problem where, as suspected, the reference store is corrupted:

          20-Aug-2008 14:55:25 org.apache.activemq.kaha.impl.KahaStore initialize
          INFO: Kaha Store using data directory C:\users\mmurphy\customers\OSS\AllianceHealthcare\DEV-1047\MB-381\.\target\activemq-data\amqstore\kr-store\data
          20-Aug-2008 14:55:25 org.apache.activemq.kaha.impl.async.AsyncDataManager getDataFile
          SEVERE: Looking for key 1 but not found in fileMap: {4=data-4 number = 4 , length = 3074491 refCount = 3, 3=data-3 number = 3 , length = 4098264 refCount = 0}
          20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService start
          SEVERE: Failed to start ActiveMQ JMS Message Broker. Reason: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
          java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
          at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529)
          at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432)
          at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54)
          at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
          at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
          at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478)
          at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165)
          at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
          at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
          at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
          at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
          at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
          at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
          at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
          at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
          at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
          at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
          at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
          at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
          at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466)
          at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
          at java.security.AccessController.doPrivileged(Native Method)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
          at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
          at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
          at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
          at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
          at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
          at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68)
          at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
          at java.security.AccessController.doPrivileged(Native Method)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
          at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
          at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
          at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
          at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423)
          at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
          at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
          at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
          at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
          at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13)
          Caused by: java.io.IOException: Could not locate data file data-1
          at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306)
          at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526)
          ... 58 more
          20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop
          INFO: ActiveMQ Message Broker (momr2, null) is shutting down
          20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop
          INFO: Connector vm Stopped
          20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop
          INFO: Connector tcp Stopped
          20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop
          INFO: ActiveMQ JMS Message Broker (momr2, null) stopped
          20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
          INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1112783: defining beans org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,org.apache.activemq.xbean.XBeanBrokerService#0; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9
          20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
          INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9: defining beans [broker]; root of factory hierarchy
          org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'broker' defined in class path resource [com/iona/fuse/dev1047/applicationContext-main.xml]: Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
          at java.security.AccessController.doPrivileged(Native Method)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
          at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
          at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
          at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
          at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423)
          at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
          at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
          at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
          at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
          at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13)
          Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
          at java.security.AccessController.doPrivileged(Native Method)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
          at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
          at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
          at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
          at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
          at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
          at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
          at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72)
          at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68)
          at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
          ... 15 more
          Caused by: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
          at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529)
          at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432)
          at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54)
          at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
          at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
          at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478)
          at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165)
          at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
          at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
          at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
          at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
          at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
          at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
          at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
          at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
          at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
          at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
          at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
          at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
          at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
          at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466)
          at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
          at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
          ... 34 more
          Caused by: java.io.IOException: Could not locate data file data-1
          at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306)
          at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616)
          at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526)
          ... 58 more
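
My reading of the log above, as a toy model rather than the actual Kaha code: the reference store still points at data file 1, but after the archive step the file map only knows files 3 and 4, so the lookup fails. The class and method names below are hypothetical; only the file numbers and the error text are taken from the log.

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a reference that points at an archived data file; names are hypothetical. */
class DanglingReferenceSketch {

    /** File map as reported in the log after data-1 was moved to the archive. */
    static Map<Integer, String> fileMapAfterArchive() {
        Map<Integer, String> fileMap = new TreeMap<>();
        fileMap.put(3, "data-3");
        fileMap.put(4, "data-4");
        return fileMap;
    }

    /** Looks up a data file by number, failing the way the log does when it is gone. */
    static String locate(Map<Integer, String> fileMap, int fileId) throws IOException {
        String file = fileMap.get(fileId);
        if (file == null) {
            throw new IOException("Could not locate data file data-" + fileId);
        }
        return file;
    }

    /** Convenience wrapper: true if a reference to the given file can still be resolved. */
    static boolean resolves(Map<Integer, String> fileMap, int fileId) {
        try {
            locate(fileMap, fileId);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> fileMap = fileMapAfterArchive();
        // The unconsumed message is referenced in file 1, which is no longer in the map.
        try {
            locate(fileMap, 1);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints Could not locate data file data-1
        }
    }
}
```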

          So, to recover from this, I reset the persistence adapter to recover the reference store. I tried forceRecoverReferenceStore="true" with both recoverReferenceStore="true" and recoverReferenceStore="false".

              <persistenceAdapter>
                <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="true" forceRecoverReferenceStore="true"/>
              </persistenceAdapter>

          The problem, though, is that when the broker does recover, the message that was not consumed is no longer there and the queue size is 0. It appears that the message that was originally not consumed is lost.

          (Note each time I stop the broker, I use the stop (red button) in eclipse)

org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13) Caused by: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306) at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526) ... 
58 more 20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop INFO: ActiveMQ Message Broker (momr2, null) is shutting down 20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop INFO: Connector vm Stopped 20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop INFO: Connector tcp Stopped 20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop INFO: ActiveMQ JMS Message Broker (momr2, null) stopped 20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1112783: defining beans org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,org.apache.activemq.xbean.XBeanBrokerService#0 ; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9 20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9: defining beans [broker] ; root of factory hierarchy org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'broker' defined in class path resource [com/iona/fuse/dev1047/applicationContext-main.xml] : Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml] : Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. 
Reason: java.io.IOException: Could not locate data file data-1 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409) at java.security.AccessController.doPrivileged(Native Method) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13) Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 
'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml] : Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409) at java.security.AccessController.doPrivileged(Native Method) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88) at 
org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68) at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333) ... 15 more Caused by: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33) at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529) at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432) at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54) at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82) at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93) at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478) at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165) at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83) at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434) at 
org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56) at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120) at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261) at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142) at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142) at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149) at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94) at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176) at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103) at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112) at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154) at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161) at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466) at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333) ... 34 more Caused by: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306) at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526) ... 
58 more

To recover from this, I reset the persistence adapter to force a recovery. I tried forceRecoverReferenceStore="true" with both recoverReferenceStore="true" and recoverReferenceStore="false".

<persistenceAdapter>
  <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="true" forceRecoverReferenceStore="true"/>
</persistenceAdapter>

The problem, though, is that when the broker does recover, the message that was not consumed is no longer there and the queue size is 0. It appears that the message that was originally not consumed is lost. (Note: each time I stop the broker, I use the stop (red) button in Eclipse.)
          garytully Gary Tully added a comment -

          The fix for https://issues.apache.org/activemq/browse/AMQ-1926 seems relevant. The test case behaves differently; I need to investigate a bit.

          garytully Gary Tully added a comment -

          Have run the test case with trunk, and it looks like AMQ-1926 has killed this bird also.
          Possibly build the fuse-5.0 branch (the fix is now there also) and see if it is still reproducible. It may be that the test case just behaves a little differently now, but it may also be that the bug is fixed. The data files are managed a little better now, so deletion is more controlled.
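
As an illustration of why controlled deletion of data files matters, here is a toy model in Java. It is only a sketch under stated assumptions: `ToyJournal` and all of its methods are hypothetical names, it is not ActiveMQ code, and it is not the actual AMQ-1926 change. It assumes a reference index that maps each pending message to the data file holding it. Removing a file that is still referenced makes recovery fail in a way that resembles the "Could not locate data file data-1" error in the trace above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: the names here are hypothetical and are not ActiveMQ APIs. */
public class ToyJournal {
    // IDs of the data files currently present (1 stands for "data-1").
    private final Set<Integer> dataFiles = new HashSet<>();
    // Reference index: pending message ID -> ID of the data file that holds it.
    private final Map<String, Integer> references = new HashMap<>();

    /** Records a message in the given data file and indexes it as pending. */
    public void addMessage(String messageId, int fileId) {
        dataFiles.add(fileId);
        references.put(messageId, fileId);
    }

    /** Consuming a message drops its reference; the data file itself stays. */
    public void consume(String messageId) {
        references.remove(messageId);
    }

    /** Uncontrolled cleanup: removes the file without consulting the index. */
    public void removeFileUnchecked(int fileId) {
        dataFiles.remove(fileId);
    }

    /** Controlled cleanup: removes the file only if no pending message uses it. */
    public boolean removeFileIfUnreferenced(int fileId) {
        if (references.containsValue(fileId)) {
            return false;
        }
        return dataFiles.remove(fileId);
    }

    /** Recovery: every pending reference must resolve to an existing data file. */
    public int recover() {
        for (Map.Entry<String, Integer> entry : references.entrySet()) {
            if (!dataFiles.contains(entry.getValue())) {
                throw new IllegalStateException(
                        "Could not locate data file data-" + entry.getValue());
            }
        }
        return references.size();
    }
}
```

In this model, `removeFileIfUnreferenced` refuses to drop a file that a pending message still points to, so `recover()` keeps succeeding. Calling `removeFileUnchecked` on the same file makes the next `recover()` throw.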


            People

            • Assignee:
              garytully Gary Tully
              Reporter:
              martinmurphy Martin Murphy