diff --git a/src/org/jgroups/blocks/GroupRequest.java b/src/org/jgroups/blocks/GroupRequest.java index 336f773..5318c51 100644 --- a/src/org/jgroups/blocks/GroupRequest.java +++ b/src/org/jgroups/blocks/GroupRequest.java @@ -100,7 +100,7 @@ public class GroupRequest extends Request { public void sendRequest() throws Exception { - sendRequest(requests.keySet(), req_id); + sendRequest(requests.keySet()); } /* ---------------------- Interface RspCollector -------------------------- */ @@ -355,13 +355,13 @@ public class GroupRequest extends Request { } - private void sendRequest(final Collection
targetMembers, long requestId) throws Exception { + private void sendRequest(final Collection
targetMembers) throws Exception { try { - corr.sendRequest(requestId, targetMembers, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this, options); + corr.sendRequest(targetMembers, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this, options); } catch(Exception ex) { if(corr != null) - corr.done(requestId); + corr.done(this.req_id); throw ex; } } diff --git a/src/org/jgroups/blocks/Request.java b/src/org/jgroups/blocks/Request.java index 7f25101..9c09172 100644 --- a/src/org/jgroups/blocks/Request.java +++ b/src/org/jgroups/blocks/Request.java @@ -14,7 +14,6 @@ import org.jgroups.util.Util; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,9 +26,6 @@ import java.util.concurrent.locks.ReentrantLock; public abstract class Request implements RspCollector, NotifyingFuture, org.jgroups.util.Condition { protected static final Log log=LogFactory.getLog(Request.class); - /** To generate unique request IDs (see getRequestId()) */ - protected static final AtomicLong REQUEST_ID=new AtomicLong(1); - protected final Lock lock=new ReentrantLock(); /** Is set as soon as the request has received all required responses */ @@ -42,7 +38,7 @@ public abstract class Request implements RspCollector, NotifyingFuture, org.jgro protected volatile boolean done; protected boolean block_for_results=true; - protected final long req_id; // request ID for this request + protected long req_id=0; // request ID for this request (starts at 1), can only be set once protected volatile FutureListener listener; @@ -52,22 +48,26 @@ public abstract class Request implements RspCollector, NotifyingFuture, org.jgro this.request_msg=request; this.corr=corr; this.options=options; - this.req_id=getRequestId(); } - public void setResponseFilter(RspFilter filter) { + public void setResponseFilter(RspFilter filter) { options.setRspFilter(filter); } - public boolean getBlockForResults() { return block_for_results; } - - public void setBlockForResults(boolean block_for_results) { + public void setBlockForResults(boolean block_for_results) { this.block_for_results=block_for_results; } + public Request requestId(long id) { + if(this.req_id > 0) + throw new IllegalStateException(String.format("req-id (%d) already set, cannot be set again (to %d)", this.req_id, id)); + this.req_id=id; + return this; + } + public NotifyingFuture setListener(FutureListener listener) { this.listener=listener; if(done) @@ -170,10 +170,6 @@ public abstract class Request implements RspCollector, NotifyingFuture, org.jgro listener.futureDone(future); } - /** Generates a new unique request ID */ - protected static long getRequestId() { - return REQUEST_ID.incrementAndGet(); - } /** This method runs with lock locked (called by execute()). */ @GuardedBy("lock") diff --git a/src/org/jgroups/blocks/RequestCorrelator.java b/src/org/jgroups/blocks/RequestCorrelator.java index 28a3499..fd0af14 100644 --- a/src/org/jgroups/blocks/RequestCorrelator.java +++ b/src/org/jgroups/blocks/RequestCorrelator.java @@ -11,6 +11,7 @@ import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.Protocol; import org.jgroups.util.Bits; import org.jgroups.util.Buffer; +import org.jgroups.util.RequestTable; import org.jgroups.util.Util; import java.io.DataInput; @@ -18,23 +19,16 @@ import java.io.DataOutput; import java.io.NotSerializableException; import java.lang.reflect.InvocationTargetException; import java.util.*; -import java.util.concurrent.ConcurrentMap; /** - * Framework to send requests and receive matching responses (matching on - * request ID). - * Multiple requests can be sent at a time. Whenever a response is received, - * the correct {@code RspCollector} is looked up (key = id) and its - * method {@code receiveResponse()} invoked. A caller may use - * {@code done()} to signal that no more responses are expected, and that - * the corresponding entry may be removed. - *

- * {@code RequestCorrelator} can be installed at both client and server - * sides, it can also switch roles dynamically; i.e., send a request and at - * the same time process an incoming request (when local delivery is enabled, + * Framework to send requests and receive matching responses (matching on request ID). + * Multiple requests can be sent at a time. Whenever a response is received, the correct {@code RspCollector} is + * looked up (key = id) and its method {@code receiveResponse()} invoked. A caller may use {@code done()} to signal + * that no more responses are expected, and that the corresponding entry may be removed.

+ * {@code RequestCorrelator} can be installed at both client and server sides, it can also switch roles dynamically; + * i.e. send a request and at the same time process an incoming request (when local delivery is enabled, * this is actually the default). - *

* * @author Bela Ban */ @@ -43,8 +37,8 @@ public class RequestCorrelator { /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */ protected Protocol transport; - /** The table of pending requests (keys=Long (request IDs), values=RequestEntry) */ - protected final ConcurrentMap requests=Util.createConcurrentMap(); + /** The table of pending requests (keys=Long (request IDs), values=Request) */ + protected final RequestTable requests=new RequestTable<>(16, 1, 1); // .removesTillCompaction(10); /** The handler for the incoming requests. It is called from inside the dispatcher thread */ @@ -72,6 +66,7 @@ public class RequestCorrelator { private final MyProbeHandler probe_handler=new MyProbeHandler(requests); protected static final Log log=LogFactory.getLog(RequestCorrelator.class); + protected static final TransportClosedVisitor transport_closed_visitor=new TransportClosedVisitor(); /** @@ -119,44 +114,40 @@ public class RequestCorrelator { public boolean wrapExceptions() {return wrap_exceptions;} public RequestCorrelator wrapExceptions(boolean flag) {wrap_exceptions=flag; return this;} - public void sendRequest(long id, List

dest_mbrs, Message msg, RspCollector coll) throws Exception { - sendRequest(id, dest_mbrs, msg, coll, new RequestOptions().setAnycasting(false)); + public void sendRequest(List
dest_mbrs, Message msg, Request req) throws Exception { + sendRequest(dest_mbrs, msg, req, new RequestOptions().setAnycasting(false)); } /** * Sends a request to a group. If no response collector is given, no responses are expected (making the call asynchronous) * - * @param id The request ID. Must be unique for this JVM (e.g. current time in millisecs) - * @param dest_mbrs The list of members who should receive the call. Usually a group RPC - * is sent via multicast, but a receiver drops the request if its own address - * is not in this list. Will not be used if it is null. - * @param msg The request to be sent. The body of the message carries - * the request data - * - * @param coll A response collector (usually the object that invokes this method). Its methods - * {@code receiveResponse()} and {@code suspect()} will be invoked when a message has been received - * or a member is suspected, respectively. + * @param dest_mbrs The list of members who should receive the call. Usually a group RPC is sent via multicast, + * but a receiver drops the request if its own address is not in this list. Will not be used if null. + * @param msg The request to be sent. The body of the message carries the request data + * @param request A request (usually the object that invokes this method). Its methods {@code receiveResponse()} and + * {@code suspect()} will be invoked when a message has been received or a member is suspected, respectively. */ - public void sendRequest(long id, Collection
dest_mbrs, Message msg, RspCollector coll, RequestOptions options) throws Exception { + public void sendRequest(Collection
dest_mbrs, Message msg, Request request, RequestOptions options) throws Exception { if(transport == null) { if(log.isWarnEnabled()) log.warn("transport is not available !"); return; } // i. Create the request correlator header and add it to the msg - // ii. If a reply is expected (coll != null), add a coresponding entry in the pending requests table - // iii. If deadlock detection is enabled, set/update the call stack - // iv. Pass the msg down to the protocol layer below + // ii. If a reply is expected (coll != null), add a corresponding entry in the pending requests table + // iii. Pass the msg down to the protocol layer below Header hdr=options.hasExclusionList()? - new MultiDestinationHeader(Header.REQ, id, (coll != null), this.id, options.exclusionList()) - : new Header(Header.REQ, id, (coll != null), this.id); + new MultiDestinationHeader(Header.REQ, (request != null? request.req_id : 0), this.id, options.exclusionList()) + : new Header(Header.REQ, request != null? request.req_id : 0, this.id); msg.putHeader(this.id, hdr); - if(coll != null) { - addEntry(hdr.id, coll); + if(request != null) { + long request_id=requests.add(request); + request.requestId(request_id); + hdr.req_id=request_id; // make sure no view is received before we add ourself as a view handler (https://issues.jboss.org/browse/JGRP-1428) - coll.viewChange(view); + request.viewChange(view); } if(options.getAnycasting()) { @@ -182,13 +173,12 @@ public class RequestCorrelator { /** * Sends a request to a single destination - * @param id * @param target * @param msg * @param coll * @throws Exception */ - public void sendUnicastRequest(long id, Address target, Message msg, RspCollector coll) throws Exception { + public void sendUnicastRequest(Address target, Message msg, Request request) throws Exception { if(transport == null) { if(log.isWarnEnabled()) log.warn("transport is not available !"); return; @@ -196,17 +186,17 @@ public class RequestCorrelator { // i. Create the request correlator header and add it to the msg // ii. If a reply is expected (coll != null), add a coresponding entry in the pending requests table - // iii. If deadlock detection is enabled, set/update the call stack - // iv. Pass the msg down to the protocol layer below - Header hdr=new Header(Header.REQ, id, (coll != null), this.id); + // iii. Pass the msg down to the protocol layer below + Header hdr=new Header(Header.REQ, request != null? request.req_id : 0, this.id); msg.putHeader(this.id, hdr); - if(coll != null) { - addEntry(hdr.id, coll); + if(request != null) { + long request_id=requests.add(request); + request.requestId(request_id); + hdr.req_id=request_id; // make sure no view is received before we add ourself as a view handler (https://issues.jboss.org/browse/JGRP-1428) - coll.viewChange(view); + request.viewChange(view); } - transport.down(new Event(Event.MSG, msg)); } @@ -218,7 +208,7 @@ public class RequestCorrelator { * Used to signal that a certain request may be garbage collected as all responses have been received. */ public void done(long id) { - removeEntry(id); + requests.remove(id); } @@ -271,9 +261,7 @@ public class RequestCorrelator { public void stop() { started=false; - for(RspCollector coll: requests.values()) - coll.transportClosed(); - requests.clear(); + requests.forEach(transport_closed_visitor).clear(); } @@ -299,23 +287,14 @@ public class RequestCorrelator { */ public void receiveSuspect(Address mbr) { if(mbr == null) return; - if(log.isDebugEnabled()) log.debug("suspect=" + mbr); - - // copy so we don't run into bug #761804 - Bela June 27 2003 - // copy=new ArrayList(requests.values()); // removed because ConcurrentReaderHashMap can tolerate concurrent mods (bela May 8 2006) - for(RspCollector coll: requests.values()) { - if(coll != null) - coll.suspect(mbr); - } + log.debug("suspecting %s", mbr); + requests.forEach(new SuspectVisitor(mbr)); } /** An entire site is down; mark all requests that point to that site as unreachable (used by RELAY2) */ public void setSiteUnreachable(String site) { - for(RspCollector coll: requests.values()) { - if(coll != null) - coll.siteUnreachable(site); - } + requests.forEach(new SiteUnreachableVisitor(site)); } @@ -327,14 +306,8 @@ public class RequestCorrelator { * */ public void receiveView(View new_view) { - // ArrayList copy; - // copy so we don't run into bug #761804 - Bela June 27 2003 - // copy=new ArrayList(requests.values()); // removed because ConcurrentHashMap can tolerate concurrent mods (bela May 8 2006) view=new_view; // move this before the iteration (JGRP-1428) - for(RspCollector coll: requests.values()) { - if(coll != null) - coll.viewChange(new_view); - } + requests.forEach(new ViewVisitor(new_view)); } @@ -368,15 +341,8 @@ public class RequestCorrelator { } } - // [Header.REQ]: - // i. If there is no request handler, discard - // ii. Check whether priority: if synchronous and call stack contains - // address that equals local address -> add priority request. Else - // add normal request. - // - // [Header.RSP]: - // Remove the msg request correlator header and notify the associated - // RspCollector that a reply has been received + // REQ: if there is no request handler, discard. Else process normal request. + // RSP: remove the request header and notify the associated request that a reply has been received switch(hdr.type) { case Header.REQ: handleRequest(msg, hdr); @@ -384,7 +350,7 @@ public class RequestCorrelator { case Header.RSP: case Header.EXC_RSP: - RspCollector coll=requests.get(hdr.id); + RspCollector coll=requests.get(hdr.req_id); if(coll != null) { boolean is_exception=hdr.type == Header.EXC_RSP; Address sender=msg.getSrc(); @@ -395,13 +361,15 @@ public class RequestCorrelator { retval=marshaller != null? marshaller.objectFromBuffer(buf, offset, length) : Util.objectFromByteBuffer(buf, offset, length); } - catch(Exception e) { + catch(Throwable e) { log.error(Util.getMessage("FailedUnmarshallingBufferIntoReturnValue"), e); retval=e; is_exception=true; } coll.receiveResponse(retval, sender, is_exception); } + else + System.err.printf("request for req_id=%d not found, table: %s\n", hdr.req_id, requests); break; default: @@ -424,27 +392,6 @@ public class RequestCorrelator { // ....................................................................... - /** - * Add an association of:
- * ID -> RspCollector - */ - private void addEntry(long id, RspCollector coll) { - requests.putIfAbsent(id, coll); - } - - - /** - * Remove the request entry associated with the given ID - * - * @param id the id of the RequestEntry to remove - */ - private void removeEntry(long id) { - // changed by bela Feb 28 2003 (bug fix for 690606) - // changed back to use synchronization by bela June 27 2003 (bug fix for #761804), - // we can do this because we now copy for iteration (viewChange() and suspect()) - requests.remove(id); - } - /** @@ -457,10 +404,10 @@ public class RequestCorrelator { if(log.isTraceEnabled()) { log.trace(new StringBuilder("calling (").append((request_handler != null? request_handler.getClass().getName() : "null")). - append(") with request ").append(hdr.id)); + append(") with request ").append(hdr.req_id)); } if(async_dispatching && request_handler instanceof AsyncRequestHandler) { - Response rsp=hdr.rsp_expected? new ResponseImpl(req, hdr.id) : null; + Response rsp=hdr.responseExpected()? new ResponseImpl(req, hdr.req_id) : null; try { ((AsyncRequestHandler)request_handler).handle(req, rsp); } @@ -480,8 +427,8 @@ public class RequestCorrelator { threw_exception=true; retval=wrap_exceptions ? new InvocationTargetException(t) : t; } - if(hdr.rsp_expected) - sendReply(req, hdr.id, retval, threw_exception); + if(hdr.responseExpected()) + sendReply(req, hdr.req_id, retval, threw_exception); } @@ -513,10 +460,10 @@ public class RequestCorrelator { protected void sendResponse(Message rsp, long req_id, boolean is_exception) { prepareResponse(rsp); - Header rsp_hdr=new Header(is_exception? Header.EXC_RSP : Header.RSP, req_id, false, id); + Header rsp_hdr=new Header(is_exception? Header.EXC_RSP : Header.RSP, req_id, id); rsp.putHeader(id, rsp_hdr); if(log.isTraceEnabled()) - log.trace(new StringBuilder("sending rsp for ").append(rsp_hdr.id).append(" to ").append(rsp.getDest())); + log.trace(new StringBuilder("sending rsp for ").append(rsp_hdr.req_id).append(" to ").append(rsp.getDest())); transport.down(new Event(Event.MSG, rsp)); } @@ -559,11 +506,8 @@ public class RequestCorrelator { /** Type of header: request or reply */ public byte type; /** - * The id of this request to distinguish among other requests from the same RequestCorrelator */ - public long id; - - /** msg is synchronous if true */ - public boolean rsp_expected; + * The request ID to correlate responses to requests. A req_id of -1 means that no responses are expected (async RPC) */ + public long req_id; /** The unique ID of the associated RequestCorrelator */ public short corrId; @@ -577,18 +521,19 @@ public class RequestCorrelator { /** * @param type type of header (REQ/RSP) - * @param id id of this header relative to ids of other requests + * @param req_id id of this header relative to ids of other requests * originating from the same correlator * @param rsp_expected whether it's a sync or async request * @param corr_id The ID of the RequestCorrelator from which */ - public Header(byte type, long id, boolean rsp_expected, short corr_id) { + public Header(byte type, long req_id, short corr_id) { this.type = type; - this.id = id; - this.rsp_expected = rsp_expected; + this.req_id = req_id; this.corrId = corr_id; } + public boolean responseExpected() {return req_id > 0;} + public String toString() { StringBuilder ret=new StringBuilder(); ret.append("id=" + corrId + ", type="); @@ -601,30 +546,25 @@ public class RequestCorrelator { break; default: ret.append(""); } - ret.append(", id=" + id); - ret.append(", rsp_expected=" + rsp_expected); - return ret.toString(); + return ret.append(", req_id=").append(req_id).toString(); } public void writeTo(DataOutput out) throws Exception { out.writeByte(type); - Bits.writeLong(id,out); - out.writeBoolean(rsp_expected); + Bits.writeLong(req_id, out); out.writeShort(corrId); } public void readFrom(DataInput in) throws Exception { type=in.readByte(); - id=Bits.readLong(in); - rsp_expected=in.readBoolean(); + req_id=Bits.readLong(in); corrId=in.readShort(); } public int size() { return Global.BYTE_SIZE // type - + Bits.size(id) // id - + Global.BYTE_SIZE // rsp_expected + + Bits.size(req_id) // id + Global.SHORT_SIZE; // corrId } } @@ -638,8 +578,8 @@ public class RequestCorrelator { public MultiDestinationHeader() { } - public MultiDestinationHeader(byte type, long id, boolean rsp_expected, short corr_id, Address[] exclusion_list) { - super(type, id, rsp_expected, corr_id); + public MultiDestinationHeader(byte type, long id, short corr_id, Address[] exclusion_list) { + super(type, id, corr_id); this.exclusion_list=exclusion_list; } @@ -668,36 +608,98 @@ public class RequestCorrelator { - private static class MyProbeHandler implements DiagnosticsHandler.ProbeHandler { - private final ConcurrentMap requests; + private class MyProbeHandler implements DiagnosticsHandler.ProbeHandler { + private final RequestTable requests; - private MyProbeHandler(ConcurrentMap requests) { - this.requests=requests; - } + private MyProbeHandler(RequestTable requests) {this.requests=requests;} public Map handleProbe(String... keys) { if(requests == null) return null; Map retval=new HashMap<>(); for(String key: keys) { - if(key.equals("requests")) { - StringBuilder sb=new StringBuilder(); - for(Map.Entry entry: requests.entrySet()) { - sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); - } - retval.put("requests", sb.toString()); - break; + switch(key) { + case "requests": + ProbeVisitor visitor=new ProbeVisitor(); + requests.forEach(visitor); + retval.put("requests", visitor.get()); + break; + case "reqtable-size": + retval.put(local_addr + ": reqtable-size", requests.toString()); + break; + case "reqtable-compact": + int cap=requests.capacity(); + boolean rc=requests.compact(); + retval.put("reqtable-compact", String.format("success: %b, old cap=%d, new cap=%d", rc, cap, requests.capacity())); + break; + case "reqtable-dump": + retval.put(local_addr + ": reqtable-dump", requests.dumpContents()); + break; } } return retval; } public String[] supportedKeys() { - return new String[]{"requests"}; + return new String[]{"requests", "reqtable-size", "reqtable-compact", "reqtable-dump"}; + } + } + + protected static class ProbeVisitor implements RequestTable.Visitor { + protected final StringBuilder sb=new StringBuilder(); + + public boolean visit(Request request) { + if(request != null) + sb.append(String.format("%s\n", request)); + return true; } + public String get() {return sb.toString();} } + protected static class TransportClosedVisitor implements RequestTable.Visitor { + public boolean visit(Request request) { + if(request != null) + request.transportClosed(); + return true; + } + } + + protected static class SuspectVisitor implements RequestTable.Visitor { + protected final Address mbr; + + public SuspectVisitor(Address mbr) {this.mbr=mbr;} + + public boolean visit(Request request) { + if(request != null) + request.suspect(mbr); + return true; + } + } + + protected static class SiteUnreachableVisitor implements RequestTable.Visitor { + protected final String site; + + public SiteUnreachableVisitor(String site) {this.site=site;} + + public boolean visit(Request request) { + if(request != null) + request.siteUnreachable(site); + return true; + } + } + + protected static class ViewVisitor implements RequestTable.Visitor { + protected final View view; + + public ViewVisitor(View view) {this.view=view;} + + public boolean visit(Request request) { + if(request != null) + request.viewChange(view); + return true; + } + } } diff --git a/src/org/jgroups/blocks/UnicastRequest.java b/src/org/jgroups/blocks/UnicastRequest.java index 60d3b09..8a13293 100644 --- a/src/org/jgroups/blocks/UnicastRequest.java +++ b/src/org/jgroups/blocks/UnicastRequest.java @@ -20,7 +20,7 @@ import java.util.concurrent.TimeoutException; public class UnicastRequest extends Request { protected final Rsp result; protected final Address target; - protected int num_received=0; + protected int num_received; @@ -40,7 +40,7 @@ public class UnicastRequest extends Request { protected void sendRequest() throws Exception { try { if(log.isTraceEnabled()) log.trace(new StringBuilder("sending request (id=").append(req_id).append(')')); - corr.sendUnicastRequest(req_id, target, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this); + corr.sendUnicastRequest(target, request_msg, options.getMode() == ResponseMode.GET_NONE? null : this); } catch(Exception ex) { if(corr != null) diff --git a/src/org/jgroups/blocks/mux/MuxRequestCorrelator.java b/src/org/jgroups/blocks/mux/MuxRequestCorrelator.java index a1be933..b2c2347 100644 --- a/src/org/jgroups/blocks/mux/MuxRequestCorrelator.java +++ b/src/org/jgroups/blocks/mux/MuxRequestCorrelator.java @@ -1,16 +1,13 @@ package org.jgroups.blocks.mux; -import java.util.Collection; - import org.jgroups.Address; import org.jgroups.Message; -import org.jgroups.blocks.RequestCorrelator; -import org.jgroups.blocks.RequestHandler; -import org.jgroups.blocks.RspCollector; -import org.jgroups.blocks.RequestOptions; +import org.jgroups.blocks.*; import org.jgroups.conf.ClassConfigurator; import org.jgroups.stack.Protocol; +import java.util.Collection; + /** * A request correlator that adds a mux header to incoming and outgoing messages. * @author Bela Ban @@ -28,15 +25,15 @@ public class MuxRequestCorrelator extends RequestCorrelator { } @Override - public void sendRequest(long requestId, Collection
dest_mbrs, Message msg, RspCollector coll, RequestOptions options) throws Exception { + public void sendRequest(Collection
dest_mbrs, Message msg, Request req, RequestOptions options) throws Exception { msg.putHeader(MUX_ID, header); - super.sendRequest(requestId, dest_mbrs, msg, coll, options); + super.sendRequest(dest_mbrs, msg, req, options); } @Override - public void sendUnicastRequest(long id, Address target, Message msg, RspCollector coll) throws Exception { + public void sendUnicastRequest(Address target, Message msg, Request req) throws Exception { msg.putHeader(MUX_ID, header); - super.sendUnicastRequest(id, target, msg, coll); + super.sendUnicastRequest(target, msg, req); } @Override diff --git a/tests/junit-functional/org/jgroups/tests/SizeTest.java b/tests/junit-functional/org/jgroups/tests/SizeTest.java index 0070842..e48de06 100644 --- a/tests/junit-functional/org/jgroups/tests/SizeTest.java +++ b/tests/junit-functional/org/jgroups/tests/SizeTest.java @@ -839,13 +839,13 @@ public class SizeTest { - public static void testRequestCorrelatorHeader() throws Exception { + public void testRequestCorrelatorHeader() throws Exception { RequestCorrelator.Header hdr; - hdr=new RequestCorrelator.Header(RequestCorrelator.Header.REQ, 322649, false, (short)1000); + hdr=new RequestCorrelator.Header(RequestCorrelator.Header.REQ, 0, (short)1000); _testSize(hdr); - hdr=new RequestCorrelator.Header(RequestCorrelator.Header.RSP, 322649, true, (short)356); + hdr=new RequestCorrelator.Header(RequestCorrelator.Header.RSP, 322649, (short)356); ByteArrayOutputStream output=new ByteArrayOutputStream(); DataOutputStream out=new DataOutputStream(output); @@ -861,13 +861,13 @@ public class SizeTest { hdr=new RequestCorrelator.Header(); hdr.readFrom(in); - Assert.assertEquals(322649, hdr.id); - assert hdr.rsp_expected; + Assert.assertEquals(322649, hdr.req_id); + assert hdr.responseExpected(); Assert.assertEquals((short)356, hdr.corrId); Assert.assertEquals(RequestCorrelator.Header.RSP, hdr.type); - hdr=new RequestCorrelator.Header(RequestCorrelator.Header.RSP, 322649, true, (short)356); + hdr=new RequestCorrelator.Header(RequestCorrelator.Header.RSP, 322649, (short)356); output=new ByteArrayOutputStream(); out=new DataOutputStream(output); @@ -883,14 +883,14 @@ public class SizeTest { hdr=new RequestCorrelator.Header(); hdr.readFrom(in); - Assert.assertEquals(322649, hdr.id); - assert hdr.rsp_expected; + Assert.assertEquals(322649, hdr.req_id); + assert hdr.responseExpected(); Assert.assertEquals(356, hdr.corrId); Assert.assertEquals(RequestCorrelator.Header.RSP, hdr.type); Address a=Util.createRandomAddress("A"), b=Util.createRandomAddress("B"); - hdr=new RequestCorrelator.MultiDestinationHeader(RequestCorrelator.Header.REQ, 322649, true, (short)22, new Address[]{a,b}); + hdr=new RequestCorrelator.MultiDestinationHeader(RequestCorrelator.Header.REQ, 322649, (short)22, new Address[]{a,b}); _testSize(hdr); }