FUSE Message Broker / MB-381

Consumed messages re-appearing in the broker

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Done
    • Affects Version/s: 5.1.0.0-fuse
    • Fix Version/s: 5.0.0.20-fuse
    • Component/s: broker
    • Labels:
      None
    • Environment:
      Tested on Windows, using JDK 1.6.0_07, but probably a problem on all platforms since it is a clear logic problem.

      Description

      This is probably more of a problem with how the broker recovers from failures in conjunction with the database rather than a core broker failure.

      The attached project clearly shows the problem where messages which have been consumed may re-appear after the broker restarts from a kill -9, forced kill in Windows, or a stop in the eclipse environment.

      To run the test case in eclipse:

      1. Run the JUnit test SpringActiveMQTest.java (explanations are given in the comments of the test case) and let it run for about 1 minute. It then shuts down gracefully on its own.
      2. Run the Java main class SpringActiveMQ.java. Start jconsole from JDK 6 on the URL service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi. There is only 1 message on the queue, as expected from the test. Kill the broker with the stop button in Eclipse.
      3. Run the Java main class SpringActiveMQ.java again. This forces ActiveMQ to recover. Start jconsole from JDK 6 on the same URL, service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi. There are now 2 messages on the queue: one consumed message has reappeared. The explanation is given in the comments of the JUnit test.

      This is due to the fact that the deletion of a message was stored in a data file that is archived, but the fact that the message was sent to the broker is stored in another data file that remains active. So when the broker restarts it doesn't realise that the message was already removed and adds it again.
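
      The failure mode described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not ActiveMQ code: the class JournalReplaySketch, its Entry type and the message ids m1 and m2 are hypothetical, and the journal is reduced to lists of "add" and "remove" records that are replayed in order on restart. The data file holding the removal of m1 is archived, while the file holding the send of m1 stays active because it also holds the unconsumed m2.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: this is NOT ActiveMQ code. Entry, replay and scenario are
// hypothetical names used to illustrate the failure mode described above.
class JournalReplaySketch {

    /** One journal record: a message was sent (add) or consumed (remove). */
    static final class Entry {
        final boolean add;
        final String messageId;

        Entry(boolean add, String messageId) {
            this.add = add;
            this.messageId = messageId;
        }
    }

    /** Rebuilds the queue by replaying, in order, every data file still present. */
    static Set<String> replay(List<List<Entry>> dataFiles) {
        Set<String> queue = new LinkedHashSet<String>();
        for (List<Entry> file : dataFiles) {
            for (Entry e : file) {
                if (e.add) {
                    queue.add(e.messageId);
                } else {
                    queue.remove(e.messageId);
                }
            }
        }
        return queue;
    }

    /** Returns the recovered queue, with or without the file that holds the removal. */
    static Set<String> scenario(boolean removalFileArchived) {
        // Stays active: it still holds the unconsumed message m2.
        List<Entry> activeFile = new ArrayList<Entry>();
        activeFile.add(new Entry(true, "m1"));
        activeFile.add(new Entry(true, "m2"));

        // Holds only the removal of m1, so nothing keeps it from being archived.
        List<Entry> laterFile = new ArrayList<Entry>();
        laterFile.add(new Entry(false, "m1"));

        List<List<Entry>> survivingFiles = new ArrayList<List<Entry>>();
        survivingFiles.add(activeFile);
        if (!removalFileArchived) {
            survivingFiles.add(laterFile);
        }
        return replay(survivingFiles);
    }

    public static void main(String[] args) {
        System.out.println(scenario(false)); // prints [m2]
        System.out.println(scenario(true));  // prints [m1, m2]
    }
}
```

      With the removal file archived, the replay never sees that m1 was consumed, which matches the observation of 2 messages on the queue where 1 was expected.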

        Activity

        Martin Murphy
        added a comment -

        Here's a test case for this problem.

        Martin Murphy
        added a comment - - edited

        I tried setting syncOnWrite to true as well, but that doesn't seem to help either. It looks like this is a problem with how the Kaha store is set up when a message send is in one data log and its consumption is in another.

        <persistenceAdapter>
          <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" 
        archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true"/>
        </persistenceAdapter>
        

        It might help to use JDBC instead of the Kaha store:

        <persistenceAdapter>
          <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
        </persistenceAdapter>
        
        <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
            <property name="serverName" value="localhost"/>
            <property name="databaseName" value="activemq"/>
            <property name="portNumber" value="0"/>
            <property name="user" value="activemq"/>
            <property name="password" value="activemq"/>
            <property name="dataSourceName" value="postgres"/>
            <property name="initialConnections" value="1"/>
            <property name="maxConnections" value="10"/>
        </bean>
        

        You can read more on this here

        Rob Davies
        added a comment -

        There are currently two choices for the current message broker: either make recovery of the reference store optional (see https://issues.apache.org/activemq/browse/AMQ-1901), or use a journal with JDBC.

        rob.davies@iona.com

        Martin Murphy
        added a comment -

        If I read AMQ-1901 correctly :

        recoverReferenceStore - default = true - will recover if not valid
        forceRecoverReferenceStore - default force - will force a recovery

        Could setting these to false, or not forcing a recovery, result in messages that were persisted to the reference store being lost?
        In short, if you don't want to lose messages and don't want to encounter this bug (consumed messages re-appearing), do you have to use JDBC with the journal?
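
        The reading of the two flags given above can be written down as a small decision function. This is a sketch of that reading only, not the broker's implementation: the class name RecoveryDecisionSketch and the storeValid input are hypothetical, standing in for whatever validity check the reference store performs on start-up.

```java
// Sketch of the flag semantics as described in this comment; NOT ActiveMQ code.
// storeValid is a hypothetical input: true if the reference store looks intact.
class RecoveryDecisionSketch {

    /**
     * recoverReferenceStore: recover only if the store is not valid.
     * forceRecoverReferenceStore: recover regardless of the store's state.
     */
    static boolean shouldRecover(boolean storeValid,
                                 boolean recoverReferenceStore,
                                 boolean forceRecoverReferenceStore) {
        return forceRecoverReferenceStore
                || (recoverReferenceStore && !storeValid);
    }

    public static void main(String[] args) {
        // Invalid store with recovery switched off: nothing is rebuilt.
        System.out.println(shouldRecover(false, false, false)); // prints false
        // Valid store, but recovery is forced.
        System.out.println(shouldRecover(true, false, true));   // prints true
    }
}
```

        Under this reading, the case the question is about is the first one: an invalid store with both flags off is never rebuilt from the journal.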

        Rob Davies
        added a comment -

        The very, very worst that could happen is that the reference store is corrupted and fails on start - but if that is the case - restart the broker with recoverReferenceStore or forceRecoverReferenceStore set.

        Gary Tully
        added a comment -

        The fix from https://issues.apache.org/activemq/browse/AMQ-1901 is on the fixes branch, r46943. It will be in .19.

        Martin Murphy
        added a comment -

        I think there may still be a problem with this fix. I compiled the trunk and tested it on my side. I would appreciate feedback on my findings.

        I repeated the steps of the demo as described above, but this time I had the persistence adapter configured not to recover the reference store.

            <persistenceAdapter>
              <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="false"/>
            </persistenceAdapter>
        

        I found that the consumed message that was re-appearing no longer re-appears, so all seems good. However, in step 3 (when the broker, SpringActiveMQ, restarts), I can see that the non-consumed message seems to be removed:

        20-Aug-2008 14:54:08 org.apache.activemq.broker.BrokerService start
        INFO: ActiveMQ JMS Message Broker (momr2, ID:csp690-2997-1219240447748-0:0) started
        20-Aug-2008 14:54:12 org.apache.activemq.kaha.impl.async.AsyncDataManager forceRemoveDataFile
        INFO: moved data file data-1 number = 1 , length = 4097882 refCount = 0 to C:\users\mmurphy\customers\OSS\DEV-1047\MB-381\.\target\activemq-data\amqstore\archive

        This leads to a problem where, as suspected, the reference store is corrupted:

        20-Aug-2008 14:55:25 org.apache.activemq.kaha.impl.KahaStore initialize
        INFO: Kaha Store using data directory C:\users\mmurphy\customers\OSS\AllianceHealthcare\DEV-1047\MB-381\.\target\activemq-data\amqstore\kr-store\data
        20-Aug-2008 14:55:25 org.apache.activemq.kaha.impl.async.AsyncDataManager getDataFile
        SEVERE: Looking for key 1 but not found in fileMap: {4=data-4 number = 4 , length = 3074491 refCount = 3, 3=data-3 number = 3 , length = 4098264 refCount = 0}

        20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService start
        SEVERE: Failed to start ActiveMQ JMS Message Broker. Reason: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
        java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
        at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529)
        at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432)
        at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
        at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478)
        at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165)
        at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
        at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
        at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
        at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
        at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
        at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
        at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
        at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
        at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
        at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
        at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466)
        at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68)
        at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
        at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13)
        Caused by: java.io.IOException: Could not locate data file data-1
        at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306)
        at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526)
        ... 58 more
        20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop
        INFO: ActiveMQ Message Broker (momr2, null) is shutting down
        20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop
        INFO: Connector vm Stopped
        20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop
        INFO: Connector tcp Stopped
        20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop
        INFO: ActiveMQ JMS Message Broker (momr2, null) stopped
        20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
        INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1112783: defining beans org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,org.apache.activemq.xbean.XBeanBrokerService#0; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9
        20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
        INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9: defining beans [broker]; root of factory hierarchy
        org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'broker' defined in class path resource [com/iona/fuse/dev1047/applicationContext-main.xml]: Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
        at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83)
        at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13)
        Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml]: Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
        at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72)
        at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68)
        at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
        ... 15 more
        Caused by: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1
        at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529)
        at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432)
        at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
        at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
        at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478)
        at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165)
        at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
        at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
        at org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
        at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
        at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
        at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
        at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
        at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
        at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
        at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
        at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466)
        at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333)
        ... 34 more
        Caused by: java.io.IOException: Could not locate data file data-1
        at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306)
        at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616)
        at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526)
        ... 58 more

        To recover from this, I reset the persistence adapter to recover the reference store. I tried forceRecoverReferenceStore="true" with both recoverReferenceStore="true" and recoverReferenceStore="false".

            <persistenceAdapter>
              <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="true" forceRecoverReferenceStore="true"/>
            </persistenceAdapter>
        

        The problem, though, is that when it does recover, the message that was not consumed is no longer there and the queue size is 0. It appears that the message that was originally not consumed is lost.

        (Note each time I stop the broker, I use the stop (red button) in eclipse)
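
        One possible reading of the logs above is that the cleanup pass archived data-1 because its refCount was 0, even though the unconsumed message still lived in that file. The toy model below sketches that reading. It is an assumption, not ActiveMQ code: the class DataFileCleanupSketch and its methods are hypothetical, and the real store may track references differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT ActiveMQ code. It assumes cleanup archives every data
// file whose reference count is 0, which is one reading of the log above.
class DataFileCleanupSketch {

    /** Reference counts as they would look if nothing rebuilt them after a kill. */
    static Map<Integer, Integer> zeroRefCounts(List<Integer> fileNumbers) {
        Map<Integer, Integer> counts = new LinkedHashMap<Integer, Integer>();
        for (Integer n : fileNumbers) {
            counts.put(n, 0);
        }
        return counts;
    }

    /** Reference counts rebuilt from where each live message is stored. */
    static Map<Integer, Integer> rebuildRefCounts(List<Integer> fileNumbers,
                                                  Map<String, Integer> liveMessageFile) {
        Map<Integer, Integer> counts = zeroRefCounts(fileNumbers);
        for (Integer n : liveMessageFile.values()) {
            Integer current = counts.get(n);
            counts.put(n, current == null ? 1 : current + 1);
        }
        return counts;
    }

    /** Files a cleanup pass would move to the archive: those with no references. */
    static List<Integer> filesToArchive(Map<Integer, Integer> refCounts) {
        List<Integer> result = new ArrayList<Integer>();
        for (Map.Entry<Integer, Integer> e : refCounts.entrySet()) {
            if (e.getValue() == 0) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> files = new ArrayList<Integer>();
        files.add(1);
        files.add(3);
        Map<String, Integer> live = new LinkedHashMap<String, Integer>();
        live.put("unconsumed", 1); // the one message left on the queue

        System.out.println(filesToArchive(zeroRefCounts(files)));          // prints [1, 3]
        System.out.println(filesToArchive(rebuildRefCounts(files, live))); // prints [3]
    }
}
```

        If the counts are not rebuilt, file 1 is archived along with the genuinely empty file 3, and a later start cannot find it, which would be consistent with the "Could not locate data file data-1" error.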

org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13) Caused by: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306) at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526) ... 
58 more 20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop INFO: ActiveMQ Message Broker (momr2, null) is shutting down 20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop INFO: Connector vm Stopped 20-Aug-2008 14:55:25 org.apache.activemq.broker.TransportConnector stop INFO: Connector tcp Stopped 20-Aug-2008 14:55:25 org.apache.activemq.broker.BrokerService stop INFO: ActiveMQ JMS Message Broker (momr2, null) stopped 20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@1112783: defining beans org.springframework.beans.factory.config.PropertyPlaceholderConfigurer#0,org.apache.activemq.xbean.XBeanBrokerService#0 ; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9 20-Aug-2008 14:55:25 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6f7ce9: defining beans [broker] ; root of factory hierarchy org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'broker' defined in class path resource [com/iona/fuse/dev1047/applicationContext-main.xml] : Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml] : Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. 
Reason: java.io.IOException: Could not locate data file data-1 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409) at java.security.AccessController.doPrivileged(Native Method) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:423) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139) at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:83) at com.iona.fuse.dev1047.SpringActiveMQ.main(SpringActiveMQ.java:13) Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 
'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [com/iona/fuse/dev1047/activemq.xml] : Invocation of init method failed; nested exception is java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1336) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:471) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409) at java.security.AccessController.doPrivileged(Native Method) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:220) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:88) at 
org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:76) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:72) at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:68) at org.apache.activemq.xbean.BrokerFactoryBean.afterPropertiesSet(BrokerFactoryBean.java:85) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333) ... 15 more Caused by: java.io.IOException: Failed to read to journal for: offset = 1024738, file = 1, size = -1, type = 0. Reason: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33) at org.apache.activemq.store.amq.AMQPersistenceAdapter.createReadException(AMQPersistenceAdapter.java:643) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:529) at org.apache.activemq.store.amq.AMQMessageStore.getMessage(AMQMessageStore.java:432) at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:54) at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82) at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93) at org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:478) at org.apache.activemq.broker.region.Queue.initialize(Queue.java:165) at org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83) at org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434) at 
org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56) at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120) at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261) at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142) at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142) at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149) at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94) at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176) at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103) at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112) at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154) at org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161) at org.apache.activemq.broker.BrokerService.start(BrokerService.java:466) at org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1367) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1333) ... 34 more Caused by: java.io.IOException: Could not locate data file data-1 at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:306) at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:616) at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:526) ... 
58 more

        So, to recover from this, I reset the persistence adapter to recover the reference store. I tried forceRecoverReferenceStore="true" with both recoverReferenceStore="true" and recoverReferenceStore="false".

        <persistenceAdapter>
          <amqPersistenceAdapter directory="${activemq.base}/activemq-data/amqstore" maxFileLength="4 mb" archiveDataLogs="true" cleanupInterval="5000" syncOnWrite="true" recoverReferenceStore="true" forceRecoverReferenceStore="true" />
        </persistenceAdapter>

        The problem, though, is that when the broker does recover, the message that was not consumed is no longer there and the queue size is 0. It appears that the message that was originally not consumed is lost. (Note: each time I stop the broker, I use the stop (red) button in Eclipse.)
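
        For readers following the log, the failure sequence can be illustrated with a toy model. This is only a sketch under assumptions: the class JournalSketch and its methods are hypothetical and are not ActiveMQ code, and the idea that data-1 was archived while a stored message location still pointed into it is one reading of the log above, not a confirmed diagnosis.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: hypothetical names, not the ActiveMQ implementation. */
final class JournalSketch {

    /** Active data files: file number mapped to its reference count. */
    private final Map<Integer, Integer> fileMap = new HashMap<>();

    void addDataFile(int fileId, int refCount) {
        fileMap.put(fileId, refCount);
    }

    /** Moves a file out of the active set, but only if nothing references it. */
    boolean archiveIfUnreferenced(int fileId) {
        Integer refCount = fileMap.get(fileId);
        if (refCount != null && refCount == 0) {
            fileMap.remove(fileId);
            return true;
        }
        return false;
    }

    /** Resolves a stored message location; fails if its data file is no longer active. */
    String read(int fileId, int offset) throws IOException {
        if (!fileMap.containsKey(fileId)) {
            throw new IOException("Could not locate data file data-" + fileId);
        }
        return "record@" + fileId + ":" + offset;
    }

    /** Replays the sequence seen in the log and returns the outcome as text. */
    static String reproduce() {
        JournalSketch journal = new JournalSketch();
        journal.addDataFile(1, 0); // assumption: count is 0 although a message still lives here
        journal.addDataFile(4, 3);
        journal.archiveIfUnreferenced(1); // cleanup archives data-1
        try {
            // The reference store still holds a location inside file 1.
            return journal.read(1, 1024738);
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce()); // prints "Could not locate data file data-1"
    }
}
```

        In this model the error appears whenever the reference count and the stored locations disagree, which would match both the archive message for data-1 (refCount = 0) and the later lookup failure for key 1.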
        Gary Tully
        added a comment -

        The fix for https://issues.apache.org/activemq/browse/AMQ-1926 seems relevant. The test case behaves differently. I need to investigate a bit.

        Gary Tully
        added a comment -

        I have run the test case with trunk, and it looks like AMQ-1926 has killed this bird also.
        Possibly build the fuse-5.0 branch (the fix is now there also) and see whether the problem is still reproducible. It may be that the test case behaves a little differently now, but it may also be that the bug is fixed. The data files are managed a little better now, so deletion is more controlled.


          People

          • Assignee:
            Gary Tully
            Reporter:
            Martin Murphy