geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging NodeImpl.java NodeEndPointView.java Node.java
Date Sat, 17 Jul 2004 03:52:34 GMT
gdamour     2004/07/16 20:52:34

  Modified:    sandbox/messaging/src/test/org/apache/geronimo/messaging
                        NodeImplTest.java MockNode.java
                        MockEndPointImpl.java
               sandbox/messaging/src/java/org/apache/geronimo/messaging
                        NodeImpl.java NodeEndPointView.java Node.java
  Log:
  Change the way a topology is cascaded. The previous implementation was not dealing correctly
with
  cyclic topologies.
  
  Revision  Changes    Path
  1.6       +52 -49    incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java
  
  Index: NodeImplTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- NodeImplTest.java	8 Jul 2004 05:13:29 -0000	1.5
  +++ NodeImplTest.java	17 Jul 2004 03:52:33 -0000	1.6
  @@ -20,6 +20,8 @@
   import java.io.IOException;
   import java.io.InputStream;
   import java.net.InetAddress;
  +import java.util.ArrayList;
  +import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
  @@ -110,7 +112,7 @@
           // Sets the topology.
           NodeTopology topology =
               new MockTopology(nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4);
  -        node1.setTopology(topology);
  +        node3.setTopology(topology);
       }
   
       protected void tearDown() throws Exception {
  @@ -208,8 +210,7 @@
           public void init(String aName) throws Exception {
               tp = new ThreadPool();
               tp.setKeepAliveTime(1 * 1000);
  -            tp.setMinimumPoolSize(5);
  -            tp.setMaximumPoolSize(25);
  +            tp.setPoolSize(10);
               tp.setPoolName("TP " + aName);
   
               cp = new ClockPool();
  @@ -234,6 +235,7 @@
    
       private static class MockTopology implements NodeTopology {
   
  +        private final NodeInfo[] nodesInfo;
           private final NodeInfo nodeInfo1;
           private final NodeInfo nodeInfo2;
           private final NodeInfo nodeInfo3;
  @@ -247,67 +249,68 @@
               nodeInfo2 = aNodeInfo2;
               nodeInfo3 = aNodeInfo3;
               nodeInfo4 = aNodeInfo4;
  +            nodesInfo = new NodeInfo[] {
  +                nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4};
           }
           
           public Set getNeighbours(NodeInfo aRoot) {
               Set result = new HashSet();
  -            if ( aRoot.equals(nodeInfo1) ) {
  -                result.add(nodeInfo2);
  -            } else if ( aRoot.equals(nodeInfo2) ) {
  -                result.add(nodeInfo3);
  -            } else if ( aRoot.equals(nodeInfo3) ) {
  -                result.add(nodeInfo4);
  -            } else if ( aRoot.equals(nodeInfo4) ) {
  -            } else {
  -                throw new IllegalArgumentException("Not expected");
  +            if ( aRoot.equals(nodesInfo[0]) ) {
  +                result.add(nodesInfo[1]);
  +                return result;
  +            } else if ( aRoot.equals(nodesInfo[nodesInfo.length - 1]) ) {
  +                result.add(nodesInfo[nodesInfo.length - 2]);
  +                return result;
  +            }
  +            for (int i = 1; i < nodesInfo.length - 1; i++) {
  +                if ( nodesInfo[i].equals(aRoot) ) {
  +                    result.add(nodesInfo[i - 1]);
  +                    result.add(nodesInfo[i + 1]);
  +                    return result;
  +                }
               }
  -            return result;
  +            throw new IllegalArgumentException("Not expected");
           }
           public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  -            if ( aSource.equals(nodeInfo1) && aTarget.equals(nodeInfo2) ) {
  -                return new NodeInfo[] {nodeInfo2};
  -            } else if ( aSource.equals(nodeInfo2) && aTarget.equals(nodeInfo1)
) {
  -                return new NodeInfo[] {nodeInfo1};
  -            } else if ( aSource.equals(nodeInfo2) && aTarget.equals(nodeInfo3)
) {
  -                return new NodeInfo[] {nodeInfo3};
  -            } else if ( aSource.equals(nodeInfo3) && aTarget.equals(nodeInfo2)
) {
  -                return new NodeInfo[] {nodeInfo2};
  -            } else if ( aSource.equals(nodeInfo3) && aTarget.equals(nodeInfo4)
) {
  -                return new NodeInfo[] {nodeInfo4};
  -            } else if ( aSource.equals(nodeInfo4) && aTarget.equals(nodeInfo3)
) {
  -                return new NodeInfo[] {nodeInfo3};
  -            } else if ( aSource.equals(nodeInfo1) && aTarget.equals(nodeInfo4)
) {
  -                return new NodeInfo[] {nodeInfo2, nodeInfo3, nodeInfo4};
  -            } else if ( aSource.equals(nodeInfo4) && aTarget.equals(nodeInfo1)
) {
  -                return new NodeInfo[] {nodeInfo3, nodeInfo2, nodeInfo1};
  +            boolean isInside = false;
  +            List result = new ArrayList();
  +            for (int i = 0; i < nodesInfo.length ; i++) {
  +                NodeInfo curNode = nodesInfo[i];
  +                if ( curNode.equals(aSource) ) {
  +                    if ( isInside ) {
  +                        Collections.reverse(result);
  +                        return (NodeInfo[]) result.toArray(new NodeInfo[0]);
  +                    }
  +                    isInside = true;
  +                    continue;
  +                } else if ( curNode.equals(aTarget) ) {
  +                    if ( isInside ) {
  +                        result.add(curNode);
  +                        return (NodeInfo[]) result.toArray(new NodeInfo[0]);
  +                    }
  +                    isInside = true;
  +                    result.add(curNode);
  +                    continue;
  +                }
  +                if ( isInside ) {
  +                    result.add(nodesInfo[i]);
  +                }
               }
               throw new IllegalArgumentException("Not expected");
           }
           public int getIDOfNode(NodeInfo aNodeInfo) {
  -            if ( aNodeInfo.equals(nodeInfo1) ) {
  -                return 1;
  -            } else if ( aNodeInfo.equals(nodeInfo2) ) {
  -                return 2;
  -            } else if ( aNodeInfo.equals(nodeInfo3) ) {
  -                return 3;
  -            } else if ( aNodeInfo.equals(nodeInfo4) ) {
  -                return 4;
  +            for (int i = 0; i < nodesInfo.length; i++) {
  +                if ( nodesInfo[i].equals(aNodeInfo) ) {
  +                    return i + 1;
  +                }
               }
               throw new IllegalArgumentException("Not expected");
           }
           public NodeInfo getNodeById(int anId) {
  -            switch (anId) {
  -                case 1:
  -                    return nodeInfo1;
  -                case 2:
  -                    return nodeInfo2;
  -                case 3:
  -                    return nodeInfo3;
  -                case 4:
  -                    return nodeInfo4;
  -                default:
  -                    throw new IllegalArgumentException("Not expected");
  +            if ( nodesInfo.length <= anId  ) {
  +                throw new IllegalArgumentException("Not expected");
               }
  +            return nodesInfo[anId - 1];
           }
           public Set getNodes() {
               throw new IllegalArgumentException("Not expected");
  
  
  
  1.5       +1 -2      incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/MockNode.java
  
  Index: MockNode.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/MockNode.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- MockNode.java	10 Jun 2004 23:12:24 -0000	1.4
  +++ MockNode.java	17 Jul 2004 03:52:33 -0000	1.5
  @@ -23,7 +23,6 @@
   import java.util.Set;
   
   import org.apache.geronimo.gbean.WaitingException;
  -import org.apache.geronimo.messaging.cluster.topology.TopologyManager;
   import org.apache.geronimo.messaging.io.NullReplacerResolver;
   import org.apache.geronimo.messaging.io.ReplacerResolver;
   import org.apache.geronimo.messaging.proxy.EndPointProxyInfo;
  
  
  
  1.4       +4 -4      incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/MockEndPointImpl.java
  
  Index: MockEndPointImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/MockEndPointImpl.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MockEndPointImpl.java	10 Jun 2004 23:12:24 -0000	1.3
  +++ MockEndPointImpl.java	17 Jul 2004 03:52:33 -0000	1.4
  @@ -69,9 +69,9 @@
       static {
           GBeanInfoFactory factory = new GBeanInfoFactory(MockEndPointImpl.class, GBeanBaseEndPoint.GBEAN_INFO);
           factory.setConstructor(
  -            new String[] {"Node", "ID", "TargetNodes"});
  -        factory.addAttribute("TargetNodes", NodeInfo[].class, true);
  -        factory.addAttribute("Received", List.class, false);
  +            new String[] {"Node", "ID", "targetNodes"});
  +        factory.addAttribute("targetNodes", NodeInfo[].class, true);
  +        factory.addAttribute("received", List.class, false);
           factory.addOperation("sendRawObject", new Class[]{Object.class});
           GBEAN_INFO = factory.getBeanInfo();
       }
  
  
  
  1.8       +114 -53   incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java
  
  Index: NodeImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- NodeImpl.java	8 Jul 2004 05:13:29 -0000	1.7
  +++ NodeImpl.java	17 Jul 2004 03:52:34 -0000	1.8
  @@ -19,6 +19,7 @@
   
   import java.util.Collections;
   import java.util.HashSet;
  +import java.util.Iterator;
   import java.util.Set;
   
   import org.apache.commons.logging.Log;
  @@ -50,6 +51,8 @@
   import org.apache.geronimo.pool.ClockPool;
   import org.apache.geronimo.pool.ThreadPool;
   
  +import EDU.oswego.cs.dl.util.concurrent.Semaphore;
  +
   /**
    * Node implementation.
    *
  @@ -114,6 +117,11 @@
       private final ClockPool clockPool;
   
       /**
  +     * EndPoint view of this node.
  +     */
  +    private final NodeEndPointView endPointView;
  +    
  +    /**
        * NodeTopology within which this node is operating.
        */
       private NodeTopology nodeTopology;
  @@ -121,7 +129,7 @@
       /**
        * To serialize the topology changes.
        */
  -    private final Object topologyMonitor;
  +    private final Semaphore topologyMonitor;
       
       /**
        * Creates a Node.
  @@ -147,8 +155,9 @@
           clockPool = aClockPool;
           
           replacerResolver = new MsgReplacerResolver();
  -        topologyMonitor = new Object();
  -
  +        topologyMonitor = new Semaphore(1);
  +        endPointView = new NodeEndPointViewImpl();
  +        
           streamManager = newStreamManager();
           referenceableManager = newReferenceableManager();
           endPointProxyFactory = newEndPointProxyFactory();
  @@ -171,7 +180,7 @@
           addEndPoint(endPointProxyFactory);
           addEndPoint(referenceableManager);
           addEndPoint(streamManager);
  -        addEndPoint(new NodeEndPointViewImpl());
  +        addEndPoint(endPointView);
       }
   
       public ReplacerResolver getReplacerResolver() {
  @@ -182,53 +191,94 @@
           return nodeInfo;
       }
       
  -    public void setTopology(NodeTopology aTopology) {
  -        synchronized(topologyMonitor) {
  -            cascadeTopology(aTopology, Collections.EMPTY_SET);
  +    public void setTopology(NodeTopology aTopology) throws NodeException {
  +        try {
  +            topologyMonitor.attempt(1000);
  +
  +            endPointView.prepareTopology(aTopology);
  +
  +            Set processed = new HashSet();
  +            processed.add(nodeInfo);
  +            prepareNodes(aTopology, processed, nodeInfo);
  +            
  +            endPointView.commitTopology(aTopology);
  +            processed = new HashSet();
  +            processed.add(nodeInfo);
  +            commitNodes(aTopology, processed, nodeInfo);
  +        } catch (InterruptedException e) {
  +            throw new NodeException("Topology already being applied.");
  +        } finally  {
  +            topologyMonitor.release();
           }
       }
  -    
  -    private void cascadeTopology(NodeTopology aTopology, Set aSetOfProcessed) {
  -        // Registers a future topology here. This way neighbours can start to
  -        // send Msgs compressed with the new topology.
  -        compression.registerFutureTopology(aTopology);
  -        
  -        // Applies the new topology.
  -        nodeManager.setTopology(aTopology);
   
  -        // Computes the neighbours which have not yet received the topology
  -        // reconfiguration.
  -        Set neighbours = new HashSet(aTopology.getNeighbours(nodeInfo));
  -        neighbours.removeAll(aSetOfProcessed);
  -        
  -        // Computes the nodes which have already received the topology
  -        // reconfiguration.
  -        Set processed = new HashSet(aSetOfProcessed);
  -        processed.add(nodeInfo);
  -        processed.addAll(neighbours);
  +    /**
  +     * Prepares the topology of the node aNodeInfo.
  +     * 
  +     * @param aTopology Topology to be prepared.
  +     * @param aProcessed Set<NodeInfo> nodes already processed.
  +     * @param aNodeInfo Node to be prepared.
  +     */
  +    private void prepareNodes(NodeTopology aTopology, Set aProcessed,
  +        NodeInfo aNodeInfo) {
  +        // Computes the neighbours which have not yet received the
  +        // topology reconfiguration.
  +        Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo));
  +        toBeProcessed.removeAll(aProcessed);
   
  -        NodeInfo[] targets = (NodeInfo[]) neighbours.toArray(new NodeInfo[0]);
           // No more nodes to process.
  -        if ( 0 == targets.length ) {
  +        if (0 == toBeProcessed.size()) {
               return;
           }
  -      
  -        // Acquires a proxy on the NodeEndPointViews of all the neighbours,
  -        // which have not yet been processed.
  -        EndPointProxyInfo proxyInfo =
  -            new EndPointProxyInfo(NodeEndPointView.NODE_ID,
  -                new Class[] {NodeEndPointView.class}, targets);
  -        NodeEndPointView topologyEndPoint = (NodeEndPointView)
  -            endPointProxyFactory.factory(proxyInfo);
  -        try {
  -            // Cascades the new topology to all of them.
  -            topologyEndPoint.cascadeTopology(aTopology, processed);
  -        } finally {
  -            endPointProxyFactory.releaseProxy(topologyEndPoint);
  +
  +        for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) {
  +            NodeInfo nodeInfo = (NodeInfo) iter.next();
  +            EndPointProxyInfo proxyInfo = new EndPointProxyInfo(
  +                NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo);
  +            NodeEndPointView topologyEndPoint =
  +                (NodeEndPointView) endPointProxyFactory.factory(proxyInfo);
  +            try {
  +                topologyEndPoint.prepareTopology(aTopology);
  +            } finally {
  +                endPointProxyFactory.releaseProxy(topologyEndPoint);
  +            }
  +            // Computes the nodes which have already received the topology
  +            // reconfiguration.
  +            aProcessed.add(nodeInfo);
  +            prepareNodes(aTopology, aProcessed, nodeInfo);
  +        }
  +    }
  +    
  +    /**
  +     * Commits the topology of the node aNodeInfo.
  +     * 
  +     * @param aTopology Topology to be committed.
  +     * @param aProcessed Set<NodeInfo> nodes already processed.
  +     * @param aNodeInfo Node to be committed.
  +     */
  +    private void commitNodes(NodeTopology aTopology, Set aProcessed,
  +        NodeInfo aNodeInfo) {
  +        Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo));
  +        toBeProcessed.removeAll(aProcessed);
  +
  +        if (0 == toBeProcessed.size()) {
  +            return;
  +        }
  +
  +        for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) {
  +            NodeInfo nodeInfo = (NodeInfo) iter.next();
  +            EndPointProxyInfo proxyInfo = new EndPointProxyInfo(
  +                NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo);
  +            NodeEndPointView topologyEndPoint =
  +                (NodeEndPointView) endPointProxyFactory.factory(proxyInfo);
  +            try {
  +                topologyEndPoint.commitTopology(aTopology);
  +            } finally {
  +                endPointProxyFactory.releaseProxy(topologyEndPoint);
  +            }
  +            aProcessed.add(nodeInfo);
  +            commitNodes(aTopology, aProcessed, nodeInfo);
           }
  -        
  -        compression.registerTopology(aTopology);
  -        nodeTopology = aTopology;
       }
       
       public NodeTopology getTopology() {
  @@ -266,13 +316,13 @@
                   return Collections.EMPTY_SET;
               }
               public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
  -                throw new UnsupportedOperationException();
  +                throw new UnsupportedOperationException("getPath");
               }
               public int getIDOfNode(NodeInfo aNodeInfo) {
  -                throw new UnsupportedOperationException();
  +                throw new UnsupportedOperationException("getIDOfNode");
               }
               public NodeInfo getNodeById(int anId) {
  -                throw new UnsupportedOperationException();
  +                throw new UnsupportedOperationException("getNodeById");
               }
               public Set getNodes() {
                   Set result = new HashSet();
  @@ -283,7 +333,7 @@
                   return 0;
               }
               public void setVersion(int aVersion) {
  -                throw new UnsupportedOperationException();
  +                throw new UnsupportedOperationException("setVersion");
               }
           };
           setTopology(topology);
  @@ -373,7 +423,7 @@
           }
           public void push(final Msg aMsg) {
               try {
  -                threadPool.getWorkManager().execute(new Runnable() {
  +                threadPool.execute(new Runnable() {
                       public void run() {
                           out.push(aMsg);
                       }
  @@ -451,8 +501,19 @@
           private NodeEndPointViewImpl() {
               super(NodeImpl.this, NODE_ID);
           }
  -        public void cascadeTopology(NodeTopology aTopology, Set aSetOfProcessed) {
  -            NodeImpl.this.cascadeTopology(aTopology, aSetOfProcessed);
  +        public void prepareTopology(NodeTopology aTopology) {
  +            // Registers a future topology here. This way neighbours can start
  +            // to send Msgs compressed with the new topology.
  +            // TODO re-enable compression.
  +//            compression.registerFutureTopology(aTopology);
  +            nodeManager.setTopology(aTopology);
  +        }
  +        public void commitTopology(NodeTopology aTopology) {
  +            log.info("********** Topology update **********\n" + aTopology);
  +            nodeTopology = aTopology;
  +            log.info("********** End Topology update ******");
  +            // TODO re-enable compression.
  +//            compression.registerTopology(aTopology);
           }
       }
       
  @@ -460,9 +521,9 @@
   
       static {
           GBeanInfoFactory factory = new GBeanInfoFactory(NodeImpl.class);
  -        factory.setConstructor(new String[] {"NodeInfo", "ThreadPool",
  +        factory.setConstructor(new String[] {"nodeInfo", "ThreadPool",
               "ClockPool", "MessagingTransportFactory"});
  -        factory.addInterface(Node.class, new String[] {"NodeInfo"});
  +        factory.addInterface(Node.class, new String[] {"nodeInfo"});
           factory.addReference("ThreadPool", ThreadPool.class);
           factory.addReference("ClockPool", ClockPool.class);
           factory.addReference("MessagingTransportFactory",
  
  
  
  1.2       +16 -11    incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java
  
  Index: NodeEndPointView.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- NodeEndPointView.java	10 Jun 2004 23:12:24 -0000	1.1
  +++ NodeEndPointView.java	17 Jul 2004 03:52:34 -0000	1.2
  @@ -17,8 +17,6 @@
   
   package org.apache.geronimo.messaging;
   
  -import java.util.Set;
  -
   /**
    * When a Node is created, it registers itself as an EndPoint defining the
    * following contracts.
  @@ -39,15 +37,22 @@
       public static final Object NODE_ID = "Node";
   
       /**
  -     * Cascades the specified topology to all the neighbours of the underlying
  -     * Node. The topology change should not be cascaded to the Nodes contained
  -     * by aSetOfProcessed. 
  +     * Prepares the specified topology.
  +     * <BR>
  +     * The node must validate (join) all of its neighbours defined by the
  +     * specified topology.
  +     * 
  +     * @param aTopology Topology to be prepared.
  +     */
  +    public void prepareTopology(NodeTopology aTopology);
  +    
  +    /**
  +     * Commits the specified topology.
  +     * <BR>
  +     * This latter must have been prepared just before this call.
        * 
  -     * @param aTopology Topology to be cascaded to all the neighbours of the
  -     * underlying Node and then applied on the underlying node. 
  -     * @param aSetOfProcessed Set<NodeInfo> Nodes which have already received
  -     * the topology change request.
  +     * @param aTopology Topology to be committed.
        */
  -    public void cascadeTopology(NodeTopology aTopology, Set aSetOfProcessed);
  +    public void commitTopology(NodeTopology aTopology);
       
   }
  
  
  
  1.5       +4 -2      incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/Node.java
  
  Index: Node.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/Node.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Node.java	10 Jun 2004 23:12:24 -0000	1.4
  +++ Node.java	17 Jul 2004 03:52:34 -0000	1.5
  @@ -55,8 +55,10 @@
        * locally.
        * 
        * @param aTopology Topology of the nodes constituting the network layout.
  +     * 
  +     * @exception NodeException Indicates that the topology can not be set.
        */
  -    public void setTopology(NodeTopology aTopology);
  +    public void setTopology(NodeTopology aTopology) throws NodeException;
   
       /**
        * Gets the node topology in which this instance is operating.
  
  
  

Mime
View raw message