Return-Path: Delivered-To: apmail-geronimo-scm-archive@www.apache.org Received: (qmail 88300 invoked from network); 20 Jul 2004 00:26:07 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur-2.apache.org with SMTP; 20 Jul 2004 00:26:07 -0000 Received: (qmail 48540 invoked by uid 500); 20 Jul 2004 00:26:06 -0000 Delivered-To: apmail-geronimo-scm-archive@geronimo.apache.org Received: (qmail 48353 invoked by uid 500); 20 Jul 2004 00:26:05 -0000 Mailing-List: contact scm-help@geronimo.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: list-post: Reply-To: dev@geronimo.apache.org Delivered-To: mailing list scm@geronimo.apache.org Received: (qmail 48339 invoked by uid 500); 20 Jul 2004 00:26:05 -0000 Delivered-To: apmail-incubator-geronimo-cvs@apache.org Received: (qmail 48336 invoked by uid 99); 20 Jul 2004 00:26:05 -0000 X-ASF-Spam-Status: No, hits=0.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [209.237.227.194] (HELO minotaur.apache.org) (209.237.227.194) by apache.org (qpsmtpd/0.27.1) with SMTP; Mon, 19 Jul 2004 17:26:04 -0700 Received: (qmail 88279 invoked by uid 1782); 20 Jul 2004 00:26:04 -0000 Date: 20 Jul 2004 00:26:04 -0000 Message-ID: <20040720002604.88278.qmail@minotaur.apache.org> From: gdamour@apache.org To: incubator-geronimo-cvs@apache.org Subject: cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode LogicalCompression.java X-Virus-Checked: Checked X-Spam-Rating: minotaur-2.apache.org 1.6.2 0/1000/N gdamour 2004/07/19 17:26:04 Modified: sandbox/messaging/src/test/org/apache/geronimo/messaging NodeImplTest.java EndPointUtil.java sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster ClusterImpl.java sandbox/messaging/src/java/org/apache/geronimo/messaging NodeTopology.java NodeImpl.java NodeEndPointView.java sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode LogicalCompression.java Log: A primitive 2PC implementation of a topology reconfiguration. Revision Changes Path 1.7 +4 -3 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java Index: NodeImplTest.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/NodeImplTest.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- NodeImplTest.java 17 Jul 2004 03:52:33 -0000 1.6 +++ NodeImplTest.java 20 Jul 2004 00:26:03 -0000 1.7 @@ -148,6 +148,7 @@ NodeImplTest test = new NodeImplTest(); test.setUp(); test.testSendRawPerformance(); + test.tearDown(); } public void testSendRawPerformance() throws Exception { @@ -307,7 +308,7 @@ throw new IllegalArgumentException("Not expected"); } public NodeInfo getNodeById(int anId) { - if ( nodesInfo.length <= anId ) { + if ( nodesInfo.length < anId ) { throw new IllegalArgumentException("Not expected"); } return nodesInfo[anId - 1]; @@ -316,7 +317,7 @@ throw new IllegalArgumentException("Not expected"); } public int getVersion() { - return 0; + return 1; } } 1.2 +9 -3 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/EndPointUtil.java Index: EndPointUtil.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/EndPointUtil.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- EndPointUtil.java 24 May 2004 12:03:34 -0000 1.1 +++ EndPointUtil.java 20 Jul 2004 00:26:03 -0000 1.2 @@ -33,14 +33,20 @@ MsgHeaderConstants.SRC_NODE, "", new HeaderOutInterceptor( MsgHeaderConstants.SRC_ENDPOINT, "", - anEP1.getMsgConsumerOut())); + new HeaderOutInterceptor( + MsgHeaderConstants.TOPOLOGY_VERSION, + new Integer(1), + anEP1.getMsgConsumerOut()))); anEP2.setMsgProducerOut(out); out = new HeaderOutInterceptor( MsgHeaderConstants.SRC_NODE, "", new HeaderOutInterceptor( MsgHeaderConstants.SRC_ENDPOINT, "", - anEP2.getMsgConsumerOut())); + new HeaderOutInterceptor( + MsgHeaderConstants.TOPOLOGY_VERSION, + new Integer(1), + anEP2.getMsgConsumerOut()))); anEP1.setMsgProducerOut(out); } 1.3 +14 -3 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterImpl.java Index: ClusterImpl.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/cluster/ClusterImpl.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- ClusterImpl.java 17 Jul 2004 03:38:42 -0000 1.2 +++ ClusterImpl.java 20 Jul 2004 00:26:03 -0000 1.3 @@ -117,8 +117,15 @@ } topologyManager.addNode(aNode); nodeTopology = topologyManager.factoryTopology(); + try { + node.setTopology(nodeTopology); + } catch (NodeException e) { + // If the topology can not be applied, then one removes it + // from the topologyManager. + topologyManager.removeNode(aNode); + throw e; + } } - node.setTopology(nodeTopology); fireClusterMemberEvent( new ClusterEvent(this, aNode, ClusterEvent.MEMBER_ADDED)); } @@ -132,8 +139,12 @@ } topologyManager.removeNode(aNode); nodeTopology = topologyManager.factoryTopology(); + try { + node.setTopology(nodeTopology); + } catch (NodeException e) { + throw e; + } } - node.setTopology(nodeTopology); fireClusterMemberEvent( new ClusterEvent(this, aNode, ClusterEvent.MEMBER_REMOVED)); } 1.5 +5 -2 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeTopology.java Index: NodeTopology.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeTopology.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- NodeTopology.java 5 Jul 2004 07:03:50 -0000 1.4 +++ NodeTopology.java 20 Jul 2004 00:26:03 -0000 1.5 @@ -32,6 +32,8 @@ /** * Gets the version of this topology. + *
+ * 0 is a reserved value and must not be used. * * @return version number. */ @@ -42,7 +44,8 @@ * reachable from aRoot. * * @param aRoot Node. - * @return Set Neighbours. + * @return Set Neighbours. An empty Set must be returned if + * the specified node is not registered by this topology. */ public Set getNeighbours(NodeInfo aRoot); 1.9 +67 -25 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java Index: NodeImpl.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeImpl.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- NodeImpl.java 17 Jul 2004 03:52:34 -0000 1.8 +++ NodeImpl.java 20 Jul 2004 00:26:04 -0000 1.9 @@ -31,11 +31,13 @@ import org.apache.geronimo.messaging.interceptors.HeaderOutInterceptor; import org.apache.geronimo.messaging.interceptors.MsgOutDispatcher; import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor; +import org.apache.geronimo.messaging.interceptors.MsgTransformer; import org.apache.geronimo.messaging.interceptors.ThrowableTrapOutInterceptor; import org.apache.geronimo.messaging.io.IOContext; import org.apache.geronimo.messaging.io.ReplacerResolver; import org.apache.geronimo.messaging.io.StreamManager; import org.apache.geronimo.messaging.io.StreamManagerImpl; +import org.apache.geronimo.messaging.proxy.EndPointProxy; import org.apache.geronimo.messaging.proxy.EndPointProxyFactory; import org.apache.geronimo.messaging.proxy.EndPointProxyFactoryImpl; import org.apache.geronimo.messaging.proxy.EndPointProxyInfo; @@ -201,7 +203,7 @@ processed.add(nodeInfo); prepareNodes(aTopology, processed, nodeInfo); - endPointView.commitTopology(aTopology); + endPointView.commitTopology(); processed = new HashSet(); processed.add(nodeInfo); commitNodes(aTopology, processed, nodeInfo); @@ -219,8 +221,8 @@ * @param aProcessed Set nodes already processed. * @param aNodeInfo Node to be prepared. */ - private void prepareNodes(NodeTopology aTopology, Set aProcessed, - NodeInfo aNodeInfo) { + private void prepareNodes(final NodeTopology aTopology, Set aProcessed, + NodeInfo aNodeInfo) throws NodeException { // Computes the neighbours which have not yet received the // topology reconfiguration. Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo)); @@ -231,20 +233,42 @@ return; } + // Prepares the topology of all the neighbours. for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) { NodeInfo nodeInfo = (NodeInfo) iter.next(); EndPointProxyInfo proxyInfo = new EndPointProxyInfo( NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo); NodeEndPointView topologyEndPoint = (NodeEndPointView) endPointProxyFactory.factory(proxyInfo); + // sends the Msg in the prepared topology. + ((EndPointProxy) topologyEndPoint).setTransformer( + new MsgTransformer() { + private Msg msg; + public Msg pop() { + MsgHeader hd = msg.getHeader(); + hd.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION, + new Integer(aTopology.getVersion())); + return msg; + } + public void push(Msg aMsg) { + msg = aMsg; + } + }); try { topologyEndPoint.prepareTopology(aTopology); + } catch (CommunicationException e) { + throw new NodeException("Can not prepare topology", e); } finally { endPointProxyFactory.releaseProxy(topologyEndPoint); } // Computes the nodes which have already received the topology // reconfiguration. aProcessed.add(nodeInfo); + } + + // Prepares the topology of the neighbours of the direct neighbours. + for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) { + NodeInfo nodeInfo = (NodeInfo) iter.next(); prepareNodes(aTopology, aProcessed, nodeInfo); } } @@ -256,8 +280,8 @@ * @param aProcessed Set nodes already processed. * @param aNodeInfo Node to be committed. */ - private void commitNodes(NodeTopology aTopology, Set aProcessed, - NodeInfo aNodeInfo) { + private void commitNodes(final NodeTopology aTopology, Set aProcessed, + NodeInfo aNodeInfo) throws NodeException { Set toBeProcessed = new HashSet(aTopology.getNeighbours(aNodeInfo)); toBeProcessed.removeAll(aProcessed); @@ -271,12 +295,32 @@ NodeEndPointView.NODE_ID, NodeEndPointView.class, nodeInfo); NodeEndPointView topologyEndPoint = (NodeEndPointView) endPointProxyFactory.factory(proxyInfo); + // sends the Msg in the prepared topology. + ((EndPointProxy) topologyEndPoint).setTransformer( + new MsgTransformer() { + private Msg msg; + public Msg pop() { + MsgHeader hd = msg.getHeader(); + hd.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION, + new Integer(aTopology.getVersion())); + return msg; + } + public void push(Msg aMsg) { + msg = aMsg; + } + }); try { - topologyEndPoint.commitTopology(aTopology); + topologyEndPoint.commitTopology(); + } catch (CommunicationException e) { + throw new NodeException("Can not commit topology", e); } finally { endPointProxyFactory.releaseProxy(topologyEndPoint); } aProcessed.add(nodeInfo); + } + + for (Iterator iter = toBeProcessed.iterator(); iter.hasNext();) { + NodeInfo nodeInfo = (NodeInfo) iter.next(); commitNodes(aTopology, aProcessed, nodeInfo); } } @@ -316,7 +360,7 @@ return Collections.EMPTY_SET; } public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) { - throw new UnsupportedOperationException("getPath"); + return null; } public int getIDOfNode(NodeInfo aNodeInfo) { throw new UnsupportedOperationException("getIDOfNode"); @@ -332,8 +376,8 @@ public int getVersion() { return 0; } - public void setVersion(int aVersion) { - throw new UnsupportedOperationException("setVersion"); + public String toString() { + return "Stand-alone node " + nodeInfo; } }; setTopology(topology); @@ -465,10 +509,6 @@ public void push(Msg aMsg, Throwable aThrowable) { log.error("Can not deliver " + aMsg, aThrowable); - // Send the Msg back to the caller and provide the exception. - Msg msg = aMsg.reply(); - msg.getBody().setContent(new Result(false, aThrowable)); - nodeManager.getMsgConsumerOut().push(aMsg); } } @@ -498,22 +538,24 @@ private class NodeEndPointViewImpl extends BaseEndPoint implements NodeEndPointView { + private NodeTopology topology; private NodeEndPointViewImpl() { super(NodeImpl.this, NODE_ID); } - public void prepareTopology(NodeTopology aTopology) { + public void prepareTopology(NodeTopology aTopology) + throws NodeException { // Registers a future topology here. This way neighbours can start // to send Msgs compressed with the new topology. - // TODO re-enable compression. -// compression.registerFutureTopology(aTopology); - nodeManager.setTopology(aTopology); - } - public void commitTopology(NodeTopology aTopology) { - log.info("********** Topology update **********\n" + aTopology); - nodeTopology = aTopology; - log.info("********** End Topology update ******"); - // TODO re-enable compression. -// compression.registerTopology(aTopology); + compression.prepareTopology(aTopology); + nodeManager.prepareTopology(aTopology); + topology = aTopology; + } + public void commitTopology() { + nodeManager.commitTopology(); + compression.commitTopology(); + nodeTopology = topology; + log.info("\n********** Topology update **********\n" + + topology + "\n********** End Topology update ******"); } } 1.3 +5 -6 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java Index: NodeEndPointView.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/NodeEndPointView.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- NodeEndPointView.java 17 Jul 2004 03:52:34 -0000 1.2 +++ NodeEndPointView.java 20 Jul 2004 00:26:04 -0000 1.3 @@ -43,16 +43,15 @@ * specified topology. * * @param aTopology Topology to be prepared. + * @exception NodeException If the node can not join all of its neighbours. */ - public void prepareTopology(NodeTopology aTopology); + public void prepareTopology(NodeTopology aTopology) throws NodeException; /** - * Commits the specified topology. - *
- * This latter must have been prepared just before this call. + * Commits the topology previously prepared. * * @param aTopology Topology to be committed. */ - public void commitTopology(NodeTopology aTopology); + public void commitTopology(); } 1.4 +36 -27 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java Index: LogicalCompression.java =================================================================== RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/LogicalCompression.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- LogicalCompression.java 5 Jul 2004 07:03:50 -0000 1.3 +++ LogicalCompression.java 20 Jul 2004 00:26:04 -0000 1.4 @@ -46,14 +46,14 @@ { /** - * Future topology. + * Topology being prepared. */ - private NodeTopology futTopology; + private volatile NodeTopology preparedTopology; /** - * Current registered topology. + * Committed topology. */ - private NodeTopology curTopology; + private volatile NodeTopology topology; /** * No logical compression. @@ -77,23 +77,24 @@ /** * Registers a future topology. It is only used to uncompress Msgs which - * have not been compressed by the current topology. + * have not been compressed by the topology currently committed. * - * @param aTopology Future topology. + * @param aTopology Topology. */ - public void registerFutureTopology(NodeTopology aTopology) { - futTopology = aTopology; + public void prepareTopology(NodeTopology aTopology) { + preparedTopology = aTopology; } /** - * Registers the current topology. It is used to compress and uncompress - * Msgs. If it is not possible to uncompress a Msg with the current - * topology, then the future topology is used. + * Commits the previousy prepared topology. It is used to compress and + * uncompress Msgs. If it is not possible to uncompress a Msg with the + * current topology, then the future topology is used. * * @param aTopology Current topology. */ - public void registerTopology(NodeTopology aTopology) { - curTopology = aTopology; + public void commitTopology() { + topology = preparedTopology; + preparedTopology = null; } public Object beforePop(StreamInputStream anIn) @@ -112,17 +113,8 @@ return result; } int version = anIn.readInt(); - NodeTopology topology = null; - if ( null != curTopology && version == curTopology.getVersion() ) { - topology = curTopology; - } else if ( null != futTopology && - version == futTopology.getVersion() ) { - topology = futTopology; - } - if ( null == topology ) { - throw new IllegalArgumentException("No topology with version {" + - version + "} is defined."); - } + NodeTopology topology = getTopology(version); + int id = anIn.readInt(); NodeInfo nodeInfo = topology.getNodeById(id); result.add(nodeInfo); @@ -164,8 +156,11 @@ RequestSender.RequestID reqID = (RequestSender.RequestID) header.resetHeader(MsgHeaderConstants.CORRELATION_ID); anOut.writeByte(reqID.getID()); - NodeTopology topology = curTopology; - if ( null == topology ) { + Integer version = (Integer) + header.getHeader(MsgHeaderConstants.TOPOLOGY_VERSION); + NodeTopology topology = getTopology(version.intValue()); + // Uses only the current topology to compress the data. + if ( null == topology || preparedTopology == topology ) { anOut.writeByte(NULL); return null; } @@ -190,4 +185,18 @@ Object anOpaque) throws IOException { } + private NodeTopology getTopology(int aVersion) { + if ( 0 == aVersion || null == topology ) { + return null; + } else if ( aVersion == topology.getVersion() ) { + return topology; + } else if ( null == preparedTopology || + aVersion == preparedTopology.getVersion() ) { + return preparedTopology; + } else { + throw new IllegalArgumentException("Topology version " + + aVersion + " is too old."); + } + } + }