package org.jgroups.tests; import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.jgroups.Address; import org.jgroups.Channel; import org.jgroups.Event; import org.jgroups.Global; import org.jgroups.JChannel; import org.jgroups.MergeView; import org.jgroups.ReceiverAdapter; import org.jgroups.View; import org.jgroups.jmx.JmxConfigurator; import org.jgroups.logging.Log; import org.jgroups.protocols.PING; import org.jgroups.protocols.SHARED_LOOPBACK; import org.jgroups.protocols.UNICAST3; import org.jgroups.protocols.pbcast.GMS; import org.jgroups.protocols.pbcast.NAKACK2; import org.jgroups.protocols.pbcast.STABLE; import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.ProtocolStack; import org.jgroups.util.MutableDigest; import org.jgroups.util.SocketFactory; import org.jgroups.util.ThreadFactory; import org.jgroups.util.Util; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** * Tests a merge between asymmetric partitions where an isolated coord is still seen by others. * Example : {A,B,C,D} and {A}. * */ @Test(groups=Global.FUNCTIONAL,sequential=true) public class MergeTest4 { protected JChannel a,b,c,d; @BeforeMethod void setUp() throws Exception { a=createChannel("A"); b=createChannel("B"); c=createChannel("C"); d=createChannel("D"); } @AfterMethod void tearDown() throws Exception { for(JChannel ch: new JChannel[]{a,b,c,d}) { ProtocolStack stack=ch.getProtocolStack(); String cluster_name=ch.getClusterName(); stack.stopStack(cluster_name); stack.destroy(); } } public void testMergeWithAsymetricViewsCoordIsolated() { createPartition(a,b,c,d); // Isolate the coord Address coord = a.getView().getCreator(); System.out.println("Isolating coord: " + coord); List
members = new ArrayList(); members.add(coord); View coord_view=new View(coord, 4, members); System.out.println("coord_view: " + coord_view); Channel coord_channel = findChannel(coord); System.out.println("coord_channel: " + coord_channel.getAddress()); MutableDigest digest=new MutableDigest(coord_view.getMembersRaw()); NAKACK2 nakack=(NAKACK2)coord_channel.getProtocolStack().findProtocol(NAKACK2.class); digest.merge(nakack.getDigest(coord)); GMS gms=(GMS)coord_channel.getProtocolStack().findProtocol(GMS.class); gms.installView(coord_view, digest); System.out.println("gms.getView() " + gms.getView()); System.out.println("Views are:"); for(JChannel ch: Arrays.asList(a,b,c,d)) System.out.println(ch.getAddress() + ": " + ch.getView()); JChannel merge_leader=findChannel(coord); merge_leader.setReceiver(new MyReceiver()); System.out.println("merge_leader: " + merge_leader.getAddressAsString()); System.out.println("Injecting MERGE event into merge leader " + merge_leader.getAddress()); Map merge_views=new HashMap(4); merge_views.put(a.getAddress(), findChannel(a.getAddress()).getView()); merge_views.put(b.getAddress(), findChannel(b.getAddress()).getView()); merge_views.put(c.getAddress(), findChannel(c.getAddress()).getView()); merge_views.put(d.getAddress(), findChannel(d.getAddress()).getView()); gms=(GMS)merge_leader.getProtocolStack().findProtocol(GMS.class); gms.up(new Event(Event.MERGE, merge_views)); for(int i=0; i < 20; i++) { boolean done=true; System.out.println(); for(JChannel ch: new JChannel[]{a,b,c,d}) { System.out.println("==> " + ch.getAddress() + ": " + ch.getView()); if(ch.getView().size() != 4) done=false; } if(done) break; Util.sleep(1000); } System.out.println("Views are:"); for(JChannel ch: Arrays.asList(a,b,c,d)) System.out.println(ch.getAddress() + ": " + ch.getView()); } class MyReceiver extends ReceiverAdapter { @Override public void viewAccepted(View view) { if(view instanceof MergeView) { System.out.println("MyReceiver view " + view); Assert.assertEquals(view.size(), 4); Assert.assertEquals(((MergeView)view).getSubgroups().size(), 2); } } } protected JChannel createChannel(String name) throws Exception { JChannel retval=new JChannel(new SHARED_LOOPBACK(), new PING().setValue("timeout",100), new NAKACK2().setValue("use_mcast_xmit",false) .setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false), new UNICAST3(), new STABLE().setValue("max_bytes",50000), new GMS().setValue("print_local_addr",false) .setValue("leave_timeout",100) .setValue("merge_timeout",5000) .setValue("log_view_warnings",false) .setValue("view_ack_collection_timeout",50) .setValue("log_collect_msgs",false)) .name(name); retval.connect("MergeTest4"); JmxConfigurator.registerChannel(retval, Util.getMBeanServer(), name, retval.getClusterName(), true); return retval; } protected JChannel findChannel(Address mbr) { for(JChannel ch: Arrays.asList(a,b,c,d)) { if(ch.getAddress().equals(mbr)) return ch; } return null; } protected void createPartition(JChannel ... channels) { List members=getMembers(channels); Collections.sort(members); Address coord=members.get(0); View view=new View(coord, 0, members); MutableDigest digest=new MutableDigest(view.getMembersRaw()); for(JChannel ch: channels) { NAKACK2 nakack=(NAKACK2)ch.getProtocolStack().findProtocol(NAKACK2.class); digest.merge(nakack.getDigest(ch.getAddress())); } for(JChannel ch: channels) { GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class); gms.installView(view, digest); } } protected List getMembers(JChannel ... channels) { List members=new ArrayList(channels.length); for(JChannel ch: channels) members.add(ch.getAddress()); return members; } protected static class MyDiagnosticsHandler extends DiagnosticsHandler { protected MyDiagnosticsHandler(InetAddress diagnostics_addr, int diagnostics_port, Log log, SocketFactory socket_factory, ThreadFactory thread_factory) { super(diagnostics_addr,diagnostics_port,log,socket_factory,thread_factory); } public void start() throws IOException {super.start();} public void stop() {} public void destroy() {super.stop();} } @Test(enabled=false) public static void main(String[] args) throws Exception { MergeTest4 test=new MergeTest4(); test.setUp(); test.testMergeWithAsymetricViewsCoordIsolated(); test.tearDown(); } }