package org.jgroups.tests;

import org.jgroups.*;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Repeatedly connects, flushes, disconnects and reconnects two channels on the same cluster
 * and verifies that each member receives the expected number of messages.
 * <p>
 * The expected counts assume that a sender does not receive its own messages; the channels are
 * created with {@code Channel.LOCAL} set to {@code FALSE} in {@link ClusterSrv#start()}.
 */
public class JGroupsTest extends ChannelTestBase {
    private static final String configStr="flush-udp.xml";

    /**
     * Runs reconnect rounds in an endless loop; the method only returns by throwing, i.e. when a
     * round fails (or the thread is interrupted while waiting for messages).
     * <p>
     * Both servers are stopped after every round, including a failed one, so that no channel
     * is left open when the failure propagates to the caller.
     *
     * @throws Exception if a channel operation fails or the calling thread is interrupted
     */
    public void testReconnect() throws Exception {
        ClusterSrv srv1=new ClusterSrv();
        ClusterSrv srv2=new ClusterSrv();

        for(int i=1; ; i++) {
            try {
                runRound(srv1, srv2);
            }
            finally {
                // Always release both channels, even if the round failed half-way through.
                try {
                    srv1.stop();
                }
                finally {
                    srv2.stop();
                }
            }

            srv1.receiveCount.set(0);
            srv2.receiveCount.set(0);
            System.out.println("***** Round " + i + " done *****");
        }
    }

    /**
     * Executes one connect / send / flush / reconnect cycle. The caller is responsible for
     * stopping both servers afterwards.
     */
    private void runRound(ClusterSrv srv1, ClusterSrv srv2) throws Exception {
        srv1.start();
        srv2.start(); // Both get "view" message

        srv1.sendMessage("msg1"); // Srv 2 gets msg
        srv2.sendMessage("msg2"); // Srv 1 gets msg

        srv1.flush();
        assertCount(srv1, 1, srv2, 1);
        srv1.stopFlush();

        // Reconnect srv1; its receive counter lives in ClusterSrv and therefore survives the restart.
        srv1.stop();
        srv1.start(); // Both get "view" message

        srv2.sendMessage("msg3"); // Srv 1 gets msg
        srv2.flush();
        assertCount(srv1, 2, srv2, 1);
        srv2.stopFlush();

        // Reconnect srv2.
        srv2.stop();
        srv2.start(); // Both get "view" message

        srv2.sendMessage("msg4");
        srv2.flush();
        srv2.stopFlush();
        assertCount(srv1, 3, srv2, 1);
    }

    /**
     * Waits until both servers report the expected receive counts, polling every 10 ms for at
     * most 1000 iterations, and then asserts the counts.
     *
     * @param srv1      first server
     * @param srv1Count number of messages {@code srv1} is expected to have received
     * @param srv2      second server
     * @param srv2Count number of messages {@code srv2} is expected to have received
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private void assertCount(ClusterSrv srv1, int srv1Count, ClusterSrv srv2, int srv2Count) throws InterruptedException {
        long start=System.currentTimeMillis();
        for(int i=0; i < 1000; i++) {
            if(srv1.receiveCount.get() == srv1Count && srv2.receiveCount.get() == srv2Count) {
                break;
            }
            Thread.sleep(10L);
        }
        assertEquals(srv1Count, srv1.receiveCount.get());
        assertEquals(srv2Count, srv2.receiveCount.get());
        log.info("assert OK in " + (System.currentTimeMillis() - start) + "ms");
    }

    /**
     * A cluster member wrapping a single {@link JChannel}. Every {@link #start()} creates a new
     * channel and a new receiver; {@link #receiveCount} is shared across restarts.
     */
    private class ClusterSrv {
        private JChannel channel;
        /** Number of messages received since the counter was last reset; not reset by restarts. */
        public final AtomicLong receiveCount=new AtomicLong();
        public ReceiverImpl receiver;

        /**
         * Creates a new channel from {@code configStr}, installs a fresh receiver and connects
         * to the cluster "testClust".
         */
        private synchronized void start() throws ChannelException, IOException {
            long start=System.currentTimeMillis();
            // Assign the field before connecting so that stop() can close the channel even if
            // connect() fails.
            channel=new JChannel(configStr);
            channel.setOpt(Channel.LOCAL, Boolean.FALSE);
            receiver=new ReceiverImpl();
            channel.setReceiver(receiver);
            channel.connect("testClust");
            log.info("Start done in " + (System.currentTimeMillis() - start) + " ms");
        }

        /** Starts a flush on the channel. */
        public void flush() {
            channel.startFlush(false);
        }

        /** Stops the flush on the channel. */
        public void stopFlush() {
            channel.stopFlush();
        }

        /**
         * Closes the channel. Safe to call when the server was never started or has already
         * been stopped, which is what the cleanup in {@code testReconnect()} relies on.
         */
        private synchronized void stop() {
            if(channel != null && channel.isOpen()) {
                channel.close();
            }
        }

        /** Records every received payload and bumps the enclosing server's receive counter. */
        private class ReceiverImpl extends ReceiverAdapter {
            final List<Object> msgs=new ArrayList<Object>();

            public List<Object> getMsgs() {
                return msgs;
            }

            @Override
            public void receive(Message msg) {
                try {
                    Object data=msg.getObject();
                    msgs.add(data);
                    receiveCount.incrementAndGet();
                    log.debug("Received msg: " + data);
                }
                catch(Exception e) {
                    // A payload that fails to process is logged and not counted.
                    log.error("Receive failed", e);
                }
            }

            @Override
            public void viewAccepted(View new_view) {
            }
        }

        /**
         * Sends {@code obj} to the cluster (null destination). If the channel is not
         * connected, the message is dropped with a warning instead of failing.
         *
         * @param obj payload to send
         * @throws Exception if sending fails
         */
        private synchronized void sendMessage(Serializable obj) throws Exception {
            if(!channel.isConnected()) {
                log.warn("Channel disconnected in send, discarding msg");
                return;
            }
            Message msg=new Message(null, null, obj);
            log.debug("Sending message: " + msg);
            channel.send(msg);
            log.debug("Sent message: " + msg);
        }
    }
}