package test; import static org.junit.Assert.*; import java.io.*; import java.util.concurrent.atomic.AtomicLong; import org.jgroups.*; import org.jgroups.conf.XmlConfigurator; import org.jgroups.util.Util; import org.junit.Test; public class JgroupsTest2 { private final org.jgroups.logging.Log log = org.jgroups.logging.LogFactory.getLog(this.getClass()); @Test public void testReconnect() throws Exception { ClusterSrv srv1 = new ClusterSrv(); ClusterSrv srv2 = new ClusterSrv(); for (int i = 1;; i++) { log.info("[1] srv1.start()"); srv1.start(); log.info("[2] srv2.start()"); srv2.start(); log.info("[3] " + srv1.getChannel().getName() + " srv1.sendMesage(msg1)"); srv1.sendMessage("msg1"); // Srv 2 gets msg log.info("[4] " + srv2.getChannel().getName() + " srv2.sendMesage(msg2)"); srv2.sendMessage("msg2"); // Srv 1 gets msg flushAndAssertCount(srv1, 1, srv2, 1); log.info("[5] " + srv1.getChannel().getName() + " srv1.stop()"); srv1.stop(); log.info("[6] srv1.start()"); srv1.start(); log.info("[7] " + srv2.getChannel().getName() + " srv2.sendMessage(msg3)"); srv2.sendMessage("msg3"); // Srv 1 gets msg flushAndAssertCount(srv1, 2, srv2, 1); log.info("[8] " + srv2.getChannel().getName() + " srv2.stop()"); srv2.stop(); log.info("[9] srv2.start()"); srv2.start(); log.info("[10] " + srv1.getChannel().getName() + " srv1.sendMessage(msg4)"); srv1.sendMessage("msg4"); // Srv 2 gets msg flushAndAssertCount(srv1, 2, srv2, 2); log.info("[11] " + srv1.getChannel().getName() + " srv1.stop()"); srv1.stop(); log.info("[12] " + srv2.getChannel().getName() + " srv2.stop()"); srv2.stop(); srv1.receiveCount.set(0); srv2.receiveCount.set(0); log.info("Round " + i + " done"); } } private void flushAndAssertCount(ClusterSrv srv1, int srv1Count, ClusterSrv srv2, int srv2Count) { log.info("flush(srv1)"); flush(srv1); log.info("flush(srv2)"); flush(srv2); assertCount(srv1, srv1Count, srv2, srv2Count); } private static void flush(ClusterSrv srv) { boolean ok = Util.startFlush(srv.getChannel()); srv.getChannel().stopFlush(); assertTrue(ok); } private void assertCount(ClusterSrv srv1, int srv1Count, ClusterSrv srv2, int srv2Count) { assertEquals(srv1Count, srv1.receiveCount.get()); assertEquals(srv2Count, srv2.receiveCount.get()); } private static class ClusterSrv { private final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(this.getClass()); private JChannel channel; public final AtomicLong receiveCount = new AtomicLong(); private static final AtomicLong channelCount = new AtomicLong(0); private synchronized void start() throws ChannelException, UnsupportedEncodingException, IOException { long start = System.currentTimeMillis(); InputStream is = getClass().getClassLoader().getResourceAsStream("flush-tcp.xml"); String host = "localhost"; System.setProperty("jgroups.bind_addr", host); System.setProperty("jgroups.tcpping.initial_hosts", host + "[7800], " + host + "[7801]" + host + "[7802]" + host + "[7803]" + host + "[7804]"); channel = new JChannel(XmlConfigurator.getInstance(is)); channel.setOpt(Channel.LOCAL, Boolean.FALSE); channel.setReceiver(new ReceiverImpl()); channel.setName("channel-" + Long.toString(channelCount.incrementAndGet())); channel.connect("testClust"); log.info("Start done in " + (System.currentTimeMillis() - start) + " ms"); } public synchronized JChannel getChannel() { return channel; } private synchronized void stop() { if (channel != null) { channel.close(); } } private class ReceiverImpl extends ReceiverAdapter { @Override public void receive(Message msg) { receiveCount.incrementAndGet(); byte[] byteData = msg.getBuffer(); ObjectInputStream ois = null; try { ois = new ObjectInputStream(new ByteArrayInputStream(byteData)); Object data = ois.readObject(); log.debug("Received msg: " + data.toString()); } catch (Exception e) { log.error("Receive failed", e); } } @Override public void viewAccepted(View new_view) { log.info("New view: " + new_view.toString()); } } private synchronized void sendMessage(Serializable obj) throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(os); oos.writeObject(obj); oos.close(); byte[] buf = os.toByteArray(); Message msg = new Message(null, null, buf); log.debug("Sending message: " + msg); if (channel == null || !channel.isConnected()) { throw new IllegalStateException("Channel was disconnected when trying to send message"); } channel.send(msg); log.debug("Sent message " + msg); } } }