geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging RequestSender.java
Date Thu, 24 Jun 2004 23:52:12 GMT
gdamour     2004/06/24 16:52:12

  Modified:    sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin
                        JoinRequest.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
                        LogicalCompression.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging
                        RequestSender.java
  Log:
  o RequestSender flags Request Msgs with a byte instead of an int;
  o Slightly improve LogicalCompression so that it systematically compresses headers which
  do not require the shared knowledge of a NodeTopology.
  
  Revision  Changes    Path
  1.3       +5 -3      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin/JoinRequest.java
  
  Index: JoinRequest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/admin/JoinRequest.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JoinRequest.java	3 Jun 2004 14:51:16 -0000	1.2
  +++ JoinRequest.java	24 Jun 2004 23:52:12 -0000	1.3
  @@ -21,6 +21,7 @@
   
   import org.apache.geronimo.messaging.CommunicationException;
   import org.apache.geronimo.messaging.Msg;
  +import org.apache.geronimo.messaging.MsgBody;
   import org.apache.geronimo.messaging.MsgHeader;
   import org.apache.geronimo.messaging.MsgHeaderConstants;
   import org.apache.geronimo.messaging.NodeInfo;
  @@ -74,8 +75,9 @@
           header.addHeader(MsgHeaderConstants.DEST_NODES, joined);
           header.addHeader(MsgHeaderConstants.SRC_ENDPOINT, "");
           header.addHeader(MsgHeaderConstants.CORRELATION_ID,
  -            new RequestSender.RequestID(new Integer(0)));
  -
  +            new RequestSender.RequestID((byte) 0));
  +        header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
  +        
           msg.getBody().setContent(joiner);
   
           final FutureResult result = new FutureResult();
  
  
  
  1.2       +40 -43    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java
  
  Index: LogicalCompression.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- LogicalCompression.java	11 May 2004 12:06:42 -0000	1.1
  +++ LogicalCompression.java	24 Jun 2004 23:52:12 -0000	1.2
  @@ -18,6 +18,8 @@
   package org.apache.geronimo.messaging.remotenode;
   
   import java.io.IOException;
  +import java.util.ArrayList;
  +import java.util.List;
   
   import org.apache.geronimo.messaging.Msg;
   import org.apache.geronimo.messaging.MsgBody;
  @@ -51,22 +53,22 @@
       /**
        * No logical compression.
        */
  -    private final static byte NULL = 0x00;
  +    private final static byte NULL = (byte) 0;
       
       /**
        * Compression based on the Topology shared knowledge.
        */
  -    private final static byte TOPOLOGY = 0x01;
  +    private final static byte TOPOLOGY = (byte) 1;
   
       /**
        * Identifies a request.
        */
  -    private final static byte REQUEST = 0x01;
  +    private final static byte REQUEST = (byte) 0;
       
       /**
        * Identifies a response.
        */
  -    private final static byte RESPONSE = 0x02;
  +    private final static byte RESPONSE = (byte) 1;
   
       public NodeTopology getTopology() {
           return topology;
  @@ -78,60 +80,68 @@
       
       public Object beforePop(StreamInputStream anIn)
           throws IOException {
  +        List result = new ArrayList(5);
  +        int bodyType = anIn.readByte();
  +        if ( REQUEST == bodyType ) {
  +            result.add(MsgBody.Type.REQUEST);
  +        } else {
  +            result.add(MsgBody.Type.RESPONSE);
  +        }
  +        byte reqID = anIn.readByte();
  +        result.add(new RequestSender.RequestID(reqID));
           byte type = anIn.readByte(); 
           if ( type == NULL ) {
  -            return null;
  +            return result;
           }
           if ( null == topology ) {
               throw new IllegalArgumentException("No topology is defined.");
           }
  -        Object[] result = new Object[5];
           int id = anIn.readInt();
           NodeInfo nodeInfo = topology.getNodeById(id);
  -        result[0] = nodeInfo;
  +        result.add(nodeInfo);
   
           id = anIn.readInt();
           nodeInfo = topology.getNodeById(id);
  -        result[1] = nodeInfo;
  +        result.add(nodeInfo);
           
           id = anIn.readInt();
           nodeInfo = topology.getNodeById(id);
  -        result[2] = nodeInfo;
  -        
  -        int bodyType = anIn.read();
  -        if ( REQUEST == bodyType ) {
  -            result[3] = MsgBody.Type.REQUEST;
  -        } else {
  -            result[3] = MsgBody.Type.RESPONSE;
  -        }
  -        
  -        int reqID = anIn.readInt();
  -        result[4] = new RequestSender.RequestID(new Integer(reqID));
  -        return result;
  +        result.add(nodeInfo);
  +        return result.toArray();
       }
       
       public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
           throws IOException {
  -        if ( null == anOpaque ) {
  +        List prePop = (List) anOpaque;
  +        MsgHeader header = aMsg.getHeader();
  +        header.addHeader(MsgHeaderConstants.BODY_TYPE, prePop.get(0));
  +        header.addHeader(MsgHeaderConstants.CORRELATION_ID, prePop.get(1));
  +        if ( 5 != prePop.size() ) {
               return;
           }
  -        Object[] prePop = (Object[]) anOpaque;
  -        MsgHeader header = aMsg.getHeader();
  -        header.addHeader(MsgHeaderConstants.SRC_NODE, prePop[0]);
  -        header.addHeader(MsgHeaderConstants.DEST_NODE, prePop[1]);
  -        header.addHeader(MsgHeaderConstants.DEST_NODES, prePop[2]);
  -        header.addHeader(MsgHeaderConstants.BODY_TYPE, prePop[3]);
  -        header.addHeader(MsgHeaderConstants.CORRELATION_ID, prePop[4]);
  +        header.addHeader(MsgHeaderConstants.SRC_NODE, prePop.get(2));
  +        header.addHeader(MsgHeaderConstants.DEST_NODE, prePop.get(3));
  +        header.addHeader(MsgHeaderConstants.DEST_NODES, prePop.get(4));
       }
       
       public Object beforePush(StreamOutputStream anOut, Msg aMsg)
           throws IOException {
  +        MsgHeader header = aMsg.getHeader();
  +        MsgBody.Type type  = (MsgBody.Type)
  +            header.resetHeader(MsgHeaderConstants.BODY_TYPE);
  +        if ( type == MsgBody.Type.REQUEST ) {
  +            anOut.writeByte(REQUEST);
  +        } else {
  +            anOut.writeByte(RESPONSE);
  +        }
  +        RequestSender.RequestID reqID  = (RequestSender.RequestID)
  +            header.resetHeader(MsgHeaderConstants.CORRELATION_ID);
  +        anOut.writeByte(reqID.getID());
           if ( null == topology ) {
               anOut.writeByte(NULL);
               return null;
           }
           anOut.writeByte(TOPOLOGY);
  -        MsgHeader header = aMsg.getHeader();
           
           NodeInfo info =
               (NodeInfo) header.resetHeader(MsgHeaderConstants.SRC_NODE);
  @@ -144,19 +154,6 @@
           NodeInfo target =
               (NodeInfo) header.resetHeader(MsgHeaderConstants.DEST_NODES);
           anOut.writeInt(topology.getIDOfNode(target));
  -        
  -        MsgBody.Type type  = (MsgBody.Type)
  -        header.resetHeader(MsgHeaderConstants.BODY_TYPE);
  -        if ( type == MsgBody.Type.REQUEST ) {
  -            anOut.write(REQUEST);
  -        } else {
  -            anOut.write(RESPONSE);
  -        }
  -        
  -        RequestSender.RequestID reqID  = (RequestSender.RequestID)
  -            header.resetHeader(MsgHeaderConstants.CORRELATION_ID);
  -        anOut.writeInt(reqID.getID());
  -        
           return null;
       }
       
  
  
  
  1.4       +66 -35    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/RequestSender.java
  
  Index: RequestSender.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/RequestSender.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RequestSender.java	10 Jun 2004 23:08:07 -0000	1.3
  +++ RequestSender.java	24 Jun 2004 23:52:12 -0000	1.4
  @@ -22,8 +22,6 @@
   import java.io.ObjectInput;
   import java.io.ObjectOutput;
   import java.lang.reflect.InvocationTargetException;
  -import java.util.HashMap;
  -import java.util.Map;
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  @@ -45,17 +43,27 @@
       /**
        * Number of milliseconds to wait for a response.
        */
  -    private static final long WAIT_RESPONSE = 5000;
  +    private static final long WAIT_RESPONSE = 2000;
  +
  +    /**
  +     * Maximum number of requests that this instance can performed concurrently.
  +     */
  +    private static final int MAX_CONCURRENT_REQUEST = 255;
  +    
  +    /**
  +     * Memory barrier for seqID counter.
  +     */
  +    private final Object seqMemBarrier = new Object(); 
       
       /**
        * Used to generate request identifiers.
        */
  -    private static volatile int seqID = 0;
  +    private int seqID = 0;
       
       /**
  -     * Request id to FuturResult map.
  +     * Request id to FuturResult[].
        */
  -    private final Map responses;
  +    private final Object[] responses;
   
       
       /**
  @@ -66,7 +74,7 @@
        * request.
        */
       public RequestSender() {
  -        responses = new HashMap();
  +        responses = new Object[MAX_CONCURRENT_REQUEST + 1];
       }
   
       /**
  @@ -101,8 +109,8 @@
           Msg msg = new Msg();
           
           MsgHeader header = msg.getHeader();
  -        RequestID id = createID(aTargetNodes);
  -        header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
  +        IDTOFutureResult futurResult = createID(aTargetNodes);
  +        header.addHeader(MsgHeaderConstants.CORRELATION_ID, futurResult.id);
           header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes);
           header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
           header.addHeader(MsgHeaderConstants.DEST_ENDPOINT, aTargetID);
  @@ -112,7 +120,7 @@
           
           anOut.push(msg);
   
  -        Result result = waitResponse(id, WAIT_RESPONSE);
  +        Result result = waitResponse(futurResult.futurResults, WAIT_RESPONSE);
           if ( !result.isSuccess() ) {
               throw new CommunicationException(result.getThrowable());
           }
  @@ -124,16 +132,27 @@
        * identifier for this slot.
        * 
        * @param aTargetNodes Nodes to which the request is to be sent.
  -     * @return Request identifier.
  +     * @return Request identifier and FutureResults.
        */
  -    private RequestID createID(NodeInfo[] aTargetNodes) {
  +    private IDTOFutureResult createID(NodeInfo[] aTargetNodes) {
           FutureResult[] results = new FutureResult[aTargetNodes.length];
           for (int i = 0; i < results.length; i++) {
               results[i] = new FutureResult();
           }
  -        RequestID id = new RequestID(new Integer(seqID++));
  -        responses.put(id, results);
  -        return id;
  +        int idAsInt;
  +        synchronized (seqMemBarrier) {
  +            // Implementation note: it is unlikely to have more than
  +            // MAX_CONCURRENT_REQUEST Threads sending requests concurrently;
  +            // This implementation assumes this unlikelihood. 
  +            if ( MAX_CONCURRENT_REQUEST == ++seqID ) seqID = 1;
  +            responses[seqID] = results;
  +            idAsInt = seqID;
  +        }
  +        RequestID id = new RequestID((byte)idAsInt);
  +        IDTOFutureResult result = new IDTOFutureResult();
  +        result.id = id;
  +        result.futurResults = results;
  +        return result;
       }
       
       /**
  @@ -143,15 +162,13 @@
        * @param aWaitTime number of milliseconds to wait for a response.
        * @return Result of the request.
        */
  -    private Result waitResponse(RequestID anID, long aWaitTime) {
  -        FutureResult[] results = (FutureResult[]) responses.get(anID);
  +    private Result waitResponse(FutureResult[] aResults, long aWaitTime) {
           Exception ex;
           try {
               Result returned = null;
  -            for (int i = 0; i < results.length; i++) {
  -                returned = (Result) results[i].timedGet(aWaitTime);
  +            for (int i = 0; i < aResults.length; i++) {
  +                returned = (Result) aResults[i].timedGet(aWaitTime);
               }
  -            responses.remove(anID);
               return returned;
           } catch (TimeoutException e) {
               log.error(e);
  @@ -176,46 +193,60 @@
           if ( false == anID instanceof RequestID ) {
               throw new IllegalArgumentException("ID is of the wrong type.");
           }
  -        FutureResult[] results = (FutureResult[]) responses.get(anID);
  -        for (int i = 0; i < results.length; i++) {
  -            FutureResult result = results[i];
  -            if ( null == result.peek() ) {
  -                result.set(aResult);
  -                break;
  +        RequestID id = (RequestID) anID;
  +        int index = id.id <= 0 ? id.id & 127 + 128 : id.id;
  +        FutureResult[] results;
  +        results = (FutureResult[]) responses[index];
  +        if ( null == results ) {
  +            log.error("Invalid request ID {" + anID + "}");
  +            return;
  +        }
  +        synchronized (results) {
  +            for (int i = 0; i < results.length; i++) {
  +                FutureResult result = results[i];
  +                if ( null == result.peek() ) {
  +                    result.set(aResult);
  +                    break;
  +                }
               }
           }
       }
       
  +    private static class IDTOFutureResult {
  +        private RequestID id;
  +        private FutureResult[] futurResults;
  +    }
  +    
       /**
        * Request identifier.
        */
       public static class RequestID implements Externalizable {
  -        protected Integer id;
  +        protected byte id;
           /**
            * Required for Externalization.
            */
           public RequestID() {}
  -        public RequestID(Integer anID) {
  +        public RequestID(byte anID) {
               id = anID;
           }
  -        public int getID() {
  -            return id.intValue();
  +        public byte getID() {
  +            return id;
           }
           public void writeExternal(ObjectOutput out) throws IOException {
  -            out.writeInt(id.intValue());
  +            out.write(id);
           }
           public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  -            id = new Integer(in.readInt());
  +            id = (byte) in.read();
           }
           public int hashCode() {
  -            return id.hashCode();
  +            return id;
           }
           public boolean equals(Object obj) {
               if ( false == obj instanceof RequestID ) {
                   return false;
               }
               RequestID otherID = (RequestID) obj;
  -            return id.equals(otherID.id);
  +            return id == otherID.id;
           }
           public String toString() {
               return "ID=" + id;
  
  
  

Mime
View raw message