Index: src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java
diff -N src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopedRpcDispatcher.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,65 @@
+package org.jgroups.blocks.scope;
+
+import java.util.Collection;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RequestCorrelator;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.blocks.RspFilter;
+
+public class ScopedRpcDispatcher extends RpcDispatcher {
+
+ private final short scope;
+ private final ScopedUpHandler scopedUpHandler;
+
+ public ScopedRpcDispatcher(short scope, ScopedUpHandler scopedUpHandler, Channel channel, MessageListener l, MembershipListener l2, Object serverObj) {
+ super(channel, l, l2, serverObj);
+ this.scope = scope;
+ this.scopedUpHandler = scopedUpHandler;
+ // Unfortunately, dispatcher is already started
+ // Set the scope of the request corrrelator
+ ((ScopedRequestCorrelator) this.corr).setScope(scope);
+ // Register our scope and up handler here since this could not be done in start()
+ this.scopedUpHandler.add(this.scope, this.getProtocolAdapter());
+ }
+
+ @Override
+ protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) {
+ // We can't set the scope of the request correlator here
+ // since this method is called from start() triggered in the
+ // MessageDispatcher constructor, when this.scope is not yet defined
+ return new ScopedRequestCorrelator(transport, handler, localAddr);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ // This won't be the case when start() is called via MessageDispatcher constructor
+ if (scopedUpHandler != null) {
+ this.scopedUpHandler.add(this.scope, this.getProtocolAdapter());
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.scopedUpHandler.remove(this.scope);
+ super.stop();
+ }
+
+ @Override
+ protected GroupRequest cast(Collection
dests, Message msg, RequestOptions options, boolean blockForResults) {
+ return super.cast(dests, msg, addRspFilter(options), blockForResults);
+ }
+
+ private RequestOptions addRspFilter(RequestOptions options) {
+ RspFilter filter = options.getRspFilter();
+ return options.setRspFilter((filter != null) ? new NoHandlerForScopeRspFilter(filter) : new NoHandlerForScopeRspFilter());
+ }
+}
Index: src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java
diff -N src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopedMessageDispatcher.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,65 @@
+package org.jgroups.blocks.scope;
+
+import java.util.Collection;
+
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.MembershipListener;
+import org.jgroups.Message;
+import org.jgroups.MessageListener;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestCorrelator;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.RspFilter;
+
+public class ScopedMessageDispatcher extends MessageDispatcher {
+
+ private final short scope;
+ private final ScopedUpHandler scopedUpHandler;
+
+ public ScopedMessageDispatcher(short scope, ScopedUpHandler scopedUpHandler, Channel channel, MessageListener l, MembershipListener l2, RequestHandler handler) {
+ super(channel, l, l2, handler);
+ this.scope = scope;
+ this.scopedUpHandler = scopedUpHandler;
+ // Unfortunately, dispatcher is already started
+ // Set the scope of the request corrrelator
+ ((ScopedRequestCorrelator) this.corr).setScope(scope);
+ // Register our scope and up handler here since this could not be done in start()
+ this.scopedUpHandler.add(this.scope, this.getProtocolAdapter());
+ }
+
+ @Override
+ protected RequestCorrelator createRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) {
+ // We can't set the scope of the request correlator here
+ // since this method is called from start() triggered in the
+ // MessageDispatcher constructor, when this.scope is not yet defined
+ return new ScopedRequestCorrelator(transport, handler, localAddr);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ // This won't be the case when start() is called via MessageDispatcher constructor
+ if (scopedUpHandler != null) {
+ this.scopedUpHandler.add(this.scope, this.getProtocolAdapter());
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.scopedUpHandler.remove(this.scope);
+ super.stop();
+ }
+
+ @Override
+ protected GroupRequest cast(Collection dests, Message msg, RequestOptions options, boolean blockForResults) {
+ return super.cast(dests, msg, addRspFilter(options), blockForResults);
+ }
+
+ private RequestOptions addRspFilter(RequestOptions options) {
+ RspFilter filter = options.getRspFilter();
+ return options.setRspFilter((filter != null) ? new NoHandlerForScopeRspFilter(filter) : new NoHandlerForScopeRspFilter());
+ }
+}
Index: src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java
diff -N src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopedUpHandlerImpl.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,60 @@
+package org.jgroups.blocks.scope;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jgroups.Event;
+import org.jgroups.Message;
+import org.jgroups.UpHandler;
+import org.jgroups.conf.ClassConfigurator;
+
+public class ScopedUpHandlerImpl implements ScopedUpHandler {
+
+ static {
+ ClassConfigurator.add(ScopeHeader.ID, ScopeHeader.class);
+ }
+
+ private final Map handlers=new ConcurrentHashMap();
+ private final UpHandler defaultHandler;
+
+ public ScopedUpHandlerImpl() {
+ this.defaultHandler = null;
+ }
+
+ public ScopedUpHandlerImpl(UpHandler defaultHandler) {
+ this.defaultHandler = defaultHandler;
+ }
+
+ @Override
+ public void add(short scope, UpHandler handler) {
+ this.handlers.put(scope, handler);
+ }
+
+ @Override
+ public void remove(short scope) {
+ this.handlers.remove(scope);
+ }
+
+ @Override
+ public Object up(Event evt) {
+ switch(evt.getType()) {
+ case Event.MSG: {
+ Message msg=(Message)evt.getArg();
+ ScopeHeader hdr=(ScopeHeader)msg.getHeader(ScopeHeader.ID);
+ if (hdr != null) {
+ short scope = hdr.getScope();
+ UpHandler handler=handlers.get(hdr.getScope());
+ return (handler != null) ? handler.up(evt) : new NoHandlerForScope(scope);
+ }
+ break;
+ }
+ case Event.VIEW_CHANGE: {
+ for(UpHandler handler: handlers.values())
+ handler.up(evt);
+ break;
+ }
+ }
+
+ return (defaultHandler != null) ? defaultHandler.up(evt) : null;
+ }
+}
Index: src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java
diff -N src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopedRequestCorrelator.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,37 @@
+package org.jgroups.blocks.scope;
+
+import java.util.Collection;
+
+import org.jgroups.Address;
+import org.jgroups.Message;
+import org.jgroups.blocks.RequestCorrelator;
+import org.jgroups.blocks.RequestHandler;
+import org.jgroups.blocks.RspCollector;
+import org.jgroups.conf.ClassConfigurator;
+
+public class ScopedRequestCorrelator extends RequestCorrelator {
+
+ private short scope;
+
+ public ScopedRequestCorrelator(Object transport, RequestHandler handler, Address localAddr) {
+ super(ClassConfigurator.getProtocolId(RequestCorrelator.class), transport, handler, localAddr);
+ }
+
+ public void setScope(short scope) {
+ this.scope = scope;
+ }
+
+ @Override
+ public void sendRequest(long id, Collection dest_mbrs, Message msg, RspCollector coll, boolean use_anycasting) throws Exception {
+ ScopeHeader hdr=new ScopeHeader(scope);
+ msg.putHeader(ScopeHeader.ID, hdr);
+ super.sendRequest(id, dest_mbrs, msg, coll, use_anycasting);
+ }
+
+ @Override
+ protected void prepareResponse(Message rsp) {
+ ScopeHeader hdr=new ScopeHeader(scope);
+ rsp.putHeader(ScopeHeader.ID, hdr);
+ super.prepareResponse(rsp);
+ }
+}
Index: src/org/jgroups/blocks/scope/ScopeHeader.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopeHeader.java
diff -N src/org/jgroups/blocks/scope/ScopeHeader.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopeHeader.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,40 @@
+package org.jgroups.blocks.scope;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.jgroups.Global;
+import org.jgroups.Header;
+
+public class ScopeHeader extends Header {
+
+ public static final short ID=1500;
+
+ private short scope;
+
+ public ScopeHeader() {
+ }
+ public ScopeHeader(short scope) {
+ this.scope=scope;
+ }
+
+ public short getScope() {
+ return scope;
+ }
+
+ @Override
+ public int size() {
+ return Global.SHORT_SIZE;
+ }
+
+ @Override
+ public void writeTo(DataOutputStream out) throws IOException {
+ out.writeShort(scope);
+ }
+
+ @Override
+ public void readFrom(DataInputStream in) throws IOException {
+ scope=in.readShort();
+ }
+}
Index: tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java
===================================================================
RCS file: tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java
diff -N tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ tests/other/org/jgroups/tests/ScopedRpcDispatcherTest.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,59 @@
+package org.jgroups.tests;
+
+import org.jgroups.*;
+import org.jgroups.blocks.*;
+import org.jgroups.blocks.scope.ScopedRpcDispatcher;
+import org.jgroups.blocks.scope.ScopedUpHandler;
+import org.jgroups.blocks.scope.ScopedUpHandlerImpl;
+import org.jgroups.util.RspList;
+import org.jgroups.util.Util;
+
+/**
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class ScopedRpcDispatcherTest {
+
+ public static void main(String[] args) throws ChannelException {
+
+ Channel c1=new JChannel("fast-local.xml");
+
+ ScopedUpHandler handler = new ScopedUpHandlerImpl();
+
+ RpcDispatcher disp1=new ScopedRpcDispatcher((short)100, handler, c1, null, null, new Server("foo1()"));
+ RpcDispatcher disp2=new ScopedRpcDispatcher((short)0, handler, c1, null, null, new Server("foo2()"));
+ RpcDispatcher disp3=new ScopedRpcDispatcher((short)150, handler, c1, null, null, new Server("foo3()"));
+
+ c1.setUpHandler(handler);
+
+ c1.connect("scopes");
+
+ while(!Thread.currentThread().isInterrupted()) {
+ Util.keyPress("enter: ");
+
+ RspList rsps=disp1.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC);
+ System.out.println("rsps:\n" + rsps);
+
+ rsps=disp2.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC);
+ System.out.println("rsps:\n" + rsps);
+
+ rsps=disp3.callRemoteMethods(null, "foo", null, null, RequestOptions.SYNC);
+ System.out.println("rsps:\n" + rsps);
+ }
+ c1.disconnect();
+ c1.close();
+ }
+
+ public static class Server {
+ final String name;
+
+ public Server(String name) {
+ this.name=name;
+ }
+
+ public int foo() {
+ System.out.println(name);
+ return (int)Util.random(100);
+ }
+ }
+}
Index: src/org/jgroups/blocks/scope/ScopedUpHandler.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/ScopedUpHandler.java
diff -N src/org/jgroups/blocks/scope/ScopedUpHandler.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/ScopedUpHandler.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,9 @@
+package org.jgroups.blocks.scope;
+
+import org.jgroups.UpHandler;
+
+public interface ScopedUpHandler extends UpHandler {
+
+ void add(short scope, UpHandler handler);
+ void remove(short scope);
+}
Index: src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java
diff -N src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/NoHandlerForScopeRspFilter.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,27 @@
+package org.jgroups.blocks.scope;
+
+import org.jgroups.Address;
+import org.jgroups.blocks.RspFilter;
+
+public class NoHandlerForScopeRspFilter implements RspFilter {
+
+ private final RspFilter filter;
+
+ public NoHandlerForScopeRspFilter() {
+ this.filter = null;
+ }
+
+ public NoHandlerForScopeRspFilter(RspFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public boolean isAcceptable(Object response, Address sender) {
+ return !(response instanceof NoHandlerForScope) && ((filter == null) || filter.isAcceptable(response, sender));
+ }
+
+ @Override
+ public boolean needMoreResponses() {
+ return (filter == null) || filter.needMoreResponses();
+ }
+}
Index: src/org/jgroups/blocks/scope/NoHandlerForScope.java
===================================================================
RCS file: src/org/jgroups/blocks/scope/NoHandlerForScope.java
diff -N src/org/jgroups/blocks/scope/NoHandlerForScope.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ src/org/jgroups/blocks/scope/NoHandlerForScope.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,17 @@
+package org.jgroups.blocks.scope;
+
+import java.io.Serializable;
+
+public class NoHandlerForScope implements Serializable {
+ private static final long serialVersionUID = -694135384125080323L;
+
+ private short scope;
+
+ public NoHandlerForScope(short scope) {
+ this.scope = scope;
+ }
+
+ public short getScope() {
+ return scope;
+ }
+}