Index: src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java diff -N src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,65 @@ +package org.jgroups.blocks.scope; + +import java.util.Collection; + +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.MembershipListener; +import org.jgroups.Message; +import org.jgroups.MessageListener; +import org.jgroups.blocks.GroupRequest; +import org.jgroups.blocks.RequestCorrelator; +import org.jgroups.blocks.RequestHandler; +import org.jgroups.blocks.RequestOptions; +import org.jgroups.blocks.RpcDispatcher; +import org.jgroups.blocks.RspFilter; + +public class ScopedRpcDispatcher extends RpcDispatcher { + + private final short scope; + private final ScopedUpHandler scopedUpHandler; + + public ScopedRpcDispatcher(short scope, ScopedUpHandler scopedUpHandler, Channel channel, MessageListener l, MembershipListener l2, Object serverObj) { + super(channel, l, l2, serverObj); + this.scope = scope; + this.scopedUpHandler = scopedUpHandler; + // Unfortunately, dispatcher is already started + // Set the scope of the request corrrelator + ((ScopedRequestCorrelator) this.corr).setScope(scope); + // Register our scope and up handler here since this could not be done in start() + this.scopedUpHandler.add(this.scope, this.getProtocolAdapter()); + } + + @Override + protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) { + // We can't set the scope of the request correlator here + // since this method is called from start() triggered in the + // MessageDispatcher constructor, when this.scope is not yet defined + return new ScopedRequestCorrelator(transport, handler, localAddr); + } + + @Override + public void start() { + super.start(); + // This won't be the case when start() is called via MessageDispatcher constructor + if (scopedUpHandler != null) { + this.scopedUpHandler.add(this.scope, this.getProtocolAdapter()); + } + } + + @Override + public void stop() { + this.scopedUpHandler.remove(this.scope); + super.stop(); + } + + @Override + protected GroupRequest cast(Collection
dests, Message msg, RequestOptions options, boolean blockForResults) { + return super.cast(dests, msg, addRspFilter(options), blockForResults); + } + + private RequestOptions addRspFilter(RequestOptions options) { + RspFilter filter = options.getRspFilter(); + return options.setRspFilter((filter != null) ? new NoHandlerForScopeRspFilter(filter) : new NoHandlerForScopeRspFilter()); + } +} Index: src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java diff -N src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,65 @@ +package org.jgroups.blocks.scope; + +import java.util.Collection; + +import org.jgroups.Address; +import org.jgroups.Channel; +import org.jgroups.MembershipListener; +import org.jgroups.Message; +import org.jgroups.MessageListener; +import org.jgroups.blocks.GroupRequest; +import org.jgroups.blocks.MessageDispatcher; +import org.jgroups.blocks.RequestCorrelator; +import org.jgroups.blocks.RequestHandler; +import org.jgroups.blocks.RequestOptions; +import org.jgroups.blocks.RspFilter; + +public class ScopedMessageDispatcher extends MessageDispatcher { + + private final short scope; + private final ScopedUpHandler scopedUpHandler; + + public ScopedMessageDispatcher(short scope, ScopedUpHandler scopedUpHandler, Channel channel, MessageListener l, MembershipListener l2, RequestHandler handler) { + super(channel, l, l2, handler); + this.scope = scope; + this.scopedUpHandler = scopedUpHandler; + // Unfortunately, dispatcher is already started + // Set the scope of the request corrrelator + ((ScopedRequestCorrelator) this.corr).setScope(scope); + // Register our scope and up handler here since this could not be done in start() + this.scopedUpHandler.add(this.scope, this.getProtocolAdapter()); + } + + @Override + protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) { + // We can't set the scope of the request correlator here + // since this method is called from start() triggered in the + // MessageDispatcher constructor, when this.scope is not yet defined + return new ScopedRequestCorrelator(transport, handler, localAddr); + } + + @Override + public void start() { + super.start(); + // This won't be the case when start() is called via MessageDispatcher constructor + if (scopedUpHandler != null) { + this.scopedUpHandler.add(this.scope, this.getProtocolAdapter()); + } + } + + @Override + public void stop() { + this.scopedUpHandler.remove(this.scope); + super.stop(); + } + + @Override + protected GroupRequest cast(Collection
dests, Message msg, RequestOptions options, boolean blockForResults) { + return super.cast(dests, msg, addRspFilter(options), blockForResults); + } + + private RequestOptions addRspFilter(RequestOptions options) { + RspFilter filter = options.getRspFilter(); + return options.setRspFilter((filter != null) ? new NoHandlerForScopeRspFilter(filter) : new NoHandlerForScopeRspFilter()); + } +} Index: src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java diff -N src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,60 @@ +package org.jgroups.blocks.scope; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.jgroups.Event; +import org.jgroups.Message; +import org.jgroups.UpHandler; +import org.jgroups.conf.ClassConfigurator; + +public class ScopedUpHandlerImpl implements ScopedUpHandler { + + static { + ClassConfigurator.add(ScopeHeader.ID, ScopeHeader.class); + } + + private final Map handlers=new ConcurrentHashMap(); + private final UpHandler defaultHandler; + + public ScopedUpHandlerImpl() { + this.defaultHandler = null; + } + + public ScopedUpHandlerImpl(UpHandler defaultHandler) { + this.defaultHandler = defaultHandler; + } + + @Override + public void add(short scope, UpHandler handler) { + this.handlers.put(scope, handler); + } + + @Override + public void remove(short scope) { + this.handlers.remove(scope); + } + + @Override + public Object up(Event evt) { + switch(evt.getType()) { + case Event.MSG: { + Message msg=(Message)evt.getArg(); + ScopeHeader hdr=(ScopeHeader)msg.getHeader(ScopeHeader.ID); + if (hdr != null) { + short scope = hdr.getScope(); + UpHandler handler=handlers.get(hdr.getScope()); + return (handler != null) ? handler.up(evt) : new NoHandlerForScope(scope); + } + break; + } + case Event.VIEW_CHANGE: { + for(UpHandler handler: handlers.values()) + handler.up(evt); + break; + } + } + + return (defaultHandler != null) ? defaultHandler.up(evt) : null; + } +} Index: src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java diff -N src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,37 @@ +package org.jgroups.blocks.scope; + +import java.util.Collection; + +import org.jgroups.Address; +import org.jgroups.Message; +import org.jgroups.blocks.RequestCorrelator; +import org.jgroups.blocks.RequestHandler; +import org.jgroups.blocks.RspCollector; +import org.jgroups.conf.ClassConfigurator; + +public class ScopedRequestCorrelator extends RequestCorrelator { + + private short scope; + + public ScopedRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) { + super(ClassConfigurator.getProtocolId(RequestCorrelator.class), transport, handler, localAddr); + } + + public void setScope(short scope) { + this.scope = scope; + } + + @Override + public void sendRequest(long id, Collection
dest_mbrs, Message msg, RspCollector coll, boolean use_anycasting) throws Exception { + ScopeHeader hdr=new ScopeHeader(scope); + msg.putHeader(ScopeHeader.ID, hdr); + super.sendRequest(id, dest_mbrs, msg, coll, use_anycasting); + } + + @Override + protected void prepareResponse(Message rsp) { + ScopeHeader hdr=new ScopeHeader(scope); + rsp.putHeader(ScopeHeader.ID, hdr); + super.prepareResponse(rsp); + } +} Index: src/org/jgroups/blocks/scope/ScopeHeader.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopeHeader.java diff -N src/org/jgroups/blocks/scope/ScopeHeader.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopeHeader.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,40 @@ +package org.jgroups.blocks.scope; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.jgroups.Global; +import org.jgroups.Header; + +public class ScopeHeader extends Header { + + public static final short ID=1500; + + private short scope; + + public ScopeHeader() { + } + public ScopeHeader(short scope) { + this.scope=scope; + } + + public short getScope() { + return scope; + } + + @Override + public int size() { + return Global.SHORT_SIZE; + } + + @Override + public void writeTo(DataOutputStream out) throws IOException { + out.writeShort(scope); + } + + @Override + public void readFrom(DataInputStream in) throws IOException { + scope=in.readShort(); + } +} Index: tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java =================================================================== RCS file: tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java diff -N tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,59 @@ +package org.jgroups.tests; + +import org.jgroups.*; +import org.jgroups.blocks.*; +import org.jgroups.blocks.scope.ScopedRpcDispatcher; +import org.jgroups.blocks.scope.ScopedUpHandler; +import org.jgroups.blocks.scope.ScopedUpHandlerImpl; +import org.jgroups.util.RspList; +import org.jgroups.util.Util; + +/** + * @author Bela Ban + * @version $Id$ + */ +public class ScopedRpcDispatcherTest { + + public static void main(String[] args) throws ChannelException { + + Channel c1=new JChannel("fast-local.xml"); + + ScopedUpHandler handler = new ScopedUpHandlerImpl(); + + RpcDispatcher disp1=new ScopedRpcDispatcher((short)100, handler, c1, null, null, new Server("foo1()")); + RpcDispatcher disp2=new ScopedRpcDispatcher((short)0, handler, c1, null, null, new Server("foo2()")); + RpcDispatcher disp3=new ScopedRpcDispatcher((short)150, handler, c1, null, null, new Server("foo3()")); + + c1.setUpHandler(handler); + + c1.connect("scopes"); + + while(!Thread.currentThread().isInterrupted()) { + Util.keyPress("enter: "); + + RspList rsps=disp1.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC); + System.out.println("rsps:\n" + rsps); + + rsps=disp2.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC); + System.out.println("rsps:\n" + rsps); + + rsps=disp3.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC); + System.out.println("rsps:\n" + rsps); + } + c1.disconnect(); + c1.close(); + } + + public static class Server { + final String name; + + public Server(String name) { + this.name=name; + } + + public int foo() { + System.out.println(name); + return (int)Util.random(100); + } + } +} Index: src/org/jgroups/blocks/scope/ScopedUpHandler.java =================================================================== RCS file: src/org/jgroups/blocks/scope/ScopedUpHandler.java diff -N src/org/jgroups/blocks/scope/ScopedUpHandler.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/ScopedUpHandler.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,9 @@ +package org.jgroups.blocks.scope; + +import org.jgroups.UpHandler; + +public interface ScopedUpHandler extends UpHandler { + + void add(short scope, UpHandler handler); + void remove(short scope); +} Index: src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java =================================================================== RCS file: src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java diff -N src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,27 @@ +package org.jgroups.blocks.scope; + +import org.jgroups.Address; +import org.jgroups.blocks.RspFilter; + +public class NoHandlerForScopeRspFilter implements RspFilter { + + private final RspFilter filter; + + public NoHandlerForScopeRspFilter() { + this.filter = null; + } + + public NoHandlerForScopeRspFilter(RspFilter filter) { + this.filter = filter; + } + + @Override + public boolean isAcceptable(Object response, Address sender) { + return !(response instanceof NoHandlerForScope) && ((filter == null) || filter.isAcceptable(response, sender)); + } + + @Override + public boolean needMoreResponses() { + return (filter == null) || filter.needMoreResponses(); + } +} Index: src/org/jgroups/blocks/scope/NoHandlerForScope.java =================================================================== RCS file: src/org/jgroups/blocks/scope/NoHandlerForScope.java diff -N src/org/jgroups/blocks/scope/NoHandlerForScope.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ src/org/jgroups/blocks/scope/NoHandlerForScope.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,17 @@ +package org.jgroups.blocks.scope; + +import java.io.Serializable; + +public class NoHandlerForScope implements Serializable { + private static final long serialVersionUID = -694135384125080323L; + + private short scope; + + public NoHandlerForScope(short scope) { + this.scope = scope; + } + + public short getScope() { + return scope; + } +}