package org.jgroups.protocols;
import org.jgroups.*;
//import org.jgroups.stack.AckMcastSenderWindow;
//import org.jgroups.stack.AckReceiverWindow;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StaticInterval;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;
import java.io.*;
import java.util.*;
/**
* Simple Unreliable Multicast protocol. A no-acknowledgment protocol for unreliable delivery of
* multicast messages, which does not need any group membership service.
* Basically works as follows:
*
* - Sender S sends multicast message M
* - When member P receives M, it checks whether P is in its
* membership list. If not, P will be added.
*
* Advantage of this protocol: no group membership necessary, fast, low-traffic.
* @author Bela Ban Aug 2002
* @version $Id: SUMP.java,v 1.00 2008/12/19 15:38:38 samh Exp $
*
convert SMACK to SUMP
*/
public class SUMP extends Protocol {
long[] timeout=new long[]{1000,2000,3000}; // retransmit timeouts (for AckMcastSenderWindow)
int max_xmits=10; // max retransmissions (if still no ack, member will be removed)
final Set members=new LinkedHashSet(); // contains Addresses
final Map cfmap=new HashMap(); // to store configuration information
Address local_addr=null; // my own address
long seqno=1; // seqno for msgs sent by this sender
long vid=1; // for the fake view changes
boolean print_local_addr=true;
static final String name="SUMP";
public SUMP() {
}
public String getName() {
return name;
}
public void init() throws Exception {
cfmap.put("state_transfer", Boolean.TRUE);
cfmap.put("protocol_class", getClass().getName());
}
public boolean setProperties(Properties props) {
String str;
long[] tmp;
super.setProperties(props);
str=props.getProperty("print_local_addr");
if(str != null) {
print_local_addr=Boolean.valueOf(str).booleanValue();
props.remove("print_local_addr");
}
str=props.getProperty("timeout");
if(str != null) {
tmp=Util.parseCommaDelimitedLongs(str);
props.remove("timeout");
if(tmp != null && tmp.length > 0)
timeout=tmp;
}
str=props.getProperty("max_xmits");
if(str != null) {
max_xmits=Integer.parseInt(str);
props.remove("max_xmits");
}
if(!props.isEmpty()) {
log.error("the following properties are not recognized: " + props);
return false;
}
return true;
}
public void start() throws Exception {
up_prot.up(new Event(Event.CONFIG, cfmap));
}
public void stop() {
super.stop();
}
public Object up(Event evt) {
Address sender;
switch(evt.getType()) {
case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
addMember(local_addr);
if(print_local_addr) {
System.out.println("\n-------------------------------------------------------\n" +
"GMS: address is " + local_addr +
"\n-------------------------------------------------------");
}
break;
case Event.SUSPECT:
if(log.isInfoEnabled()) log.info("removing suspected member " + evt.getArg());
removeMember((Address)evt.getArg());
break;
case Event.MSG:
Message msg=(Message)evt.getArg(), tmp_msg;
if(msg == null) break;
sender=msg.getSrc();
SumpHeader hdr=(SumpHeader)msg.getHeader(name);
if(hdr == null) // is probably a unicast message
break;
switch(hdr.type) {
case SumpHeader.MCAST: // send an ack, then pass up (if not already received)
if(log.isTraceEnabled())
log.trace("received #" + hdr.seqno + " from " + sender);
if (!containsMember(sender)) {
addMember(sender);
Message ack_msg=new Message(sender);
ack_msg.putHeader(name, new SumpHeader(SumpHeader.ACK, hdr.seqno));
down_prot.down(new Event(Event.MSG, ack_msg));
}
up_prot.up(new Event(Event.MSG, msg));
return null;
case SumpHeader.ACK:
addMember(msg.getSrc());
if(log.isTraceEnabled())
log.trace("received ack for #" + hdr.seqno + " from " + msg.getSrc());
return null;
case SumpHeader.JOIN_ANNOUNCEMENT:
if(log.isInfoEnabled()) log.info("received join announcement by " + msg.getSrc());
if(!containsMember(sender)) {
addMember(sender);
Message join_rsp=new Message(sender);
join_rsp.putHeader(name, new SumpHeader(SumpHeader.JOIN_ANNOUNCEMENT, -1));
down_prot.down(new Event(Event.ENABLE_UNICASTS_TO, sender));
down_prot.down(new Event(Event.MSG, join_rsp));
}
return null;
case SumpHeader.LEAVE_ANNOUNCEMENT:
if(log.isInfoEnabled()) log.info("received leave announcement by " + msg.getSrc());
removeMember(sender);
return null;
default:
if(log.isWarnEnabled()) log.warn("detected SumpHeader with invalid type: " + hdr);
break;
}
break;
}
return up_prot.up(evt);
}
public Object down(Event evt) {
Message leave_msg;
switch(evt.getType()) {
case Event.DISCONNECT:
leave_msg=new Message();
leave_msg.putHeader(name, new SumpHeader(SumpHeader.LEAVE_ANNOUNCEMENT, -1));
down_prot.down(new Event(Event.MSG, leave_msg));
break;
case Event.CONNECT:
Object ret=down_prot.down(evt);
// send join announcement
Message join_msg=new Message();
join_msg.putHeader(name, new SumpHeader(SumpHeader.JOIN_ANNOUNCEMENT, -1));
down_prot.down(new Event(Event.MSG, join_msg));
return ret;
case Event.MSG:
// add a header with the current sequence number and increment seqno
Message msg=(Message)evt.getArg();
if(msg == null) break;
if(msg.getDest() == null || msg.getDest().isMulticastAddress()) {
msg.putHeader(name, new SumpHeader(SumpHeader.MCAST, seqno));
if(log.isTraceEnabled()) log.trace("sending mcast #" + seqno);
seqno++;
}
break;
case Event.GET_STATE:
StateTransferInfo info=(StateTransferInfo)evt.getArg();
up_prot.up(new Event(Event.GET_STATE_OK, new StateTransferInfo()));
}
return down_prot.down(evt);
}
public static class SumpHeader extends Header implements Streamable {
public static final byte MCAST=1;
public static final byte ACK=2;
public static final byte JOIN_ANNOUNCEMENT=3;
public static final byte LEAVE_ANNOUNCEMENT=4;
byte type=0;
long seqno=-1;
private static final long serialVersionUID=7605481696520929774L;
public SumpHeader() {
}
public SumpHeader(byte type, long seqno) {
this.type=type;
this.seqno=seqno;
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeByte(type);
out.writeLong(seqno);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
type=in.readByte();
seqno=in.readLong();
}
public int size() {
return Global.LONG_SIZE + Global.BYTE_SIZE;
}
public void writeTo(DataOutputStream out) throws IOException {
out.writeByte(type);
out.writeLong(seqno);
}
public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
type=in.readByte();
seqno=in.readLong();
}
public String toString() {
switch(type) {
case MCAST:
return "MCAST";
case ACK:
return "ACK";
case JOIN_ANNOUNCEMENT:
return "JOIN_ANNOUNCEMENT";
case LEAVE_ANNOUNCEMENT:
return "LEAVE_ANNOUNCEMENT";
default:
return "";
}
}
}
/* ------------------------------------- Private methods --------------------------------------- */
void addMember(Address mbr) {
Vector tmp=null;
synchronized(members) {
if(members.add(mbr)) {
tmp=new Vector(members);
}
}
if(tmp != null) {
if(log.isTraceEnabled())
log.trace("added " + mbr + ", members=" + tmp);
View new_view=new View(new ViewId(local_addr, vid++), tmp);
up_prot.up(new Event(Event.VIEW_CHANGE, new_view));
down_prot.down(new Event(Event.VIEW_CHANGE, new_view));
}
}
void removeMember(Address mbr) {
Vector tmp=null;
synchronized(members) {
if(members.remove(mbr))
tmp=new Vector(members);
}
if(tmp != null) {
if(log.isTraceEnabled())
log.trace("removed " + mbr + ", members=" + tmp);
View new_view=new View(new ViewId(local_addr, vid++), tmp);
up_prot.up(new Event(Event.VIEW_CHANGE, new_view));
down_prot.down(new Event(Event.VIEW_CHANGE, new_view));
}
}
boolean containsMember(Address mbr) {
synchronized(members) {
return members.contains(mbr);
}
}
/* --------------------------------- End of Private methods ------------------------------------ */
}