geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode LogicalCompression.java
Date Tue, 20 Jul 2004 00:26:04 GMT
gdamour     2004/07/19 17:26:04

  Modified:    sandbox/messaging/src/test/org/apache/geronimo/messaging
                        NodeImplTest.java EndPointUtil.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster
                        ClusterImpl.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging
                        NodeTopology.java NodeImpl.java
                        NodeEndPointView.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
                        LogicalCompression.java
  Log:
  A primitive 2PC implementation of a topology reconfiguration.
  
  Revision  Changes    Path
  1.7       +4 -3      incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java
  
  Index: NodeImplTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- NodeImplTest.java	17 Jul 2004 03:52:33 -0000	1.6
  +++ NodeImplTest.java	20 Jul 2004 00:26:03 -0000	1.7
  @@ -148,6 +148,7 @@
           NodeImplTest test = new NodeImplTest();
           test.setUp();
           test.testSendRawPerformance();
  +        test.tearDown();
       }
       
       public void testSendRawPerformance() throws Exception {
  @@ -307,7 +308,7 @@
               throw new IllegalArgumentException("Not expected");
           }
           public NodeInfo getNodeById(int anId) {
  -            if ( nodesInfo.length <= anId  ) {
  +            if ( nodesInfo.length < anId  ) {
                   throw new IllegalArgumentException("Not expected");
               }
               return nodesInfo[anId - 1];
  @@ -316,7 +317,7 @@
               throw new IllegalArgumentException("Not expected");
           }
           public int getVersion() {
  -            return 0;
  +            return 1;
           }
       }
       
  
  
  
  1.2       +9 -3      incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/EndPointUtil.java
  
  Index: EndPointUtil.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/EndPointUtil.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- EndPointUtil.java	24 May 2004 12:03:34 -0000	1.1
  +++ EndPointUtil.java	20 Jul 2004 00:26:03 -0000	1.2
  @@ -33,14 +33,20 @@
                   MsgHeaderConstants.SRC_NODE, "",
                   new HeaderOutInterceptor(
                       MsgHeaderConstants.SRC_ENDPOINT, "",
  -                    anEP1.getMsgConsumerOut()));
  +                    new HeaderOutInterceptor(
  +                        MsgHeaderConstants.TOPOLOGY_VERSION,
  +                        new Integer(1),
  +                        anEP1.getMsgConsumerOut())));
           anEP2.setMsgProducerOut(out);
           out =
               new HeaderOutInterceptor(
                   MsgHeaderConstants.SRC_NODE, "",
                   new HeaderOutInterceptor(
                       MsgHeaderConstants.SRC_ENDPOINT, "",
  -                    anEP2.getMsgConsumerOut()));
  +                    new HeaderOutInterceptor(
  +                        MsgHeaderConstants.TOPOLOGY_VERSION,
  +                        new Integer(1),
  +                        anEP2.getMsgConsumerOut())));
           anEP1.setMsgProducerOut(out);
       }
       
  
  
  
  1.3       +14 -3     incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterImpl.java
  
  Index: ClusterImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterImpl.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ClusterImpl.java	17 Jul 2004 03:38:42 -0000	1.2
  +++ ClusterImpl.java	20 Jul 2004 00:26:03 -0000	1.3
  @@ -117,8 +117,15 @@
               }
               topologyManager.addNode(aNode);
               nodeTopology = topologyManager.factoryTopology();
  +            try {
  +                node.setTopology(nodeTopology);
  +            } catch (NodeException e) {
  +                // If the topology can not be applied, then one removes it
  +                // from the topologyManager.
  +                topologyManager.removeNode(aNode);
  +                throw e;
  +            }
           }
  -        node.setTopology(nodeTopology);
           fireClusterMemberEvent(
               new ClusterEvent(this, aNode, ClusterEvent.MEMBER_ADDED));
       }
  @@ -132,8 +139,12 @@
               }
               topologyManager.removeNode(aNode);
               nodeTopology = topologyManager.factoryTopology();
  +            try {
  +                node.setTopology(nodeTopology);
  +            } catch (NodeException e) {
  +                throw e;
  +            }
           }
  -        node.setTopology(nodeTopology);
           fireClusterMemberEvent(
               new ClusterEvent(this, aNode, ClusterEvent.MEMBER_REMOVED));
       }
  
  
  
  1.5       +5 -2      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeTopology.java
  
  Index: NodeTopology.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeTopology.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- NodeTopology.java	5 Jul 2004 07:03:50 -0000	1.4
  +++ NodeTopology.java	20 Jul 2004 00:26:03 -0000	1.5
  @@ -32,6 +32,8 @@
   
       /**
        * Gets the version of this topology.
  +     * <BR>
  +     * 0 is a reserved value and must not be used.
        * 
        * @return version number.
        */
  @@ -42,7 +44,8 @@
        * reachable from aRoot. 
        * 
        * @param aRoot Node.
  -     * @return Set<NodeInfo> Neighbours.
  +     * @return Set<NodeInfo> Neighbours. An empty Set must be returned if
  +     * the specified node is not registered by this topology.
        */
       public Set getNeighbours(NodeInfo aRoot);
       
  
  
  
  1.9       +67 -25    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java
  
  Index: NodeImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- NodeImpl.java	17 Jul 2004 03:52:34 -0000	1.8
  +++ NodeImpl.java	20 Jul 2004 00:26:04 -0000	1.9
  @@ -31,11 +31,13 @@
   import org.apache.geronimo.messaging.interceptors.HeaderOutInterceptor;
   import org.apache.geronimo.messaging.interceptors.MsgOutDispatcher;
   import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
  +import org.apache.geronimo.messaging.interceptors.MsgTransformer;
   import org.apache.geronimo.messaging.interceptors.ThrowableTrapOutInterceptor;
   import org.apache.geronimo.messaging.io.IOContext;
   import org.apache.geronimo.messaging.io.ReplacerResolver;
   import org.apache.geronimo.messaging.io.StreamManager;
   import org.apache.geronimo.messaging.io.StreamManagerImpl;
  +import org.apache.geronimo.messaging.proxy.EndPointProxy;
   import org.apache.geronimo.messaging.proxy.EndPointProxyFactory;
   import org.apache.geronimo.messaging.proxy.EndPointProxyFactoryImpl;
   import org.apache.geronimo.messaging.proxy.EndPointProxyInfo;
  @@ -201,7 +203,7 @@
               processed.add(nodeInfo);
               prepareNodes(aTopology, processed, nodeInfo);
               
  -            endPointView.commitTopology(aTopology);
  +            endPointView.commitTopology();
               processed = new HashSet();
               processed.add(nodeInfo);
               commitNodes(aTopology, processed, nodeInfo);
  @@ -219,8 +221,8 @@
        * @param aProcessed Set<NodeInfo> nodes already processed.
        * @param aNodeInfo Node to be prepared.
        */
  -    private void prepareNodes(NodeTopology aTopology, Set aProcessed,
  -        NodeInfo aNodeInfo) {
  +    private void prepareNodes(final NodeTopology aTopology, Set aProcessed,
  +        NodeInfo aNodeInfo) throws NodeException {
           // Computes the neighbours which have not yet received the
           // topology reconfiguration.
           Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo));
  @@ -231,20 +233,42 @@
               return;
           }
   
  +        // Prepares the topology of all the neighbours.
           for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) {
               NodeInfo nodeInfo = (NodeInfo) iter.next();
               EndPointProxyInfo proxyInfo = new EndPointProxyInfo(
                   NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo);
               NodeEndPointView topologyEndPoint =
                   (NodeEndPointView) endPointProxyFactory.factory(proxyInfo);
  +            // sends the Msg in the prepared topology.
  +            ((EndPointProxy) topologyEndPoint).setTransformer(
  +                new MsgTransformer() {
  +                    private Msg msg;
  +                    public Msg pop() {
  +                        MsgHeader hd = msg.getHeader();
  +                        hd.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
  +                            new Integer(aTopology.getVersion()));
  +                        return msg;
  +                    }
  +                    public void push(Msg aMsg) {
  +                        msg = aMsg;
  +                    }
  +                }); 
               try {
                   topologyEndPoint.prepareTopology(aTopology);
  +            } catch (CommunicationException e) {
  +                throw new NodeException("Can not prepare topology", e);
               } finally {
                   endPointProxyFactory.releaseProxy(topologyEndPoint);
               }
               // Computes the nodes which have already received the topology
               // reconfiguration.
               aProcessed.add(nodeInfo);
  +        }
  +        
  +        // Prepares the topology of the neighbours of the direct neighbours.
  +        for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) {
  +            NodeInfo nodeInfo = (NodeInfo) iter.next();
               prepareNodes(aTopology, aProcessed, nodeInfo);
           }
       }
  @@ -256,8 +280,8 @@
        * @param aProcessed Set<NodeInfo> nodes already processed.
        * @param aNodeInfo Node to be committed.
        */
  -    private void commitNodes(NodeTopology aTopology, Set aProcessed,
  -        NodeInfo aNodeInfo) {
  +    private void commitNodes(final NodeTopology aTopology, Set aProcessed,
  +        NodeInfo aNodeInfo) throws NodeException {
           Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo));
           toBeProcessed.removeAll(aProcessed);
   
  @@ -271,12 +295,32 @@
                   NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo);
               NodeEndPointView topologyEndPoint =
                   (NodeEndPointView) endPointProxyFactory.factory(proxyInfo);
  +            // sends the Msg in the prepared topology.
  +            ((EndPointProxy) topologyEndPoint).setTransformer(
  +                new MsgTransformer() {
  +                    private Msg msg;
  +                    public Msg pop() {
  +                        MsgHeader hd = msg.getHeader();
  +                        hd.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
  +                            new Integer(aTopology.getVersion()));
  +                        return msg;
  +                    }
  +                    public void push(Msg aMsg) {
  +                        msg = aMsg;
  +                    }
  +                }); 
               try {
  -                topologyEndPoint.commitTopology(aTopology);
  +                topologyEndPoint.commitTopology();
  +            } catch (CommunicationException e) {
  +                throw new NodeException("Can not commit topology", e);
               } finally {
                   endPointProxyFactory.releaseProxy(topologyEndPoint);
               }
               aProcessed.add(nodeInfo);
  +        }
  +        
  +        for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) {
  +            NodeInfo nodeInfo = (NodeInfo) iter.next();
               commitNodes(aTopology, aProcessed, nodeInfo);
           }
       }
  @@ -316,7 +360,7 @@
                   return Collections.EMPTY_SET;
               }
               public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  -                throw new UnsupportedOperationException("getPath");
  +                return null;
               }
               public int getIDOfNode(NodeInfo aNodeInfo) {
                   throw new UnsupportedOperationException("getIDOfNode");
  @@ -332,8 +376,8 @@
               public int getVersion() {
                   return 0;
               }
  -            public void setVersion(int aVersion) {
  -                throw new UnsupportedOperationException("setVersion");
  +            public String toString() {
  +                return "Stand-alone node " + nodeInfo;
               }
           };
           setTopology(topology);
  @@ -465,10 +509,6 @@
   
           public void push(Msg aMsg, Throwable aThrowable) {
               log.error("Can not deliver " + aMsg, aThrowable);
  -            // Send the Msg back to the caller and provide the exception.
  -            Msg msg = aMsg.reply();
  -            msg.getBody().setContent(new Result(false, aThrowable));
  -            nodeManager.getMsgConsumerOut().push(aMsg);
           }
           
       }
  @@ -498,22 +538,24 @@
   
       private class NodeEndPointViewImpl extends BaseEndPoint
           implements NodeEndPointView {
  +        private NodeTopology topology;
           private NodeEndPointViewImpl() {
               super(NodeImpl.this, NODE_ID);
           }
  -        public void prepareTopology(NodeTopology aTopology) {
  +        public void prepareTopology(NodeTopology aTopology)
  +            throws NodeException {
               // Registers a future topology here. This way neighbours can start
               // to send Msgs compressed with the new topology.
  -            // TODO re-enable compression.
  -//            compression.registerFutureTopology(aTopology);
  -            nodeManager.setTopology(aTopology);
  -        }
  -        public void commitTopology(NodeTopology aTopology) {
  -            log.info("********** Topology update **********\n" + aTopology);
  -            nodeTopology = aTopology;
  -            log.info("********** End Topology update ******");
  -            // TODO re-enable compression.
  -//            compression.registerTopology(aTopology);
  +            compression.prepareTopology(aTopology);
  +            nodeManager.prepareTopology(aTopology);
  +            topology = aTopology;
  +        }
  +        public void commitTopology() {
  +            nodeManager.commitTopology();
  +            compression.commitTopology();
  +            nodeTopology = topology;
  +            log.info("\n********** Topology update **********\n" + 
  +                topology + "\n********** End Topology update ******");
           }
       }
       
  
  
  
  1.3       +5 -6      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java
  
  Index: NodeEndPointView.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- NodeEndPointView.java	17 Jul 2004 03:52:34 -0000	1.2
  +++ NodeEndPointView.java	20 Jul 2004 00:26:04 -0000	1.3
  @@ -43,16 +43,15 @@
        * specified topology.
        * 
        * @param aTopology Topology to be prepared.
  +     * @exception NodeException If the node can not join all of its neighbours.
        */
  -    public void prepareTopology(NodeTopology aTopology);
  +    public void prepareTopology(NodeTopology aTopology) throws NodeException;
       
       /**
  -     * Commits the specified topology.
  -     * <BR>
  -     * This latter must have been prepared just before this call.
  +     * Commits the topology previously prepared.
        * 
        * @param aTopology Topology to be committed.
        */
  -    public void commitTopology(NodeTopology aTopology);
  +    public void commitTopology();
       
   }
  
  
  
  1.4       +36 -27    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java
  
  Index: LogicalCompression.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- LogicalCompression.java	5 Jul 2004 07:03:50 -0000	1.3
  +++ LogicalCompression.java	20 Jul 2004 00:26:04 -0000	1.4
  @@ -46,14 +46,14 @@
   {
   
       /**
  -     * Future topology.
  +     * Topology being prepared.
        */
  -    private NodeTopology futTopology;
  +    private volatile NodeTopology preparedTopology;
       
       /**
  -     * Current registered topology.
  +     * Committed topology.
        */
  -    private NodeTopology curTopology;
  +    private volatile NodeTopology topology;
       
       /**
        * No logical compression.
  @@ -77,23 +77,24 @@
   
       /**
        * Registers a future topology. It is only used to uncompress Msgs which
  -     * have not been compressed by the current topology. 
  +     * have not been compressed by the topology currently committed. 
        * 
  -     * @param aTopology Future topology.
  +     * @param aTopology Topology.
        */
  -    public void registerFutureTopology(NodeTopology aTopology) {
  -        futTopology = aTopology;
  +    public void prepareTopology(NodeTopology aTopology) {
  +        preparedTopology = aTopology;
       }
       
       /**
  -     * Registers the current topology. It is used to compress and uncompress
  -     * Msgs. If it is not possible to uncompress a Msg with the current
  -     * topology, then the future topology is used.
  +     * Commits the previousy prepared topology. It is used to compress and 
  +     * uncompress Msgs. If it is not possible to uncompress a Msg with the 
  +     * current topology, then the future topology is used.
        * 
        * @param aTopology Current topology.
        */
  -    public void registerTopology(NodeTopology aTopology) {
  -        curTopology = aTopology;
  +    public void commitTopology() {
  +        topology = preparedTopology;
  +        preparedTopology = null;
       }
       
       public Object beforePop(StreamInputStream anIn)
  @@ -112,17 +113,8 @@
               return result;
           }
           int version = anIn.readInt();
  -        NodeTopology topology = null;
  -        if ( null != curTopology && version == curTopology.getVersion() ) {
  -            topology = curTopology;
  -        } else if ( null != futTopology &&
  -            version == futTopology.getVersion() ) {
  -            topology = futTopology;
  -        }
  -        if ( null == topology ) {
  -            throw new IllegalArgumentException("No topology with version {" +
  -                version + "} is defined.");
  -        }
  +        NodeTopology topology = getTopology(version);
  +
           int id = anIn.readInt();
           NodeInfo nodeInfo = topology.getNodeById(id);
           result.add(nodeInfo);
  @@ -164,8 +156,11 @@
           RequestSender.RequestID reqID  = (RequestSender.RequestID)
               header.resetHeader(MsgHeaderConstants.CORRELATION_ID);
           anOut.writeByte(reqID.getID());
  -        NodeTopology topology = curTopology;
  -        if ( null == topology ) {
  +        Integer version = (Integer)
  +            header.getHeader(MsgHeaderConstants.TOPOLOGY_VERSION);
  +        NodeTopology topology = getTopology(version.intValue());
  +        // Uses only the current topology to compress the data.
  +        if ( null == topology || preparedTopology == topology ) {
               anOut.writeByte(NULL);
               return null;
           }
  @@ -190,4 +185,18 @@
           Object anOpaque) throws IOException {
       }
   
  +    private NodeTopology getTopology(int aVersion) {
  +        if ( 0 == aVersion || null == topology ) {
  +            return null;
  +        } else if ( aVersion == topology.getVersion() ) {
  +            return topology;
  +        } else if ( null == preparedTopology ||
  +            aVersion == preparedTopology.getVersion() ) {
  +            return preparedTopology;
  +        } else {
  +            throw new IllegalArgumentException("Topology version " + 
  +                aVersion + " is too old.");
  +        }
  +    }
  +    
   }
  
  
  

Mime
View raw message