package org.jgroups.protocols;

import org.jgroups.*;
//import org.jgroups.stack.AckMcastSenderWindow;
//import org.jgroups.stack.AckReceiverWindow;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StaticInterval;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

import java.io.*;
import java.util.*;

/**
 * Simple Unreliable Multicast protocol. A no-acknowledgment protocol for unreliable delivery of
 * multicast messages, which does not need any group membership service.
 * Basically works as follows:
 * <ol>
 * <li>On CONNECT, a JOIN_ANNOUNCEMENT message without destination is sent. A receiver that does
 *     not yet know the sender adds it to its member set and answers with a JOIN_ANNOUNCEMENT
 *     addressed to that sender.
 * <li>Every message sent down without destination (or to a multicast address) is tagged with a
 *     MCAST header carrying the sender's current sequence number. Nothing is retransmitted.
 * <li>A received MCAST message is always passed up. If its sender was unknown, the sender is
 *     added to the member set and a single ACK is sent back to it.
 * <li>On DISCONNECT, a LEAVE_ANNOUNCEMENT is sent; receivers remove the sender from their member
 *     set. A SUSPECT event removes the suspected member as well.
 * <li>Every change of the member set is published as a locally generated ("fake") view, sent both
 *     up and down the stack.
 * </ol>
 * Advantage of this protocol: no group membership necessary, fast, low-traffic.
 * <p>
 * Converted from SMACK to SUMP.
 *
 * @author Bela Ban Aug 2002
 * @version $Id: SUMP.java,v 1.00 2008/12/19 15:38:38 samh Exp $
 */
public class SUMP extends Protocol {
    /** Configurable via the "timeout" property; not read anywhere else in this class. */
    long[] timeout=new long[]{1000,2000,3000};

    /** Configurable via the "max_xmits" property; not read anywhere else in this class. */
    int max_xmits=10;

    /** Currently known members, in insertion order. All access is synchronized on the set itself. */
    final Set<Address> members=new LinkedHashSet<Address>();

    /** Configuration information sent up the stack in a CONFIG event on {@link #start()}. */
    final Map<String,Object> cfmap=new HashMap<String,Object>();

    /** My own address, set when SET_LOCAL_ADDRESS is received. */
    Address local_addr=null;

    /** Sequence number stamped on the next multicast message sent by this member. */
    long seqno=1;

    /** Counter used to build the ViewIds of the fake view changes. */
    long vid=1;

    /** Whether to print the local address to stdout once it is known. */
    boolean print_local_addr=true;

    static final String name="SUMP";


    public SUMP() {
    }

    /** @return the protocol name, which is also the key under which {@link SumpHeader}s are stored */
    public String getName() {
        return name;
    }

    /** Populates the configuration map that is published on {@link #start()}. */
    public void init() throws Exception {
        cfmap.put("state_transfer", Boolean.TRUE);
        cfmap.put("protocol_class", getClass().getName());
    }

    /**
     * Reads and removes the properties understood by this protocol: "print_local_addr",
     * "timeout" (comma-delimited longs) and "max_xmits".
     *
     * @param props the protocol properties; recognized entries are removed from it
     * @return false (after logging an error) if unrecognized properties remain, true otherwise
     */
    public boolean setProperties(Properties props) {
        String str;
        long[] tmp;

        super.setProperties(props);

        str=props.getProperty("print_local_addr");
        if(str != null) {
            print_local_addr=Boolean.valueOf(str).booleanValue();
            props.remove("print_local_addr");
        }

        str=props.getProperty("timeout");
        if(str != null) {
            tmp=Util.parseCommaDelimitedLongs(str);
            props.remove("timeout");
            // keep the defaults when the value could not be parsed into at least one number
            if(tmp != null && tmp.length > 0)
                timeout=tmp;
        }

        str=props.getProperty("max_xmits");
        if(str != null) {
            max_xmits=Integer.parseInt(str);
            props.remove("max_xmits");
        }

        if(!props.isEmpty()) {
            log.error("the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }

    /** Publishes the configuration map to the protocols above. */
    public void start() throws Exception {
        up_prot.up(new Event(Event.CONFIG, cfmap));
    }

    public void stop() {
        super.stop();
    }

    /**
     * Handles events travelling up the stack.
     * <ul>
     * <li>SET_LOCAL_ADDRESS: remembers the address, adds it as a member, optionally prints it.
     * <li>SUSPECT: removes the suspected member.
     * <li>MSG carrying a {@link SumpHeader} of a known type: handled here and consumed
     *     (returns null); for MCAST the message itself is forwarded up.
     * </ul>
     * Everything else, including messages without a SumpHeader or with an invalid header type,
     * is passed on to the protocol above.
     *
     * @param evt the event received from the protocol below
     * @return null if the event was consumed here, otherwise the result of the protocol above
     */
    public Object up(Event evt) {
        Address sender;

        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                addMember(local_addr);
                if(print_local_addr) {
                    System.out.println("\n-------------------------------------------------------\n" +
                            "GMS: address is " + local_addr +
                            "\n-------------------------------------------------------");
                }
                break;

            case Event.SUSPECT:
                if(log.isInfoEnabled()) log.info("removing suspected member " + evt.getArg());
                removeMember((Address)evt.getArg());
                break;

            case Event.MSG:
                Message msg=(Message)evt.getArg();
                if(msg == null)
                    break;
                sender=msg.getSrc();
                SumpHeader hdr=(SumpHeader)msg.getHeader(name);
                if(hdr == null) // is probably a unicast message
                    break;
                switch(hdr.type) {
                    case SumpHeader.MCAST:
                        // The message is always passed up (no duplicate detection); an ack is
                        // only sent the first time a previously unknown sender is seen.
                        if(log.isTraceEnabled())
                            log.trace("received #" + hdr.seqno + " from " + sender);
                        if(!containsMember(sender)) {
                            addMember(sender);
                            Message ack_msg=new Message(sender);
                            ack_msg.putHeader(name, new SumpHeader(SumpHeader.ACK, hdr.seqno));
                            down_prot.down(new Event(Event.MSG, ack_msg));
                        }
                        up_prot.up(new Event(Event.MSG, msg));
                        return null;

                    case SumpHeader.ACK:
                        addMember(msg.getSrc());
                        if(log.isTraceEnabled())
                            log.trace("received ack for #" + hdr.seqno + " from " + msg.getSrc());
                        return null;

                    case SumpHeader.JOIN_ANNOUNCEMENT:
                        if(log.isInfoEnabled()) log.info("received join announcement by " + msg.getSrc());
                        // Only answer unknown senders: the answer is itself a JOIN_ANNOUNCEMENT,
                        // so the membership check is what stops the exchange from ping-ponging.
                        if(!containsMember(sender)) {
                            addMember(sender);
                            Message join_rsp=new Message(sender);
                            join_rsp.putHeader(name, new SumpHeader(SumpHeader.JOIN_ANNOUNCEMENT, -1));
                            down_prot.down(new Event(Event.ENABLE_UNICASTS_TO, sender));
                            down_prot.down(new Event(Event.MSG, join_rsp));
                        }
                        return null;

                    case SumpHeader.LEAVE_ANNOUNCEMENT:
                        if(log.isInfoEnabled()) log.info("received leave announcement by " + msg.getSrc());
                        removeMember(sender);
                        return null;

                    default:
                        if(log.isWarnEnabled()) log.warn("detected SumpHeader with invalid type: " + hdr);
                        break;
                }
                break;
        }

        return up_prot.up(evt);
    }

    /**
     * Handles events travelling down the stack.
     * <ul>
     * <li>DISCONNECT: sends a LEAVE_ANNOUNCEMENT before the event is passed down.
     * <li>CONNECT: passes the event down first, then sends a JOIN_ANNOUNCEMENT and returns the
     *     result of the CONNECT.
     * <li>MSG without destination or to a multicast address: adds a MCAST header with the
     *     current sequence number and increments the sequence number.
     * <li>GET_STATE: immediately sends GET_STATE_OK with an empty StateTransferInfo up the stack.
     * </ul>
     * Except for CONNECT, the event is then passed on to the protocol below.
     *
     * @param evt the event received from the protocol above
     * @return the result of the protocol below
     */
    public Object down(Event evt) {
        Message leave_msg;

        switch(evt.getType()) {

            case Event.DISCONNECT:
                leave_msg=new Message();
                leave_msg.putHeader(name, new SumpHeader(SumpHeader.LEAVE_ANNOUNCEMENT, -1));
                down_prot.down(new Event(Event.MSG, leave_msg));
                break;

            case Event.CONNECT:
                Object ret=down_prot.down(evt);

                // send join announcement
                Message join_msg=new Message();
                join_msg.putHeader(name, new SumpHeader(SumpHeader.JOIN_ANNOUNCEMENT, -1));
                down_prot.down(new Event(Event.MSG, join_msg));
                return ret;

            case Event.MSG:
                // add a header with the current sequence number and increment seqno
                Message msg=(Message)evt.getArg();
                if(msg == null) break;
                Address dest=msg.getDest();
                if(dest == null || dest.isMulticastAddress()) {
                    msg.putHeader(name, new SumpHeader(SumpHeader.MCAST, seqno));
                    if(log.isTraceEnabled()) log.trace("sending mcast #" + seqno);
                    seqno++;
                }
                break;

            case Event.GET_STATE:
                // NOTE(review): the request is answered locally with an empty state and is
                // nevertheless still passed down below - confirm that both are intended.
                up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo()));
                break;
        }

        return down_prot.down(evt);
    }


    /** Header attached to every message handled by SUMP: a message type plus a sequence number. */
    public static class SumpHeader extends Header implements Streamable {
        public static final byte MCAST=1;
        public static final byte ACK=2;
        public static final byte JOIN_ANNOUNCEMENT=3;
        public static final byte LEAVE_ANNOUNCEMENT=4;

        byte type=0;
        long seqno=-1;
        private static final long serialVersionUID=7605481696520929774L;

        /** No-arg constructor; the fields are filled in by readExternal()/readFrom(). */
        public SumpHeader() {
        }

        /**
         * @param type  one of MCAST, ACK, JOIN_ANNOUNCEMENT, LEAVE_ANNOUNCEMENT
         * @param seqno the sequence number; this file passes -1 for join/leave announcements
         */
        public SumpHeader(byte type, long seqno) {
            this.type=type;
            this.seqno=seqno;
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            out.writeLong(seqno);
        }

        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readByte();
            seqno=in.readLong();
        }

        /** @return the number of bytes written by writeTo(): one byte for the type, one long for seqno */
        public int size() {
            return Global.LONG_SIZE + Global.BYTE_SIZE;
        }

        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            out.writeLong(seqno);
        }

        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readByte();
            seqno=in.readLong();
        }

        /** @return the symbolic name of the header type; unknown types are reported with their value */
        public String toString() {
            switch(type) {
                case MCAST:
                    return "MCAST";
                case ACK:
                    return "ACK";
                case JOIN_ANNOUNCEMENT:
                    return "JOIN_ANNOUNCEMENT";
                case LEAVE_ANNOUNCEMENT:
                    return "LEAVE_ANNOUNCEMENT";
                default:
                    // never return an empty string: this value ends up in the "invalid type" warning
                    return "UNKNOWN(" + type + ")";
            }
        }
    }


    /* ------------------------------------- Private methods --------------------------------------- */

    /**
     * Adds a member and, if it was not already present, installs a new view.
     *
     * @param mbr the member to add
     */
    void addMember(Address mbr) {
        Vector<Address> tmp=null;
        synchronized(members) {
            if(members.add(mbr))
                tmp=new Vector<Address>(members); // snapshot taken under the lock
        }
        if(tmp != null) {
            if(log.isTraceEnabled())
                log.trace("added " + mbr + ", members=" + tmp);
            installView(tmp);
        }
    }

    /**
     * Removes a member and, if it was present, installs a new view.
     *
     * @param mbr the member to remove
     */
    void removeMember(Address mbr) {
        Vector<Address> tmp=null;
        synchronized(members) {
            if(members.remove(mbr))
                tmp=new Vector<Address>(members); // snapshot taken under the lock
        }
        if(tmp != null) {
            if(log.isTraceEnabled())
                log.trace("removed " + mbr + ", members=" + tmp);
            installView(tmp);
        }
    }

    /**
     * @param mbr the address to look up
     * @return true if the address is currently in the member set
     */
    boolean containsMember(Address mbr) {
        synchronized(members) {
            return members.contains(mbr);
        }
    }

    /**
     * Creates a view from the given membership snapshot, using the local address and the next
     * view counter value as its ViewId, and sends it up and then down the stack. Called outside
     * the members lock.
     */
    private void installView(Vector<Address> mbrs) {
        View new_view=new View(new ViewId(local_addr, vid++), mbrs);
        up_prot.up(new Event(Event.VIEW_CHANGE, new_view));
        down_prot.down(new Event(Event.VIEW_CHANGE, new_view));
    }

    /* --------------------------------- End of Private methods ------------------------------------ */
}