Index: b/extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusIntegerationTest.java new file mode 100644 =================================================================== --- /dev/null (revision ) +++ b/extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusIntegerationTest.java (working copy) @@ -0,0 +1,338 @@ +/* + * ModeShape (http://www.modeshape.org) + * See the COPYRIGHT.txt file distributed with this work for information + * regarding copyright ownership. Some portions may be licensed + * to Red Hat, Inc. under one or more contributor license agreements. + * See the AUTHORS.txt file in the distribution for a full listing of + * individual contributors. + * + * ModeShape is free software. Unless otherwise indicated, all code in ModeShape + * is licensed to you under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * ModeShape is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.modeshape.clustering; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.modeshape.graph.ExecutionContext; +import org.modeshape.graph.Location; +import org.modeshape.graph.observe.Changes; +import org.modeshape.graph.observe.Observer; +import org.modeshape.graph.property.DateTime; +import org.modeshape.graph.property.Name; +import org.modeshape.graph.property.Path; +import org.modeshape.graph.request.ChangeRequest; +import org.modeshape.graph.request.CreateNodeRequest; + +public class ClusteredObservationBusIntegerationTest { + + private ExecutionContext context = new ExecutionContext(); + + @Test + public void shouldProperlySendChangesThroughRealJGroupsCluster() throws Exception { + + // Create three observers ... + CustomObserver observer1 = new CustomObserver(); + CustomObserver observer2 = new CustomObserver(); + CustomObserver observer3 = new CustomObserver(); + + // Create three busses using a real JGroups cluster ... + String name = "MyCluster"; + ClusteredObservationBus bus1 = startNewBus(name, observer1); + try { + // ------------------------------------ + // Send a change from the first bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(0); // shutdown + observer3.expectChanges(0); // shutdown + + // Send changes to one of the busses ... + Changes changes = changes(); + bus1.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(0)); + assertThat(observer3.getObservedChanges().size(), is(0)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + + // ------------------------------------ + // Create a second bus ... + // ------------------------------------ + ClusteredObservationBus bus2 = startNewBus(name, observer2); + try { + // ------------------------------------ + // Send a change from the first bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(0); // shutdown + + // Send changes to one of the busses ... + changes = changes(); + bus1.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(0)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + assertThat(observer2.getObservedChanges().get(0), is(changes)); + + // ------------------------------------ + // Send a change from the second bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(0); // shutdown + + // Send changes to one of the busses ... + changes = changes(); + bus2.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(0)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + assertThat(observer2.getObservedChanges().get(0), is(changes)); + + // ------------------------------------ + // Create a second bus ... + // ------------------------------------ + ClusteredObservationBus bus3 = startNewBus(name, observer3); + try { + + // ------------------------------------ + // Send a change from the first bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(1); + + // Send changes to one of the busses ... + changes = changes(); + bus1.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(1)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + assertThat(observer2.getObservedChanges().get(0), is(changes)); + assertThat(observer3.getObservedChanges().get(0), is(changes)); + + // ------------------------------------- + // Send a change from the second bus ... + // ------------------------------------- + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(1); + + // Send changes to one of the busses ... + Changes changes2 = changes(); + bus2.notify(changes2); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(1)); + assertThat(observer1.getObservedChanges().get(0), is(changes2)); + assertThat(observer2.getObservedChanges().get(0), is(changes2)); + assertThat(observer3.getObservedChanges().get(0), is(changes2)); + + // ------------------------------------ + // Send a change from the third bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(1); + + // Send changes to one of the busses ... + Changes changes3 = changes(); + bus3.notify(changes3); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(1)); + assertThat(observer1.getObservedChanges().get(0), is(changes3)); + assertThat(observer2.getObservedChanges().get(0), is(changes3)); + assertThat(observer3.getObservedChanges().get(0), is(changes3)); + + // --------------------------------------- + // Stop the busses! I want to get off! ... + // --------------------------------------- + } finally { + bus3.shutdown(); + } + } finally { + // ------------------------------------ + // Send a change from the second bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(1); + observer3.expectChanges(0); // shutdown + + // Send changes to one of the busses ... + changes = changes(); + bus2.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(1)); + assertThat(observer3.getObservedChanges().size(), is(0)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + assertThat(observer2.getObservedChanges().get(0), is(changes)); + + bus2.shutdown(); + } + } finally { + // ------------------------------------ + // Send a change from the first bus ... + // ------------------------------------ + + // Set the observers to expect one event ... + observer1.expectChanges(1); + observer2.expectChanges(0); // shutdown + observer3.expectChanges(0); // shutdown + + // Send changes to one of the busses ... + Changes changes = changes(); + bus1.notify(changes); + + // Wait for the observers to be notified ... + observer1.await(); + observer2.await(); + observer3.await(); + + // Now verify that all of the observers received the notification ... + assertThat(observer1.getObservedChanges().size(), is(1)); + assertThat(observer2.getObservedChanges().size(), is(0)); + assertThat(observer3.getObservedChanges().size(), is(0)); + assertThat(observer1.getObservedChanges().get(0), is(changes)); + + bus1.shutdown(); + } + } + + // ---------------------------------------------------------------------------------------------------------------- + // Utility methods + // ---------------------------------------------------------------------------------------------------------------- + + protected ClusteredObservationBus startNewBus( String name, + Observer localObserver ) { + ClusteredObservationBus bus = new ClusteredObservationBus(); + bus.setClusterName(name); + bus.start(); + bus.register(localObserver); + return bus; + } + + protected Changes changes() { + DateTime now = context.getValueFactories().getDateFactory().create(); + Path path = context.getValueFactories().getPathFactory().create("/a"); + Name childName = context.getValueFactories().getNameFactory().create("b"); + Path childPath = context.getValueFactories().getPathFactory().create(path, childName); + CreateNodeRequest request = new CreateNodeRequest(Location.create(path), "workspaceName", childName); + request.setActualLocationOfNode(Location.create(childPath)); + List requests = Collections.singletonList((ChangeRequest)request); + return new Changes("processId", "contextId", "username", "sourceName", now, requests, null); + } + + protected static class CustomObserver implements Observer { + private final List receivedChanges = new ArrayList(); + private CountDownLatch latch; + + public void expectChanges( int expectedNumberOfChanges ) { + latch = new CountDownLatch(expectedNumberOfChanges); + receivedChanges.clear(); + } + + /** + * {@inheritDoc} + * + * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) + */ + @Override + public void notify( Changes changes ) { + receivedChanges.add(changes); + latch.countDown(); + } + + public void await() throws InterruptedException { + latch.await(250, TimeUnit.MILLISECONDS); + } + + public List getObservedChanges() { + return receivedChanges; + } + } +} Index: b/extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java =================================================================== --- a/extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java (revision ) +++ b/extensions/modeshape-clustering/src/test/java/org/modeshape/clustering/ClusteredObservationBusTest.java (working copy) @@ -30,18 +30,19 @@ import static org.hamcrest.core.IsSame.sameInstance; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.stub; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.Vector; +import org.jgroups.Address; import org.jgroups.ChannelListener; import org.jgroups.JChannel; import org.jgroups.Message; import org.jgroups.Receiver; +import org.jgroups.View; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -61,12 +62,27 @@ public class ClusteredObservationBusTest { private ClusteredObservationBus bus; private ExecutionContext context = new ExecutionContext(); protected JChannel mockChannel; + protected View mockView; + protected Vector
channelMembers; @Before public void beforeEach() { - mockChannel = Mockito.mock(JChannel.class); - // Create a clustered bus that does NOT use a real JChannel ... - bus = newBus(mockChannel); + // Now create a mock JChannel and our bus ... + final JChannel mockChannel = Mockito.mock(JChannel.class); + this.mockChannel = mockChannel; + this.bus = new ClusteredObservationBus() { + @Override + protected JChannel newChannel( String configuration ) { + return mockChannel; + } + }; + // Create a mocked view; the only thing ClusteredObservationBus uses it for is to + // call 'channel.getView().getMembers().size()', so mock these objects so that the + // size is 'numberOfMembers' + channelMembers = new Vector
(); + mockView = Mockito.mock(View.class); + stub(mockChannel.getView()).toReturn(mockView); + stub(mockView.getMembers()).toReturn(channelMembers); } @Test @@ -178,16 +194,19 @@ public class ClusteredObservationBusTest { bus.start(); verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture()); verify(mockChannel, times(1)).connect("clusterName"); - verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture()); + + // Get the bus' receiver so we can set the view ... + ArgumentCaptor receiverArgument = ArgumentCaptor.forClass(Receiver.class); + verify(mockChannel, times(1)).setReceiver(receiverArgument.capture()); + + // Pretend that the channel has two members ... + setChannelMemberCount(2); + receiverArgument.getValue().viewAccepted(mockView); // When connected, JGroups will call back to the listener and the bus will record it as open. // But we have to do this manually because we've stubbed out JGroups ... bus.isOpen.set(true); - // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are - // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ... - bus.multipleAddressesInCluster.set(true); - // Now call the notify method ... bus.notify(changes()); verify(mockChannel, times(1)).send(ArgumentCaptor.forClass(Message.class).capture()); @@ -197,20 +216,26 @@ public class ClusteredObservationBusTest { @Test public void shouldAllowNotifyToBeCalledAfterStartWithOneMemberAndShouldSendMessageToLocalObserversBuNotJGroups() throws Exception { + // Pretend that the channel has two members ... + setChannelMemberCount(1); + bus.setClusterName("clusterName"); bus.start(); verify(mockChannel, times(1)).addChannelListener(ArgumentCaptor.forClass(ChannelListener.class).capture()); verify(mockChannel, times(1)).connect("clusterName"); - verify(mockChannel, times(1)).setReceiver(ArgumentCaptor.forClass(Receiver.class).capture()); + + // Get the bus' receiver so we can set the view ... + ArgumentCaptor receiverArgument = ArgumentCaptor.forClass(Receiver.class); + verify(mockChannel, times(1)).setReceiver(receiverArgument.capture()); + + // Pretend that the channel has only one member ... + setChannelMemberCount(1); + receiverArgument.getValue().viewAccepted(mockView); // When connected, JGroups will call back to the listener and the bus will record it as open. // But we have to do this manually because we've stubbed out JGroups ... bus.isOpen.set(true); - // JGroups also normally calls Receiver.viewAccepted(...), and the bus' receiver sets whether there are - // multiple members in the cluster. We need to set this manually because we've stubbed out JGroups ... - bus.multipleAddressesInCluster.set(false); - // Add a local listener ... Observer observer = mock(Observer.class); bus.register(observer); @@ -224,57 +249,19 @@ public class ClusteredObservationBusTest { verify(observer, times(1)).notify(changes); } - @Test - public void shouldProperlySendChangesThroughRealJGroupsCluster() throws Exception { - - // Create three observers ... - CountDownLatch latch = new CountDownLatch(3); - CustomObserver observer1 = new CustomObserver(latch); - CustomObserver observer2 = new CustomObserver(latch); - CustomObserver observer3 = new CustomObserver(latch); - - // Create three busses using a real JGroups cluster ... - String name = "MyCluster"; - ClusteredObservationBus bus1 = startNewBus(name, observer1); - try { - ClusteredObservationBus bus2 = startNewBus(name, observer2); - try { - ClusteredObservationBus bus3 = startNewBus(name, observer3); - try { - - // Send changes to one of the busses ... - Changes changes = changes(); - bus1.notify(changes); - - // Wait for the observers to be notified ... - observer1.await(); - observer2.await(); - observer3.await(); - - // Now verify that all of the observers received the notification ... - assertThat(observer1.getObservedChanges().size(), is(1)); - assertThat(observer2.getObservedChanges().size(), is(1)); - assertThat(observer3.getObservedChanges().size(), is(1)); - assertThat(observer1.getObservedChanges().get(0), is(changes)); - assertThat(observer2.getObservedChanges().get(0), is(changes)); - assertThat(observer3.getObservedChanges().get(0), is(changes)); - - // Stop the busses ... - } finally { - bus3.shutdown(); - } - } finally { - bus2.shutdown(); - } - } finally { - bus1.shutdown(); - } - } - // ---------------------------------------------------------------------------------------------------------------- // Utility methods // ---------------------------------------------------------------------------------------------------------------- + protected void setChannelMemberCount( int count ) { + while (channelMembers.size() > count) { + channelMembers.remove(channelMembers.size() - 1); + } + while (channelMembers.size() < count) { + channelMembers.add(Mockito.mock(Address.class)); + } + } + protected void setAndGetClusterName( String name ) { bus.setClusterName(name); String nameAfter = bus.getClusterName(); @@ -287,30 +274,6 @@ public class ClusteredObservationBusTest { assertThat(configAfter, is(config)); } - protected ClusteredObservationBus startNewBus( String name, - Observer localObserver ) { - ClusteredObservationBus bus = newBus(null); - bus.setClusterName(name); - bus.start(); - bus.register(localObserver); - return bus; - } - - protected ClusteredObservationBus newBus( final JChannel channel ) { - return channel == null ? new ClusteredObservationBus() : new ClusteredObservationBus() { - /** - * {@inheritDoc} - * - * @see org.modeshape.clustering.ClusteredObservationBus#newChannel(java.lang.String) - */ - @Override - protected JChannel newChannel( String configuration ) { - return channel; - } - }; - - } - protected Changes changes() { DateTime now = context.getValueFactories().getDateFactory().create(); Path path = context.getValueFactories().getPathFactory().create("/a"); @@ -321,32 +284,4 @@ public class ClusteredObservationBusTest { List requests = Collections.singletonList((ChangeRequest)request); return new Changes("processId", "contextId", "username", "sourceName", now, requests, null); } - - protected static class CustomObserver implements Observer { - private final List receivedChanges = new ArrayList(); - private final CountDownLatch latch; - - protected CustomObserver( CountDownLatch latch ) { - this.latch = latch; - } - - /** - * {@inheritDoc} - * - * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes) - */ - @Override - public void notify( Changes changes ) { - receivedChanges.add(changes); - latch.countDown(); - } - - public void await() throws InterruptedException { - latch.await(250, TimeUnit.MILLISECONDS); - } - - public List getObservedChanges() { - return receivedChanges; - } - } }