Index: GossipRouter.java =================================================================== RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/stack/GossipRouter.java,v retrieving revision 1.23 diff -r1.23 GossipRouter.java 5,10c5,6 < import org.apache.commons.logging.Log; < import org.apache.commons.logging.LogFactory; < import org.jgroups.Address; < import org.jgroups.conf.ClassConfigurator; < import org.jgroups.util.Util; < --- > import java.io.ByteArrayInputStream; > import java.io.ByteArrayOutputStream; 15d10 < import java.net.ServerSocket; 17c12,26 < import java.util.*; --- > import java.util.HashMap; > import java.util.Iterator; > import java.util.LinkedList; > import java.util.List; > import java.util.Map; > import java.util.Timer; > > import org.apache.commons.logging.Log; > import org.apache.commons.logging.LogFactory; > import org.jgroups.Address; > import org.jgroups.blocks.BasicConnectionTable; > import org.jgroups.blocks.ConnectionTable; > import org.jgroups.blocks.BasicConnectionTable.Receiver; > import org.jgroups.conf.ClassConfigurator; > import org.jgroups.util.Util; 77c86 < private ServerSocket srvSock=null; --- > // private ServerSocket srvSock=null; 90a100 > private BasicConnectionTable connectionTable; 169c179 < return srvSock != null; --- > return connectionTable != null; 179a190,196 > public BasicConnectionTable getConnectionTable() { > return connectionTable; > } > > public void setConnectionTable(BasicConnectionTable connectionTable) { > this.connectionTable = connectionTable; > } 181c198 < public static String type2String(int type) { --- > public static String type2String(int type) { 224c241 < if(srvSock != null) { --- > if(connectionTable != null) { 230c247 < srvSock=new ServerSocket(port, 50, bindAddress); --- > // srvSock=new ServerSocket(port, 50, bindAddress); 233c250 < srvSock=new ServerSocket(port, 50); --- > // srvSock=new ServerSocket(port, 50); 237,244c254,261 < < // start the main server thread < new Thread(new Runnable() { < public void run() { < mainLoop(); < cleanup(); < } < }, "GossipRouter").start(); --- > // connectionTable.addConnectionListener(this); > // // start the main server thread > // new Thread(new Runnable() { > // public void run() { > // mainLoop(); > // cleanup(); > // } > // }, "GossipRouter").start(); 248,253c265,270 < timer=new Timer(true); < timer.schedule(new TimerTask() { < public void run() { < sweep(); < } < }, expiryTime, expiryTime); --- > // timer=new Timer(true); > // timer.schedule(new TimerTask() { > // public void run() { > // sweep(); > // } > // }, expiryTime, expiryTime); 256c273,275 < /** --- > > > /** 263c282 < if(srvSock == null) { --- > if(connectionTable == null) { 270c289 < try { --- > /*try { 277c296 < srvSock=null; --- > srvSock=null;*/ 331c350 < /** --- > /** 333c352 < */ --- > * 346,487c365 < GossipData req; < String group; < < while(up && srvSock != null) { < try { < sock=srvSock.accept(); < sock.setSoLinger(true, 500); < input=new DataInputStream(sock.getInputStream()); < // if(log.isTraceEnabled()) < // log.trace("accepted connection from " + sock); < < req=new GossipData(); < req.readFrom(input); < < switch(req.getType()) { < case GossipRouter.REGISTER: < mbr=req.getAddress(); < group=req.getGroup(); < if(log.isTraceEnabled()) < log.trace("REGISTER(" + group + ", " + mbr + ")"); < if(group == null || mbr == null) { < if(log.isErrorEnabled()) log.error("group or member is null, cannot register member"); < } < else < addGossipEntry(group, mbr, new AddressEntry(mbr)); < Util.close(input); < Util.close(sock); < break; < < case GossipRouter.UNREGISTER: < mbr=req.getAddress(); < group=req.getGroup(); < if(log.isTraceEnabled()) < log.trace("UNREGISTER(" + group + ", " + mbr + ")"); < if(group == null || mbr == null) { < if(log.isErrorEnabled()) log.error("group or member is null, cannot unregister member"); < } < else < removeGossipEntry(group, mbr); < Util.close(input); < Util.close(output); < Util.close(sock); < break; < < case GossipRouter.GOSSIP_GET: < group=req.getGroup(); < List
mbrs=null; < Map map; < synchronized(routingTable) { < map=routingTable.get(group); < if(map != null) { < mbrs=new LinkedList
(map.keySet()); < } < } < < if(log.isTraceEnabled()) < log.trace("GOSSIP_GET(" + group + ") --> " + mbrs); < output=new DataOutputStream(sock.getOutputStream()); < GossipData rsp=new GossipData(GossipRouter.GET_RSP, group, null, mbrs); < rsp.writeTo(output); < Util.close(input); < Util.close(output); < Util.close(sock); < break; < < case GossipRouter.ROUTER_GET: < group=req.getGroup(); < output=new DataOutputStream(sock.getOutputStream()); < < List
ret=null; < synchronized(routingTable) { < map=routingTable.get(group); < if(map != null) { < ret=new LinkedList
(map.keySet()); < } < else < ret=new LinkedList
(); < } < if(log.isTraceEnabled()) < log.trace("ROUTER_GET(" + group + ") --> " + ret); < rsp=new GossipData(GossipRouter.GET_RSP, group, null, ret); < rsp.writeTo(output); < Util.close(input); < Util.close(output); < Util.close(sock); < break; < < case GossipRouter.DUMP: < output=new DataOutputStream(sock.getOutputStream()); < output.writeUTF(dumpRoutingTable()); < Util.close(input); < Util.close(output); < Util.close(sock); < break; < < case GossipRouter.CONNECT: < output=new DataOutputStream(sock.getOutputStream()); < peer_addr=new IpAddress(sock.getInetAddress(), sock.getPort()); < output=new DataOutputStream(sock.getOutputStream()); < logical_addr=req.getAddress(); < String group_name=req.getGroup(); < < if(log.isTraceEnabled()) < log.trace("CONNECT(" + group_name + ", " + logical_addr + ")"); < SocketThread st=new SocketThread(sock, input, group_name, logical_addr); < addEntry(group_name, logical_addr, new AddressEntry(logical_addr, peer_addr, sock, st, output)); < st.start(); < break; < < case GossipRouter.DISCONNECT: < Address addr=req.getAddress(); < group_name=req.getGroup(); < removeEntry(group_name, addr); < if(log.isTraceEnabled()) < log.trace("DISCONNECT(" + group_name + ", " + addr + ")"); < Util.close(input); < Util.close(output); < Util.close(sock); < break; < < case GossipRouter.SHUTDOWN: < if(log.isInfoEnabled()) log.info("router shutting down"); < Util.close(input); < Util.close(output); < Util.close(sock); < up=false; < break; < default: < if(log.isWarnEnabled()) < log.warn("received unkown gossip request (gossip=" + req + ')'); < break; < } < } < catch(Exception e) { < if(up) < if(log.isErrorEnabled()) log.error("failure handling a client request", e); < Util.close(input); < Util.close(output); < Util.close(sock); < } < } < } --- > }*/ 492c370 < */ --- > * 509c387 < } --- > }*/ 515c393,394 < Socket s=null; --- > connectionTable.stop(); > /*Socket s=null; 529c408 < } --- > }*/ 538c417 < */ --- > * 571c450 < } --- > }*/ 577,582d455 < //if(log.isTraceEnabled()) { < // int len=msg != null? msg.length : 0; < //log.trace("routing request from " + sender + " for " + dest_group + " to " + < // (dest == null? "ALL" : dest.toString()) + ", " + len + " bytes"); < //} < 594,602c467,475 < if(ae == null) { < if(log.isTraceEnabled()) < log.trace("cannot find " + dest + " in the routing table, \nrouting table=\n" + dumpRoutingTable()); < return; < } < if(ae.output == null) { < if(log.isErrorEnabled()) log.error(dest + " is associated with a null output stream"); < return; < } --- > // if(ae == null) { > // if(log.isTraceEnabled()) > // log.trace("cannot find " + dest + " in the routing table, \nrouting table=\n" + dumpRoutingTable()); > // return; > // } > // if(ae.output == null) { > // if(log.isErrorEnabled()) log.error(dest + " is associated with a null output stream"); > // return; > // } 604c477 < sendToMember(dest, ae.output, msg, sender); --- > sendToMember(dest, null, msg, sender); 715c588,589 < if(dos != null) { --- > // if(dos != null) > { 718c592 < sendToMember(null, dos, msg, sender); --- > sendToMember(entry.logical_addr, dos, msg, sender); 735,736c609,610 < if(out == null) < return; --- > // if(out == null) > // return; 741,742c615,623 < < synchronized(out) { --- > > try { > log.trace("sending mesage ->" + msg.length + "to dest ->" + dest); > connectionTable.send(dest, msg, 0, msg.length); > } > catch(Exception send_ex) { > if(log.isWarnEnabled()) log.warn("cannot send to " + dest + ": " + send_ex.getMessage()); > } > /*synchronized(out) { 746c627 < } --- > }*/ 819c700 < class SocketThread extends Thread { --- > class SocketThread implements Receiver { 821c702 < Socket sock=null; --- > // Socket sock=null; 826c707,710 < --- > public SocketThread(){ > > } > 828,829c712 < super(Util.getGlobalThreadGroup(), "SocketThread " + (threadCounter++)); < this.sock=sock; --- > // super(Util.getGlobalThreadGroup(), "SocketThread " + (threadCounter++)); 835,837c718,837 < void closeSocket() { < Util.close(input); < Util.close(sock); --- > > public void receive(Address sender, byte[] data, int offset, int length) { > GossipData req; > String group; > ByteArrayOutputStream bout = new ByteArrayOutputStream(); > > try { > req=new GossipData(); > req.readFrom(new DataInputStream(new ByteArrayInputStream(data))); > Address peer_addr=null, mbr=null, logical_addr; > switch(req.getType()) { > case GossipRouter.REGISTER: > mbr=req.getAddress(); > group=req.getGroup(); > if(log.isTraceEnabled()) > log.trace("REGISTER(" + group + ", " + mbr + ")"); > if(group == null || mbr == null) { > if(log.isErrorEnabled()) log.error("group or member is null, cannot register member"); > } > else > addGossipEntry(group, mbr, new AddressEntry(mbr)); > break; > > case GossipRouter.UNREGISTER: > mbr=req.getAddress(); > group=req.getGroup(); > if(log.isTraceEnabled()) > log.trace("UNREGISTER(" + group + ", " + mbr + ")"); > if(group == null || mbr == null) { > if(log.isErrorEnabled()) log.error("group or member is null, cannot unregister member"); > } > else > removeGossipEntry(group, mbr); > break; > > case GossipRouter.GOSSIP_GET: > group=req.getGroup(); > List
mbrs=null; > Map map; > synchronized(routingTable) { > map=routingTable.get(group); > if(map != null) { > mbrs=new LinkedList
(map.keySet()); > } > } > > if(log.isTraceEnabled()) > log.trace("GOSSIP_GET(" + group + ") --> " + mbrs); > GossipData rsp=new GossipData(GossipRouter.GET_RSP, group, null, mbrs); > rsp.writeTo(new DataOutputStream(bout)); > data = bout.toByteArray(); > route(sender, group, data, connectionTable.getLocalAddress()); // TODO check the sender is t correct? > break; > > case GossipRouter.ROUTER_GET: > group=req.getGroup(); > List ret=null; > synchronized(routingTable) { > map=(Map)routingTable.get(group); > if(map != null) { > ret=new LinkedList(map.keySet()); > } > else > ret=new LinkedList(); > } > if(log.isTraceEnabled()) > log.trace("ROUTER_GET(" + group + ") --> " + ret); > rsp=new GossipData(GossipRouter.GET_RSP, group, null, ret); > rsp.writeTo(new DataOutputStream(bout)); > data = bout.toByteArray(); > route(sender, group, data, connectionTable.getLocalAddress()); // TODO check the sender is t correct? > break; > > case GossipRouter.DUMP: > data = dumpRoutingTable().getBytes(); > connectionTable.send(req.getAddress(), data, 0, data.length); > break; > > case GossipRouter.CONNECT: > // peer_addr=new IpAddress(sock.getInetAddress(), sock.getPort()); > // not sure both are same temporary fix > peer_addr = mbr; > logical_addr=req.getAddress(); > String group_name=req.getGroup(); > > if(log.isTraceEnabled()) > log.trace("CONNECT(" + group_name + ", " + logical_addr + ")"); > // connection table nio based implementation should go here > // SocketThread st=new SocketThread(sock, input, group_name, logical_addr); > // addEntry(group_name, logical_addr, new AddressEntry(logical_addr, peer_addr, sock, st, output)); > break; > > case GossipRouter.DISCONNECT: > Address addr=req.getAddress(); > group_name=req.getGroup(); > removeEntry(group_name, addr); > connectionTable.remove(addr); > if(log.isTraceEnabled()) > log.trace("DISCONNECT(" + group_name + ", " + addr + ")"); > break; > > case GossipRouter.SHUTDOWN: > if(log.isInfoEnabled()) log.info("router shutting down"); > connectionTable.stop(); > up=false; > break; > default: > if(log.isWarnEnabled()) > log.warn("received unkown gossip request (gossip=" + req + ')'); > break; > } > } > catch(Exception e) { > if(up) > if(log.isErrorEnabled()) log.error("failure handling a client request", e); > } > } > > > void closeSocket() { 845c845 < public void run() { --- > /* public void run() { 887c887 < } --- > }*/ 899c899 < String bind_addr=null; --- > String bind_addr="localhost"; 930,931c930,940 < router=new GossipRouter(port, bind_addr, expiry, timeout, routingTimeout); < router.start(); --- > router=new GossipRouter();//port, bind_addr, expiry, timeout, routingTimeout); > if(bind_addr != null) { > InetAddress bindAddress=InetAddress.getByName(bind_addr); > BasicConnectionTable conTable = new ConnectionTable(bindAddress,port); > conTable.setReceiver(router.new SocketThread()); > router.setConnectionTable(conTable); > } > else { > router.setConnectionTable(new ConnectionTable(port)); > } > // router.start(); creating ConnectionTable will automatically starts