geronimo-scm mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdam...@apache.org
Subject cvs commit: incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication ReplicationTest.java
Date Wed, 24 Mar 2004 11:37:07 GMT
gdamour     2004/03/24 03:37:07

  Modified:    sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        StreamManagerImpl.java RequestSender.java
                        Connector.java MetaConnection.java
               sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
                        ReplicationMember.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
                        DummyConnector.java CommandRequestTest.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication
                        ReplicationTest.java
  Added:       sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        AbstractConnector.java Node.java NodeContext.java
                        NodeImpl.java NodeProcessors.java
               sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
                        ReplicationMemberImpl.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
                        NodeTest.java
  Removed:     sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        ServerNode.java ServerNodeContext.java
                        ServerProcessors.java
               sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
                        ServerNodeTest.java
  Log:
  Round of refactoring in order to be compliant with the GBean framework:
  
  o Interfaces have been defined for References;
  o It is now up to Connectors to add themselves to, and remove
  themselves from, the Node, which enables Msg passing;
  o A base implementation for the Connector contract is provided;
  o JUnit tests have been updated in order to also test the GBean
  configuration;
  o ServerNode, ServerNodeContext and ServerProcessors have been
  renamed Node, NodeContext and NodeProcessors respectively.
  
  Revision  Changes    Path
  1.4       +6 -50     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamManagerImpl.java
  
  Index: StreamManagerImpl.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamManagerImpl.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- StreamManagerImpl.java	11 Mar 2004 15:36:14 -0000	1.3
  +++ StreamManagerImpl.java	24 Mar 2004 11:37:05 -0000	1.4
  @@ -34,6 +34,7 @@
    * @version $Revision$ $Date$
    */
   public class StreamManagerImpl
  +    extends AbstractConnector
       implements Connector, StreamManager
   {
   
  @@ -70,26 +71,13 @@
       private final Map inputStreams;
   
       /**
  -     * Context of the ServerNode which has mounted this instance.
  -     */
  -    protected ServerNodeContext serverNodeContext;
  -    
  -    /**
  -     * To send requests.
  -     */
  -    protected RequestSender sender;
  -    
  -    /**
  -     * Used to communicate with remote StreamManagers.
  -     */
  -    protected MsgOutInterceptor out;
  -    
  -    /**
        * Creates a manager owned by the specified node.
        * 
  +     * @param aServerNode ServerNode containing this instance.
        * @param aNode Node owning this manager.
        */
  -    public StreamManagerImpl(NodeInfo aNode) {
  +    public StreamManagerImpl(NodeImpl aServerNode, NodeInfo aNode) {
  +        super(aServerNode);
           if ( null == aNode ) {
               throw new IllegalArgumentException("Node is required.");
           }
  @@ -101,12 +89,6 @@
           return owningNode.getName();
       }
       
  -    public void setContext(ServerNodeContext aContext) {
  -        serverNodeContext = aContext;
  -        sender = aContext.getRequestSender();
  -        out = aContext.getOutput();
  -    }
  -    
       public Object register(InputStream anIn) {
           if ( null == anIn ) {
               return NULL_INPUT_STREAM;
  @@ -175,17 +157,6 @@
           
       }
       
  -    public void deliver(Msg aMsg) {
  -        MsgHeader header = aMsg.getHeader();
  -        MsgBody.Type bodyType =
  -            (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
  -        if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
  -            handleRequest(aMsg);
  -        } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
  -            handleResponse(aMsg);
  -        }
  -    }
  -    
       /**
        * Handles a request Msg.
        * 
  @@ -221,21 +192,6 @@
           reqOut.push(msg);
       }
   
  -    /**
  -     * Handles a response Msg.
  -     * 
  -     * @param aMsg Response to be handled.
  -     */
  -    protected void handleResponse(Msg aMsg) {
  -        MsgBody body = aMsg.getBody();
  -        MsgHeader header = aMsg.getHeader();
  -        CommandResult result;
  -        result = (CommandResult) body.getContent();
  -        sender.setResponse(
  -            header.getHeader(MsgHeaderConstants.CORRELATION_ID),
  -            result);
  -    }
  -    
       /**
        * InputStream returned when a GInputStream is deserialized. This 
        * InputStream calls back its StreamManager when its internal buffer is
  
  
  
  1.6       +2 -1      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java
  
  Index: RequestSender.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- RequestSender.java	18 Mar 2004 12:14:05 -0000	1.5
  +++ RequestSender.java	24 Mar 2004 11:37:05 -0000	1.6
  @@ -107,6 +107,7 @@
           RequestID id = createID(aTargetNodes);
           header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
           header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes);
  +        header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
           
           MsgBody body = msg.getBody();
           body.setContent(anOpaque);
  
  
  
  1.3       +2 -2      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Connector.java
  
  Index: Connector.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Connector.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Connector.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ Connector.java	24 Mar 2004 11:37:05 -0000	1.3
  @@ -41,7 +41,7 @@
       /**
        * Sets the ServerNode context of this Connector.
        */
  -    public void setContext(ServerNodeContext aContext);
  +    public void setContext(NodeContext aContext);
   
       /**
        * When a ServerNode receives a Msg to be delivered to a Connector, it
  
  
  
  1.6       +4 -4      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
  
  Index: MetaConnection.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- MetaConnection.java	18 Mar 2004 12:14:05 -0000	1.5
  +++ MetaConnection.java	24 Mar 2004 11:37:05 -0000	1.6
  @@ -43,7 +43,7 @@
       /**
        * Node owning this connection.
        */
  -    private final ServerNode node;
  +    private final NodeImpl node;
       
       /**
        * NodeInfo to Connection map.
  @@ -72,7 +72,7 @@
        * 
        * @param aNode Node.
        */
  -    public MetaConnection(ServerNode aNode) {
  +    public MetaConnection(NodeImpl aNode) {
           if ( null == aNode ) {
               throw new IllegalArgumentException("Node is required.");
           }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/AbstractConnector.java
  
  Index: AbstractConnector.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.util.Arrays;
  
  import org.apache.geronimo.gbean.GAttributeInfo;
  import org.apache.geronimo.gbean.GBean;
  import org.apache.geronimo.gbean.GBeanContext;
  import org.apache.geronimo.gbean.GBeanInfo;
  import org.apache.geronimo.gbean.GBeanInfoFactory;
  import org.apache.geronimo.gbean.GConstructorInfo;
  import org.apache.geronimo.gbean.WaitingException;
  
  /**
   * Base implementation of the Connector contract.
   * <BR>
   * It remembers the NodeContext handed over via setContext, routes the
   * delivered Msgs to the request or response handler according to their
   * body type, and adds/removes itself to/from its Node as part of its GBean
   * life-cycle.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
   */
  public abstract class AbstractConnector implements Connector, GBean {

      /**
       * Node this Connector adds itself to when started and removes itself
       * from when stopped or failed.
       */
      protected Node node;

      /**
       * Last context received via setContext.
       */
      protected NodeContext serverNodeContext;

      /**
       * Request sender extracted from the current context; responses are
       * handed back to it.
       */
      protected RequestSender sender;

      /**
       * Outbound interceptor extracted from the current context.
       */
      protected MsgOutInterceptor out;

      /**
       * Creates a Connector bound to the specified node.
       *
       * @param aNode Node owning this connector.
       * @throws IllegalArgumentException if aNode is null.
       */
      public AbstractConnector(Node aNode) {
          if ( aNode == null ) {
              throw new IllegalArgumentException("Node is required.");
          }
          this.node = aNode;
      }

      /**
       * Stores the provided context and caches its RequestSender and its
       * outbound interceptor.
       *
       * @param aContext Context to be used from now on.
       */
      public void setContext(NodeContext aContext) {
          this.serverNodeContext = aContext;
          this.sender = aContext.getRequestSender();
          this.out = aContext.getOutput();
      }

      /**
       * Dispatches a Msg based on its BODY_TYPE header: requests go to
       * handleRequest and responses to handleResponse. A Msg having any
       * other body type (or none) is ignored.
       *
       * @param aMsg Msg to be dispatched.
       */
      public void deliver(Msg aMsg) {
          final MsgHeader msgHeader = aMsg.getHeader();
          final MsgBody.Type type =
              (MsgBody.Type) msgHeader.getHeader(MsgHeaderConstants.BODY_TYPE);
          if ( MsgBody.Type.REQUEST == type ) {
              handleRequest(aMsg);
              return;
          }
          if ( MsgBody.Type.RESPONSE == type ) {
              handleResponse(aMsg);
          }
      }

      /**
       * Processes a Msg whose body type is REQUEST.
       *
       * @param aMsg Request Msg to be handled.
       */
      protected abstract void handleRequest(Msg aMsg);

      /**
       * Processes a Msg whose body type is RESPONSE: the CommandResult
       * carried by the body is passed to the RequestSender together with the
       * correlation identifier found in the header.
       *
       * @param aMsg Response to be handled.
       */
      protected void handleResponse(Msg aMsg) {
          final MsgBody msgBody = aMsg.getBody();
          final MsgHeader msgHeader = aMsg.getHeader();
          final CommandResult outcome = (CommandResult) msgBody.getContent();
          sender.setResponse(
              msgHeader.getHeader(MsgHeaderConstants.CORRELATION_ID),
              outcome);
      }

      /**
       * The GBean context is not used by this implementation.
       */
      public void setGBeanContext(GBeanContext context) {
          // Intentionally empty.
      }

      /**
       * Adds this Connector to its Node.
       */
      public void doStart() throws WaitingException, Exception {
          node.addConnector(this);
      }

      /**
       * Removes this Connector from its Node.
       */
      public void doStop() throws WaitingException, Exception {
          node.removeConnector(this);
      }

      /**
       * Removes this Connector from its Node.
       */
      public void doFail() {
          node.removeConnector(this);
      }


      /**
       * GBean meta-data: declares the "Name" and "Context" attributes, the
       * "Node" reference, the "deliver" operation and a constructor taking
       * the "Node" reference.
       */
      public static final GBeanInfo GBEAN_INFO;

      static {
          final GBeanInfoFactory factory =
              new GBeanInfoFactory("Abstract Connector", AbstractConnector.class.getName());
          factory.addAttribute(new GAttributeInfo("Name", true));
          factory.addAttribute(new GAttributeInfo("Context", false));
          factory.addReference("Node", Node.class);
          factory.addOperation("deliver", new Class[] {Msg.class});
          final GConstructorInfo constructor = new GConstructorInfo(
              Arrays.asList(new Object[]{"Node"}),
              Arrays.asList(new Object[]{Node.class}));
          factory.setConstructor(constructor);
          GBEAN_INFO = factory.getBeanInfo();
      }

      /**
       * Gets the GBean meta-data of this class.
       *
       * @return GBEAN_INFO.
       */
      public static GBeanInfo getGBeanInfo() {
          return GBEAN_INFO;
      }

  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Node.java
  
  Index: Node.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.io.IOException;
  
  /**
   * Contract of a node: it exposes its NodeInfo, can be given a topology,
   * can join or leave other nodes, provides a StreamManager and lets
   * Connectors register and unregister themselves.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
   */
  public interface Node {

      /**
       * Returns the meta-data of this node.
       *
       * @return NodeInfo of this node.
       */
      NodeInfo getNodeInfo();

      /**
       * Provides the topology of the network in which this node operates.
       *
       * @param aTopology Topology of the nodes constituting the network layout.
       */
      void setTopology(Topology aTopology);

      /**
       * Joins the remote node which is uniquely identified on the network by
       * aNodeInfo.
       *
       * @param aNodeInfo NodeInfo of the remote node to join.
       * @throws IOException Indicates that an I/O error has occurred.
       * @throws CommunicationException Indicates that this node cannot be
       * registered by the remote node identified by aNodeInfo.
       */
      void join(NodeInfo aNodeInfo)
          throws IOException, CommunicationException;

      /**
       * Leaves the remote node which is uniquely identified on the network by
       * aNodeInfo.
       *
       * @param aNodeInfo NodeInfo of the remote node to leave.
       * @throws IOException Indicates that an I/O error has occurred.
       * @throws CommunicationException Indicates that this node has not left
       * the remote node successfully.
       */
      void leave(NodeInfo aNodeInfo)
          throws IOException, CommunicationException;

      /**
       * Returns the StreamManager of this node.
       *
       * @return StreamManager used by this node to resolve/encode InputStreams.
       */
      StreamManager getStreamManager();

      /**
       * Registers a Connector with this node.
       *
       * @param aConnector Connector to be registered.
       */
      void addConnector(Connector aConnector);

      /**
       * Unregisters a Connector from this node.
       *
       * @param aConnector Connector to be unregistered.
       */
      void removeConnector(Connector aConnector);

  }
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeContext.java
  
  Index: NodeContext.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  /**
   * Immutable holder of what a Node hands over to a Connector upon
   * registration: an outbound Msg interceptor and a RequestSender.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
   */
  public class NodeContext {

      /** Outbound interceptor exposed by getOutput. */
      private final MsgOutInterceptor output;

      /** Request sender exposed by getRequestSender. */
      private final RequestSender requestSender;

      /**
       * Creates a context wrapping the two provided collaborators. The
       * arguments are stored as-is; no validation is performed.
       *
       * @param anOut Outbound Msg interceptor.
       * @param aSender RequestSender.
       */
      public NodeContext(MsgOutInterceptor anOut, RequestSender aSender) {
          this.output = anOut;
          this.requestSender = aSender;
      }

      /**
       * Returns the outbound Msg interceptor with which remote Connectors
       * are contacted.
       *
       * @return Interceptor provided at construction time.
       */
      public MsgOutInterceptor getOutput() {
          return this.output;
      }

      /**
       * Returns the RequestSender with which requests are sent to remote
       * Connectors.
       *
       * @return RequestSender provided at construction time.
       */
      public RequestSender getRequestSender() {
          return this.requestSender;
      }

  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeImpl.java
  
  Index: NodeImpl.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.util.HashMap;
  import java.util.Map;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.geronimo.gbean.GBean;
  import org.apache.geronimo.gbean.GBeanContext;
  import org.apache.geronimo.gbean.GBeanInfo;
  import org.apache.geronimo.gbean.GBeanInfoFactory;
  import org.apache.geronimo.gbean.WaitingException;
  import org.mortbay.util.ThreadedServer;
  
  /**
   * It allows a remote connectivity to a set of Connectors.
   * <BR>
   * It is the only component dealing directly with raw connections: it directly
   * accesses the InputStream and OutputStream of the registered connections. It
   * insulates the other components from connectivity issues.
   * <BR>
   * It is also in charge of dispatching the incoming Msgs to the registered
   * Connectors.
   * <BR>
   * The following diagram shows how ServantNode and Connectors are combined
   * together:
   * 
   * Connector -- MTO -- Node -- MTM -- Node -- OTM -- Connector
   *
   * Connector communicates with each other by sending Msgs.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
   */
  public class NodeImpl
      implements Node, GBean
  {
  
      private static final Log log = LogFactory.getLog(NodeImpl.class);
  
      private static final NodeContext NULL_CONTEXT = new NodeContext(null, null);
      
      /**
       * Node meta-data.
       */
      private final NodeInfo nodeInfo;
      
      /**
       * Connectors registered by this server.
       */
      private final Map connectors;
      
      /**
       * StreamManager to register/retrieve distributed InputStream.
       */
      private final StreamManager streamManager;
      
      /**
       * Server listening for connections to be made.
       */
      private final InternalServer server;
      
      /**
       * Inbound Msg queue. This queue is filled by Msgs coming directly
       * from the network connections.
       */
      final MsgQueue queueIn;
      
      /**
       * Inbound Msgs reactor. it is between the inbound Msg queue and 
       * the Connectors.
       */
      final HeaderReactor inReactor;
      
      /**
       * Outbound Msgs queue. This queue is a staging repository for Nsgs
       * to be sent over the network.
       */
      final MsgQueue queueOut;
      
      /**
       * Connections to this server. The key is the name of the ServantNode and 
       * the value is a ConnectionWrapper.
       */
      final Map connections;
      
      /**
       * Processors of this server.
       */
      final NodeProcessors processors;
      
      /**
       * MetaConnection to other nodes.
       */
      final MetaConnection metaConnection;
  
      private final RequestSender sender;
      
      private GBeanContext context;
  
      /**
       * Creates a server.
       * 
       * @param aNodeInfo NodeInfo identifying uniquely this node on the network.
       */
      public NodeImpl(NodeInfo aNodeInfo) {
          if ( null == aNodeInfo ) {
              throw new IllegalArgumentException("NodeInfo is required.");
          }
          nodeInfo = aNodeInfo;
          sender = new RequestSender(nodeInfo);
          server = new InternalServer();
          
          metaConnection = new MetaConnection(this);
          
          streamManager = new StreamManagerImpl(this, nodeInfo);
          
          processors = new NodeProcessors(this);
          
          queueIn = new MsgQueue(nodeInfo.getName() + " Inbound");
          queueOut = new MsgQueue(nodeInfo.getName() + " Outbound");
          
          connections = new HashMap();
          
          // The incoming messages are dispatched to the clients.
          inReactor = new HeaderReactor(
              new HeaderInInterceptor(
                  new QueueInInterceptor(queueIn),
                  MsgHeaderConstants.DEST_CONNECTOR),
                  processors.getProcessors());
          
          inReactor.register(StreamManager.NAME, streamManager);
          // The stream manager writes to the output queue of the server.
          NodeContext nodeContext = new NodeContext(
              new HeaderOutInterceptor(
                  MsgHeaderConstants.SRC_CONNECTOR,
                  StreamManager.NAME,
                  new QueueOutInterceptor(queueOut)),
              sender
              );
          streamManager.setContext(nodeContext);
                  
          connectors = new HashMap();
      }
  
      /**
       * Gets the NodeInfo of this node.
       * 
       * @return NodeInfo.
       */
      public NodeInfo getNodeInfo() {
          return nodeInfo;
      }
      
      /**
       * Sets the node topology in which this instance is operating. 
       * 
       * @param aTopology Topology of the nodes constituting the network layout.
       */
      public void setTopology(Topology aTopology) {
          metaConnection.setTopology(aTopology);
      }
      
      /**
       * Joins the node uniquely identified on the network by aNodeInfo.
       * 
       * @param aNodeInfo NodeInfo of a remote node to join.
       * @throws IOException Indicates that an I/O error has occured.
       * @throws CommunicationException Indicates that the node can not be
       * registered by the remote node identified by aNodeInfo.
       */
      public void join(NodeInfo aNodeInfo)
          throws IOException, CommunicationException {
          metaConnection.join(aNodeInfo);
      }
      
      /**
       * Leaves the node uniquely identified on the network by aNodeInfo.
       * 
       * @param aNodeInfo NodeInfo of the remote node to leave.
       * @throws IOException Indicates that an I/O error has occured.
       * @throws CommunicationException Indicates that the node has not leaved
       * successfully the remote node.
       */
      public void leave(NodeInfo aNodeInfo)
          throws IOException, CommunicationException {
          metaConnection.leave(aNodeInfo);
      }
      
      /**
       * Gets the StreamManager of this server.
       * 
       * @return StreamManager used by this server to resolve/encode InputStreams.
       */
      public StreamManager getStreamManager() {
          return streamManager;
      }
  
      /**
       * Gets the Output to be used to communicate with the specified node.
       * <BR>
       * aNode must be a node directly connected to this instance.
       * 
       * @param aNode Node.
       * @return Output to be used to communicate with the specified node.
       * @throws CommunicationException
       */
      public MsgOutInterceptor getRawOutForNode(NodeInfo aNode)
          throws CommunicationException {
          return metaConnection.getRawOutForNode(aNode);
      }
          
      /**
       * Gets the Output to be used to communicate with the specified node.
       * <BR>
       * aNode can be a node anywhere in the topology.
       * 
       * @param aNode Node.
       * @return Output to be used to communicate with the specified node.
       */
      protected MsgOutInterceptor getOutForNode(NodeInfo aNode)
          throws CommunicationException {
          return metaConnection.getOutForNode(aNode);
      }
      
      /**
       * Registers a new Connector.
       * 
       * @param aConnector Connector to be registered.
       */
      public void addConnector(Connector aConnector) {
          String pName = aConnector.getName();
          // Connectors write to the outbound Msg queue.
          NodeContext nodeContext = new NodeContext(
              new HeaderOutInterceptor(
                  MsgHeaderConstants.SRC_CONNECTOR,
                  pName,
                  new QueueOutInterceptor(queueOut)),
                  sender);
          aConnector.setContext(nodeContext);
          inReactor.register(pName, aConnector);
          synchronized (connectors) {
              connectors.put(pName, aConnector);
          }
      }
      
      /**
       * Unregisters the Connector.
       * 
       * @param aConnector Connector to be deregistered.
       */
      public void removeConnector(Connector aConnector) {
          String pName = aConnector.getName();
          aConnector.setContext(NULL_CONTEXT);
          inReactor.unregister(pName);
          synchronized (connectors) {
              connectors.remove(pName);
          }
      }
      
      public void setGBeanContext(GBeanContext aContext) {
          context = aContext;
      }
  
      public void doStart() throws WaitingException, Exception {
          server.start();
          processors.start();
      }
  
      public void doStop() throws WaitingException, Exception {
          server.stop();
          metaConnection.stop();
          processors.stop();
      }
  
      public void doFail() {
          server.stop();
          metaConnection.stop();
          processors.stop();
      }
      
      public String toString() {
          return "Node {" + nodeInfo + "}";
      }
  
      /**
       * Socket server listening for connections to be made to this node.
       */
      private class InternalServer extends ThreadedServer {
          
          public InternalServer() {
              super(nodeInfo.getAddress(), nodeInfo.getPort());
              // No socket timeout.
              setMaxIdleTimeMs(0);
          }
          
          /**
           * Handles a new connection.
           */
          protected void handleConnection(InputStream anIn,OutputStream anOut) {
              try {
                  metaConnection.joined(anIn, anOut);
              } catch (IOException e) {
                  log.error(e);
              } catch (CommunicationException e) {
                  log.error(e);
              }
          }
          
          public void start() {
              try {
                  super.start();
              } catch (Exception e) {
                  log.error(e);
                  context.fail();
              }
          }
          
          public void stop() {
              try {
                  super.stop();
              } catch (InterruptedException e) {
                  log.error(e);
                  context.fail();
              }
          }
          
          public void fail() {
              try {
                  super.stop();
              } catch (InterruptedException e) {
                  log.error(e);
              }
          }
      }
      
      public static final GBeanInfo GBEAN_INFO;
  
      static {
          GBeanInfoFactory factory = new GBeanInfoFactory(NodeImpl.class);
          factory.setConstructor(
              new String[] {"NodeInfo"},
              new Class[] {NodeInfo.class});
          factory.addAttribute("NodeInfo", true);
          factory.addAttribute("Topology", true);
          factory.addAttribute("StreamManager", false);
          factory.addOperation("join", new Class[]{NodeInfo.class});
          factory.addOperation("leave", new Class[]{NodeInfo.class});
          factory.addOperation("addConnector", new Class[]{Connector.class});
          factory.addOperation("removeConnector", new Class[]{Connector.class});
          GBEAN_INFO = factory.getBeanInfo();
      }
  
      public static GBeanInfo getGBeanInfo() {
          return GBEAN_INFO;
      }
      
  }
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeProcessors.java
  
  Index: NodeProcessors.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  /**
   * Processors associated to a node.
   * <BR>
   * Owns the Processors pool of a NodeImpl and uses it to run the inbound
   * reactor of the node and the dispatcher of its outbound queue.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:05 $
   */
  class NodeProcessors
  {
  
      private static final Log log = LogFactory.getLog(NodeProcessors.class);
  
      /**
       * Node owning these processors.
       */
      private final NodeImpl server;
  
      /**
       * StreamManager of the owning node. It is captured at construction time
       * but is not read anywhere in this class.
       */
      private final StreamManager streamManager;
      
      /**
       * Processor pool.
       */
      private final Processors processors;
      
      /**
       * Creates processors for the provided node.
       * 
       * @param aServer Node owning these processors. Its NodeInfo name is
       * used to name the pool.
       */
      public NodeProcessors(NodeImpl aServer) {
          server = aServer;
          // NOTE(review): 2 and 10 are presumably the minimum and maximum pool
          // sizes - confirm against Processors.
          processors = new Processors(aServer.getNodeInfo().getName(), 2, 10);
          streamManager = aServer.getStreamManager();
      }
      
      /**
       * Executes a Processor via the processor pool.
       *  
       * @param aProcessor Processor to be executed.
       */
      public void execute(Processor aProcessor) {
          processors.execute(aProcessor);
      }
      
      /**
       * Gets the underlying processor pool.
       * 
       * @return Processor pool.
       */
      public Processors getProcessors() {
          return processors;
      }
      
      /**
       * Dispatches the Msgs sitting in the inbound queue. Pushes the Msgs
       * sitting in the outbound queue to the relevant node.
       */
      public void start() {
          processors.execute(server.inReactor);
          processors.execute(new OutputQueueDispatcher());
      }
      
      /**
       * Stops the processor pool.
       */
      public void stop() {
          processors.stop();
      }
      
      /**
       * Runnable in charge of dispatching the Msgs sitting in the outbound
       * queue to the relevant node. 
       */
      private class OutputQueueDispatcher implements Processor {
          
          /**
           * Pops Msgs from the outbound queue until the queue interceptor is
           * stopped. Each destination receives its own copy of the Msg.
           */
          public void run() {
              HeaderInInterceptor in =
                  new HeaderInInterceptor(
                      new QueueInInterceptor(server.queueOut),
                      MsgHeaderConstants.DEST_NODES);
              while ( true ) {
                  Msg msg;
                  try {
                      msg = in.pop();
                  } catch (MsgInterceptorStoppedException e) {
                      log.info("Stopping OutputQueueDispatcher", e);
                      return;
                  }
                  // Presumably the DEST_NODES header of the Msg just popped;
                  // it is either a single NodeInfo or a NodeInfo[].
                  Object destNode = in.getHeader();
                  if ( destNode instanceof NodeInfo ) {
                      destNode = new NodeInfo[] {(NodeInfo) destNode};
                  }
                  NodeInfo[] dests = (NodeInfo[]) destNode;
                  for (int i = 0; i < dests.length; i++) {
                      NodeInfo target = dests[i];
                      Msg msg2 = new Msg(msg);
                      MsgHeader header = msg2.getHeader();
                      // A path is defined if this Msg is routed via the node 
                      // owning this instance.
                      NodeInfo[] path = (NodeInfo[])
                          header.getOptionalHeader(MsgHeaderConstants.DEST_NODE_PATH);
                      MsgOutInterceptor out;
                      try {
                          if ( null != path ) {
                              // Forward to the next hop and hand it the
                              // remainder of the path.
                              target = path[0];
                              header.addHeader(MsgHeaderConstants.DEST_NODE_PATH,
                                  NodeInfo.pop(path));
                              out = server.getRawOutForNode(target);
                          } else {
                              out = server.getOutForNode(target);
                          }
                      } catch (CommunicationException e) {
                          // Pass the exception as the Throwable argument so
                          // that its stack trace is logged, and name the node
                          // which could not be reached. The other destinations
                          // are still served.
                          log.error("Cannot get an output for node {" + target
                              + "}; Msg is not dispatched to it.", e);
                          continue;
                      }
                      out.push(msg2);
                  }
              }
          }
  
      }
      
  }
  
  
  
  1.3       +8 -261    incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java
  
  Index: ReplicationMember.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ReplicationMember.java	11 Mar 2004 15:36:14 -0000	1.2
  +++ ReplicationMember.java	24 Mar 2004 11:37:06 -0000	1.3
  @@ -17,115 +17,16 @@
   
   package org.apache.geronimo.datastore.impl.remote.replication;
   
  -import java.io.Externalizable;
  -import java.io.IOException;
  -import java.io.ObjectInput;
  -import java.io.ObjectOutput;
  -import java.util.HashMap;
  -import java.util.Map;
  -
  -import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
  -import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
   import org.apache.geronimo.datastore.impl.remote.messaging.Connector;
  -import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
  -import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
  -import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
  -import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
  -import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
  -import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
  -import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
  -import org.apache.geronimo.datastore.impl.remote.messaging.RequestSender;
  -import org.apache.geronimo.datastore.impl.remote.messaging.ServerNodeContext;
  -import org.apache.geronimo.gbean.GBean;
  -import org.apache.geronimo.gbean.GBeanContext;
  -import org.apache.geronimo.gbean.WaitingException;
   
   /**
  - * A replication group member.
  - * <BR>
  - * This is a Connector in charge of replicating the state of registered
  - * ReplicantCapables across N-nodes, which constitute a replication group.
  - * <BR>
  - * Replication members are organized as follow:
  - * <pre>
  - * ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM -- ReplicationMember
  - * </pre>
    *
    * @version $Revision$ $Date$
    */
  -public class ReplicationMember
  -    implements UpdateListener, Connector, GBean
  -{
  -
  -    /**
  -     * Name of the replication group.
  -     */
  -    private final String name;
  -    
  -    /**
  -     * ReplicantID to ReplicantCapable Map.
  -     */
  -    private final Map idToReplicant;
  -    
  -    /**
  -     * Nodes hosting the other members of the replication group
  -     * of this member.
  -     */
  -    private NodeInfo[] targetNodes;
  +public interface ReplicationMember
  +    extends UpdateListener, Connector {
       
       /**
  -     * Context of the ServerNode which has mounted this instance.
  -     */
  -    protected ServerNodeContext serverNodeContext;
  -    
  -    /**
  -     * Output to be used to send requests.
  -     */
  -    private MsgOutInterceptor requestOut;
  -    
  -    /**
  -     * Output to be used to send results.
  -     */
  -    private MsgOutInterceptor resultOut;
  -    
  -    /**
  -     * Requests sender.
  -     */
  -    private RequestSender sender;
  -    
  -    /**
  -     * Creates a replication group member.
  -     * 
  -     * @param aName Name of the replication group owning this member.
  -     * @param aTargetNodes Nodes hosting the other members of the
  -     * replication group containing this member.
  -     */
  -    public ReplicationMember(String aName, NodeInfo[] aTargetNodes) {
  -        if ( null == aName ) {
  -            throw new IllegalArgumentException("Name is required");
  -        } else if ( null == aTargetNodes ) {
  -            throw new IllegalArgumentException("Node names is required");
  -        }
  -        name = aName;
  -        targetNodes = aTargetNodes;
  -        idToReplicant = new HashMap();
  -    }
  -    
  -    public String getName() {
  -        return name;
  -    }
  -
  -    public void fireUpdateEvent(UpdateEvent anEvent) {
  -        // One does not send the actual ReplicantCapable in the case of an
  -        // update. Instead, one sends only its identifier.
  -        ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
  -        anEvent.setTarget(target.getID());
  -        sender.sendSyncRequest(
  -            new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
  -            requestOut, targetNodes);
  -    }
  -
  -    /**
        * Merges an UpdateEvent with a registered ReplicationCapable.
        * 
        * @param anEvent Update event to be merged.
  @@ -133,18 +34,7 @@
        * performed.
        */
       public void mergeWithUpdate(UpdateEvent anEvent)
  -        throws ReplicationException {
  -        ReplicantID id = (ReplicantID) anEvent.getTarget();
  -        ReplicationCapable replicationCapable;
  -        synchronized(idToReplicant) {
  -            replicationCapable = (ReplicationCapable) idToReplicant.get(id);
  -        }
  -        if ( null == replicationCapable ) {
  -            throw new ReplicationException(
  -                "No ReplicantCapable with the id {" + id + "}");
  -        }
  -        replicationCapable.mergeWithUpdate(anEvent);
  -    }
  +        throws ReplicationException;
       
       /**
        * Registers a ReplicantCapable. From now, UpdateEvents multicasted
  @@ -153,18 +43,7 @@
        * 
        * @param aReplicant ReplicantCapable to be controlled by this group.
        */
  -    public void registerReplicantCapable(ReplicationCapable aReplicant) {
  -        ReplicantID id = new ReplicantID();
  -        aReplicant.setID(id);
  -        sender.sendSyncRequest(
  -            new CommandRequest("registerLocalReplicantCapable",
  -                new Object[] {aReplicant}),
  -            requestOut, targetNodes);
  -        synchronized(idToReplicant) {
  -            idToReplicant.put(id, aReplicant);
  -            aReplicant.addUpdateListener(this);
  -        }
  -    }
  +    public void registerReplicantCapable(ReplicationCapable aReplicant);
       
       /**
        * This method is for internal use only.
  @@ -174,12 +53,7 @@
        * 
        * @param aReplicant ReplicantCapable to be locally registered.
        */
  -    public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
  -        synchronized(idToReplicant) {
  -            aReplicant.addUpdateListener(this);
  -            idToReplicant.put(aReplicant.getID(), aReplicant);
  -        }
  -    }
  +    public void registerLocalReplicantCapable(ReplicationCapable aReplicant);
       
       /**
        * Retrieves the ReplicationCapable having the specified id.
  @@ -188,133 +62,6 @@
        * @return ReplicantCapable having the specified id or null if such an
        * identifier is not known.
        */
  -    public ReplicationCapable retrieveReplicantCapable(Object anID) {
  -        synchronized(idToReplicant) {
  -            return (ReplicationCapable) idToReplicant.get(anID);
  -        }
  -    }
  -    
  -    public void setContext(ServerNodeContext aContext) {
  -        serverNodeContext = aContext;
  -        sender = aContext.getRequestSender();
  -        MsgOutInterceptor out = aContext.getOutput();
  -        if ( null != out ) {
  -            out =
  -                new HeaderOutInterceptor(
  -                    MsgHeaderConstants.DEST_CONNECTOR,
  -                    name,
  -                    out);
  -            requestOut =
  -                new HeaderOutInterceptor(
  -                    MsgHeaderConstants.BODY_TYPE,
  -                    MsgBody.Type.REQUEST,
  -                    out);
  -            resultOut = 
  -                new HeaderOutInterceptor(
  -                    MsgHeaderConstants.BODY_TYPE,
  -                    MsgBody.Type.RESPONSE,
  -                    out);
  -        } else {
  -            requestOut = null;
  -            resultOut = null;
  -        }
  -    }
  -
  -    public void deliver(Msg aMsg) {
  -        MsgHeader header = aMsg.getHeader();
  -        MsgBody.Type bodyType =
  -        (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
  -        if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
  -            handleRequest(aMsg);
  -        } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
  -            handleResponse(aMsg);
  -        }
  -    }
  -    
  -    /**
  -     * Handles a request Msg.
  -     * 
  -     * @param aMsg Request Msg to be handled.
  -     */
  -    protected void handleRequest(Msg aMsg) {
  -        MsgBody body = aMsg.getBody();
  -        MsgHeader header = aMsg.getHeader();
  -        Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
  -        Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
  -        CommandRequest command;
  -        String gateway;
  -        command = (CommandRequest) body.getContent();
  -        command.setTarget(this);
  -        CommandResult result = command.execute();
  -        Msg msg = new Msg();
  -        body = msg.getBody();
  -        body.setContent(result);
  -        MsgOutInterceptor reqOut =
  -            new HeaderOutInterceptor(
  -                MsgHeaderConstants.CORRELATION_ID,
  -                id,
  -                new HeaderOutInterceptor(
  -                    MsgHeaderConstants.DEST_NODES,
  -                    targetNodes,
  -                    new HeaderOutInterceptor(
  -                        MsgHeaderConstants.DEST_CONNECTOR,
  -                        name,
  -                        resultOut)));
  -        reqOut.push(msg);
  -    }
  -
  -    /**
  -     * Handles a response Msg.
  -     * 
  -     * @param aMsg Response to be handled.
  -     */
  -    protected void handleResponse(Msg aMsg) {
  -        MsgBody body = aMsg.getBody();
  -        MsgHeader header = aMsg.getHeader();
  -        CommandResult result;
  -        result = (CommandResult) body.getContent();
  -        sender.setResponse(
  -            header.getHeader(MsgHeaderConstants.CORRELATION_ID),
  -            result);
  -    }
  +    public ReplicationCapable retrieveReplicantCapable(Object anID);
       
  -    public void setGBeanContext(GBeanContext context) {
  -    }
  -
  -    public void doStart() throws WaitingException, Exception {
  -    }
  -
  -    public void doStop() throws WaitingException, Exception {
  -    }
  -
  -    public void doFail() {
  -    }
  -
  -    /**
  -     * ReplicantCapable identifier. 
  -     */
  -    private static class ReplicantID implements Externalizable {
  -        private static volatile int seqId = 0;
  -        private int id;
  -        public ReplicantID() {
  -            id = seqId++;
  -        }
  -        public int hashCode() {
  -            return id;
  -        }
  -        public boolean equals(Object obj) {
  -            if ( false == obj instanceof ReplicantID ) {
  -               return false;
  -            }
  -            ReplicantID replicantID = (ReplicantID) obj;
  -            return id == replicantID.id;
  -        }
  -        public void writeExternal(ObjectOutput out) throws IOException {
  -            out.writeInt(id);
  -        }
  -        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  -            id = in.readInt();
  -        }
  -    }
  -
  -}
  +}
  \ No newline at end of file
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMemberImpl.java
  
  Index: ReplicationMemberImpl.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.io.Externalizable;
  import java.io.IOException;
  import java.io.ObjectInput;
  import java.io.ObjectOutput;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.Map;
  
  import org.apache.geronimo.datastore.impl.remote.messaging.AbstractConnector;
  import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
  import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
  import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
  import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
  import org.apache.geronimo.datastore.impl.remote.messaging.Node;
  import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
  import org.apache.geronimo.datastore.impl.remote.messaging.NodeContext;
  import org.apache.geronimo.gbean.GAttributeInfo;
  import org.apache.geronimo.gbean.GBean;
  import org.apache.geronimo.gbean.GBeanInfo;
  import org.apache.geronimo.gbean.GBeanInfoFactory;
  import org.apache.geronimo.gbean.GConstructorInfo;
  
  /**
   * A replication group member.
   * <BR>
   * This is a Connector in charge of replicating the state of registered
   * ReplicantCapables across N-nodes, which constitute a replication group.
   * <BR>
   * Replication members are organized as follows:
   * <pre>
   * ReplicationMember -- MTO -- Node -- MTM -- Node -- OTM -- ReplicationMember
   * </pre>
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:06 $
   */
  public class ReplicationMemberImpl
      extends AbstractConnector
      implements ReplicationMember, GBean 
  {
  
      /**
       * Name of the replication group.
       */
      private final String name;
      
      /**
       * ReplicantID to ReplicantCapable Map. All accesses are synchronized on
       * the Map itself.
       */
      private final Map idToReplicant;
      
      /**
       * Nodes hosting the other members of the replication group
       * of this member.
       */
      private final NodeInfo[] targetNodes;
      
      /**
       * Output to be used to send requests.
       */
      private MsgOutInterceptor requestOut;
      
      /**
       * Output to be used to send results.
       */
      private MsgOutInterceptor resultOut;
      
      /**
       * Creates a replication group member.
       * 
       * @param aNode Node containing this instance.
       * @param aName Name of the replication group owning this member.
       * @param aTargetNodes Nodes hosting the other members of the
       * replication group containing this member.
       * @throws IllegalArgumentException Indicates that the name or the
       * target nodes are null.
       */
      public ReplicationMemberImpl(Node aNode,
          String aName, NodeInfo[] aTargetNodes) {
          super(aNode);
          if ( null == aName ) {
              throw new IllegalArgumentException("Name is required");
          } else if ( null == aTargetNodes ) {
              throw new IllegalArgumentException("Node names is required");
          }
          name = aName;
          targetNodes = aTargetNodes;
          idToReplicant = new HashMap();
      }
      
      /**
       * Gets the name of the replication group.
       * 
       * @return Name of the replication group owning this member.
       */
      public String getName() {
          return name;
      }
  
      /**
       * Forwards an UpdateEvent to the target nodes as a synchronous
       * "mergeWithUpdate" request.
       * <BR>
       * The target of the provided event is replaced by the identifier of the
       * ReplicationCapable which has raised it.
       * 
       * @param anEvent Update event whose target is a ReplicationCapable.
       */
      public void fireUpdateEvent(UpdateEvent anEvent) {
          // One does not send the actual ReplicantCapable in the case of an
          // update. Instead, one sends only its identifier.
          ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
          anEvent.setTarget(target.getID());
          sender.sendSyncRequest(
              new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
              requestOut, targetNodes);
      }
  
      /**
       * Merges an UpdateEvent with a registered ReplicationCapable.
       * 
       * @param anEvent Update event to be merged.
       * @throws ReplicationException Indicates that the merge can not be
       * performed.
       */
      public void mergeWithUpdate(UpdateEvent anEvent)
          throws ReplicationException {
          ReplicantID id = (ReplicantID) anEvent.getTarget();
          ReplicationCapable replicationCapable;
          synchronized(idToReplicant) {
              replicationCapable = (ReplicationCapable) idToReplicant.get(id);
          }
          if ( null == replicationCapable ) {
              throw new ReplicationException(
                  "No ReplicantCapable with the id {" + id + "}");
          }
          // The merge itself is performed outside of the Map lock.
          replicationCapable.mergeWithUpdate(anEvent);
      }
      
      /**
       * Registers a ReplicantCapable. From now, UpdateEvents multicasted
       * by the provided ReplicantCapable are also pushed to the replication
       * group.
       * 
       * @param aReplicant ReplicantCapable to be controlled by this group.
       */
      public void registerReplicantCapable(ReplicationCapable aReplicant) {
          ReplicantID id = new ReplicantID();
          aReplicant.setID(id);
          // The remote members are notified before the local registration.
          sender.sendSyncRequest(
              new CommandRequest("registerLocalReplicantCapable",
                  new Object[] {aReplicant}),
              requestOut, targetNodes);
          synchronized(idToReplicant) {
              idToReplicant.put(id, aReplicant);
              aReplicant.addUpdateListener(this);
          }
      }
      
      /**
       * This method is for internal use only.
       * <BR>
       * It registers with this member a ReplicationCapable, which has been
       * registered by a remote member.  
       * 
       * @param aReplicant ReplicantCapable to be locally registered.
       */
      public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
          synchronized(idToReplicant) {
              aReplicant.addUpdateListener(this);
              idToReplicant.put(aReplicant.getID(), aReplicant);
          }
      }
      
      /**
       * Retrieves the ReplicationCapable having the specified id.
       * 
       * @param anID Replicant identifier.
       * @return ReplicantCapable having the specified id or null if such an
       * identifier is not known.
       */
      public ReplicationCapable retrieveReplicantCapable(Object anID) {
          synchronized(idToReplicant) {
              return (ReplicationCapable) idToReplicant.get(anID);
          }
      }
      
      /**
       * Sets the context and derives from its output the outputs used to send
       * requests and results. Both are tagged with the name of this member as
       * the destination connector.
       * 
       * @param aContext Context of the Node which has mounted this instance.
       */
      public void setContext(NodeContext aContext) {
          super.setContext(aContext);
          // NOTE(review): out is not declared in this class; it is presumably
          // inherited from AbstractConnector and refreshed by
          // super.setContext - confirm.
          if ( null != out ) {
              out =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.DEST_CONNECTOR,
                      name,
                      out);
              requestOut =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.BODY_TYPE,
                      MsgBody.Type.REQUEST,
                      out);
              resultOut = 
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.BODY_TYPE,
                      MsgBody.Type.RESPONSE,
                      out);
          } else {
              requestOut = null;
              resultOut = null;
          }
      }
  
      /**
       * Handles a request Msg: executes the CommandRequest that it carries
       * against this instance and pushes the CommandResult, tagged with the
       * correlation identifier of the request, to the target nodes.
       * 
       * @param aMsg Request Msg to be handled.
       */
      protected void handleRequest(Msg aMsg) {
          MsgBody body = aMsg.getBody();
          MsgHeader header = aMsg.getHeader();
          // NOTE(review): sourceNode is not used below. The lookup is kept
          // because getHeader may reject a Msg without SRC_NODE - confirm
          // before removing it.
          Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
          Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
          CommandRequest command = (CommandRequest) body.getContent();
          command.setTarget(this);
          CommandResult result = command.execute();
          Msg msg = new Msg();
          body = msg.getBody();
          body.setContent(result);
          MsgOutInterceptor reqOut =
              new HeaderOutInterceptor(
                  MsgHeaderConstants.CORRELATION_ID,
                  id,
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.DEST_NODES,
                      targetNodes,
                      new HeaderOutInterceptor(
                          MsgHeaderConstants.DEST_CONNECTOR,
                          name,
                          resultOut)));
          reqOut.push(msg);
      }
  
      /**
       * ReplicantCapable identifier. 
       */
      private static class ReplicantID implements Externalizable {
          /**
           * Next identifier to be allocated. Guarded by the lock of
           * ReplicantID.class via nextId().
           */
          private static int seqId = 0;
          private int id;
          /**
           * Allocates a new identifier. When an instance is read back via
           * readExternal, the allocated value is overwritten.
           */
          public ReplicantID() {
              id = nextId();
          }
          /**
           * Allocates the next identifier. The increment is a read-modify-write
           * operation, which a volatile field alone does not make atomic;
           * hence the synchronization.
           */
          private static synchronized int nextId() {
              return seqId++;
          }
          public int hashCode() {
              return id;
          }
          public boolean equals(Object obj) {
              if ( !(obj instanceof ReplicantID) ) {
                 return false;
              }
              ReplicantID replicantID = (ReplicantID) obj;
              return id == replicantID.id;
          }
          public void writeExternal(ObjectOutput out) throws IOException {
              out.writeInt(id);
          }
          public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
              id = in.readInt();
          }
      }
  
      public static final GBeanInfo GBEAN_INFO;
  
      static {
          GBeanInfoFactory infoFactory = new GBeanInfoFactory("Replication Member", ReplicationMemberImpl.class.getName(), AbstractConnector.GBEAN_INFO);
          infoFactory.addAttribute(new GAttributeInfo("TargetNodes", true));
          infoFactory.addOperation("registerReplicantCapable", new Class[] {ReplicationCapable.class});
          infoFactory.addOperation("retrieveReplicantCapable", new Class[] {Object.class});
          infoFactory.setConstructor(new GConstructorInfo(
              Arrays.asList(new Object[]{"Node", "Name", "TargetNodes"}),
              Arrays.asList(new Object[]{Node.class, String.class, NodeInfo[].class})));
          GBEAN_INFO = infoFactory.getBeanInfo();
      }
  
      /**
       * Returns the GBeanInfo stored in {@link #GBEAN_INFO}.
       * 
       * @return the shared GBEAN_INFO instance.
       */
      public static GBeanInfo getGBeanInfo() {
          return GBEAN_INFO;
      }
      
  }
  
  
  
  1.3       +38 -32    incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java
  
  Index: DummyConnector.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/DummyConnector.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DummyConnector.java	18 Mar 2004 12:14:05 -0000	1.2
  +++ DummyConnector.java	24 Mar 2004 11:37:06 -0000	1.3
  @@ -20,20 +20,31 @@
   import java.util.ArrayList;
   import java.util.List;
   
  +import org.apache.geronimo.gbean.GAttributeInfo;
  +import org.apache.geronimo.gbean.GBean;
  +import org.apache.geronimo.gbean.GBeanInfo;
  +import org.apache.geronimo.gbean.GBeanInfoFactory;
  +
   /**
    *
    * @version $Revision$ $Date$
    */
  -public class DummyConnector implements Connector {
  +public class DummyConnector
  +    extends AbstractConnector
  +    implements Connector, GBean {
   
       private final String name;
       private final List received;
       private final NodeInfo[] targetNodes;
  -    private RequestSender sender;
  -    private MsgOutInterceptor out;
  -    protected ServerNodeContext serverNodeContext;
       
  -    public DummyConnector(String aName, NodeInfo[] aTargetNodes) {
  +    public DummyConnector(Node aNode,
  +        String aName, NodeInfo[] aTargetNodes) {
  +        super(aNode);
  +        if ( null == aName ) {
  +            throw new IllegalArgumentException("Name is required.");
  +        } else if ( null == aTargetNodes ) {
  +            throw new IllegalArgumentException("Target nodes is required.");
  +        }
           name = aName;
           targetNodes = aTargetNodes;
           received = new ArrayList();
  @@ -55,10 +66,8 @@
                   out), targetNodes);
       }
   
  -    public void setContext(ServerNodeContext aContext) {
  -        serverNodeContext = aContext;
  -        sender = aContext.getRequestSender();
  -        out = aContext.getOutput();
  +    public void setContext(NodeContext aContext) {
  +        super.setContext(aContext);
           if ( null != out ) {
               out = 
                   new HeaderOutInterceptor(
  @@ -72,23 +81,12 @@
           return received;
       }
       
  -    public void deliver(Msg aMsg) {
  -        MsgHeader header = aMsg.getHeader();
  -        MsgBody.Type bodyType =
  -            (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
  -        if ( MsgBody.Type.REQUEST == bodyType ) {
  -            handleRequest(aMsg);
  -        } else if ( MsgBody.Type.RESPONSE == bodyType ) {
  -            handleResponse(aMsg);
  -        }
  -    }
  -    
  -    public void handleRequest(Msg aMsg) {
  +    protected void handleRequest(Msg aMsg) {
           MsgHeader header = aMsg.getHeader();
           MsgBody body = aMsg.getBody();
           
           Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
  -        Object node = header.getHeader(MsgHeaderConstants.SRC_NODE);
  +        Object srcNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
   
           received.add(body.getContent());
           
  @@ -104,7 +102,7 @@
                       MsgBody.Type.RESPONSE,
                       new HeaderOutInterceptor(
                           MsgHeaderConstants.DEST_NODES,
  -                        node,
  +                        srcNode,
                           new HeaderOutInterceptor(
                               MsgHeaderConstants.BODY_TYPE,
                               MsgBody.Type.RESPONSE,
  @@ -112,14 +110,22 @@
           reqOut.push(msg);
       }
   
  -    private void handleResponse(Msg aMsg) {
  -        MsgBody body = aMsg.getBody();
  -        MsgHeader header = aMsg.getHeader();
  -        CommandResult result;
  -        result = (CommandResult) body.getContent();
  -        sender.setResponse(
  -            header.getHeader(MsgHeaderConstants.CORRELATION_ID),
  -            result);
  +    public static final GBeanInfo GBEAN_INFO;
  +
  +    static {
  +        GBeanInfoFactory factory = new GBeanInfoFactory(DummyConnector.class, AbstractConnector.GBEAN_INFO);
  +        factory.setConstructor(
  +            new String[] {"Node", "Name", "TargetNodes"},
  +            new Class[] {Node.class, String.class, NodeInfo[].class});
  +        factory.addAttribute(new GAttributeInfo("TargetNodes", true));
  +        factory.addAttribute(new GAttributeInfo("Received", false));
  +        factory.addOperation("raiseISException");
  +        factory.addOperation("sendRawObject", new Class[]{Object.class});
  +        GBEAN_INFO = factory.getBeanInfo();
  +    }
  +
  +    public static GBeanInfo getGBeanInfo() {
  +        return GBEAN_INFO;
       }
       
   }
  
  
  
  1.2       +7 -2      incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/CommandRequestTest.java
  
  Index: CommandRequestTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/CommandRequestTest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- CommandRequestTest.java	11 Mar 2004 15:36:14 -0000	1.1
  +++ CommandRequestTest.java	24 Mar 2004 11:37:06 -0000	1.2
  @@ -17,6 +17,8 @@
   
   package org.apache.geronimo.datastore.impl.remote.messaging;
   
  +import java.net.InetAddress;
  +
   import junit.framework.TestCase;
   
   /**
  @@ -29,7 +31,10 @@
       private static final String name = "test"; 
       
       protected void setUp() throws Exception {
  -        connector = new DummyConnector(name, new NodeInfo[0]);
  +        connector = new DummyConnector(
  +            new NodeImpl(
  +                new NodeInfo("test", InetAddress.getLocalHost(), 4040)),
  +            name, new NodeInfo[0]);
       }
       
       public void testExecute0() throws Exception {
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/NodeTest.java
  
  Index: NodeTest.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.messaging;
  
  import java.io.IOException;
  import java.io.InputStream;
  import java.net.InetAddress;
  import java.util.Collections;
  import java.util.List;
  
  import javax.management.ObjectName;
  
  import junit.framework.TestCase;
  
  import org.apache.geronimo.datastore.impl.remote.messaging.Topology.NodePath;
  import org.apache.geronimo.datastore.impl.remote.messaging.Topology.PathWeight;
  import org.apache.geronimo.gbean.jmx.GBeanMBean;
  import org.apache.geronimo.kernel.Kernel;
  
  /**
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:06 $
   */
  public class NodeTest
      extends TestCase
  {
  
      private Kernel kernel1;
      private ObjectName node1Name;
      private ObjectName dummy1Node1Name;
      private DummyConnector dummy1Node1;
      private ObjectName dummy11Node1Name;
      private DummyConnector dummy11Node1;
      
      private Kernel kernel2;
      private ObjectName node2Name;
      private ObjectName dummy1Node2Name;
      private DummyConnector dummy1Node2;
      private ObjectName dummy11Node2Name;
      private DummyConnector dummy11Node2;
      
      private Kernel kernel3;
      private ObjectName node3Name;
  
      private Kernel kernel4;
      private ObjectName node4Name;
      private ObjectName dummy1Node4Name;
      private DummyConnector dummy1Node4;
      
      /**
       * Loads a GBean into a kernel under the given name, then starts it.
       *
       * @param kernel Kernel the GBean is loaded into.
       * @param name Name the GBean is loaded and started under.
       * @param instance GBean to load.
       * @throws Exception whatever the kernel raises while loading or starting.
       */
      private void loadAndStart(
          Kernel kernel, ObjectName name, GBeanMBean instance) throws Exception {
          // Load first: the bean is started by name, not by instance.
          kernel.loadGBean(name, instance);
          kernel.startGBean(name);
      }
      
      /**
       * Stops the GBean registered under the given name, then unloads it from
       * the kernel. Note that, despite the method name, the stop happens
       * before the unload.
       *
       * @param kernel Kernel hosting the GBean.
       * @param name Name of the GBean to stop and unload.
       * @throws Exception whatever the kernel raises while stopping or
       * unloading.
       */
      private void unloadAndStop(Kernel kernel, ObjectName name) throws Exception {
          // Stop before unloading.
          kernel.stopGBean(name);
          kernel.unloadGBean(name);
      }
      
      /**
       * Boots four kernels, each hosting one node GBean (Node1..Node4, all on
       * the local host, ports 8081..8084), and wires them together:
       * <ul>
       * <li>Node1 hosts connectors "dummy1" (targets Node2 and Node4) and
       * "dummy11" (targets Node2);</li>
       * <li>Node2 hosts connectors "dummy1" (targets Node1 and Node4) and
       * "dummy11" (targets Node1);</li>
       * <li>Node3 hosts no connector;</li>
       * <li>Node4 hosts connector "dummy1" (targets Node1 and Node2).</li>
       * </ul>
       * Node2 joins Node1, Node3 joins Node2 and Node4 joins Node3. The same
       * Topology instance, made of the paths Node1-Node2, Node2-Node3 and
       * Node3-Node4, is then set on every node.
       *
       * @throws Exception if the local host cannot be resolved or if any
       * kernel, GBean or node operation fails.
       */
      protected void setUp() throws Exception {
          InetAddress address = InetAddress.getLocalHost();
          NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8081);
          NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
          NodeInfo nodeInfo3 = new NodeInfo("Node3", address, 8083);
          NodeInfo nodeInfo4 = new NodeInfo("Node4", address, 8084);
  
          // Set up the first node: its own kernel, the node GBean and two
          // connectors referencing that node through the "Node" reference.
          kernel1 = new Kernel("test.kernel1", "test");
          kernel1.boot();
  
          node1Name = new ObjectName("geronimo.test:role=node1");
          GBeanMBean node1 = new GBeanMBean(NodeImpl.GBEAN_INFO);
          node1.setAttribute("NodeInfo", nodeInfo1);
          dummy1Node1Name = new ObjectName("geronimo.test:name=dummy1");
          GBeanMBean dummy1Node1GB = new GBeanMBean(DummyConnector.GBEAN_INFO);
          dummy1Node1GB.setReferencePatterns("Node",
              Collections.singleton(node1Name));
          dummy1Node1GB.setAttribute("Name", "dummy1");
          dummy1Node1GB.setAttribute("TargetNodes",
              new NodeInfo[] {nodeInfo2, nodeInfo4});
          dummy11Node1Name = new ObjectName("geronimo.test:name=dummy11");
          GBeanMBean dummy11Node1GB = new GBeanMBean(DummyConnector.GBEAN_INFO);
          dummy11Node1GB.setReferencePatterns("Node",
              Collections.singleton(node1Name));
          dummy11Node1GB.setAttribute("Name", "dummy11");
          dummy11Node1GB.setAttribute("TargetNodes", new NodeInfo[] {nodeInfo2});
          // The node is loaded and started before the connectors referencing it.
          loadAndStart(kernel1, node1Name, node1);
          loadAndStart(kernel1, dummy1Node1Name, dummy1Node1GB);
          loadAndStart(kernel1, dummy11Node1Name, dummy11Node1GB);
          // The connector instances are fetched only once their GBeans have
          // been started.
          dummy1Node1 = (DummyConnector) dummy1Node1GB.getTarget();
          dummy11Node1 = (DummyConnector) dummy11Node1GB.getTarget();
          
          // Set up the second node. The connector ObjectNames are the same as
          // the ones used for the first node; they are registered with a
          // distinct Kernel instance.
          kernel2 = new Kernel("test.kernel2", "test");
          kernel2.boot();
  
          node2Name = new ObjectName("geronimo.test:role=node2");
          GBeanMBean node2 = new GBeanMBean(NodeImpl.GBEAN_INFO);
          node2.setAttribute("NodeInfo", nodeInfo2);
          dummy1Node2Name = new ObjectName("geronimo.test:name=dummy1");
          GBeanMBean dummy1Node2GB = new GBeanMBean(DummyConnector.GBEAN_INFO);
          dummy1Node2GB.setReferencePatterns("Node",
              Collections.singleton(node2Name));
          dummy1Node2GB.setAttribute("Name", "dummy1");
          dummy1Node2GB.setAttribute("TargetNodes",
                  new NodeInfo[] {nodeInfo1, nodeInfo4});
          dummy11Node2Name = new ObjectName("geronimo.test:name=dummy11");
          GBeanMBean dummy11Node2GB = new GBeanMBean(DummyConnector.GBEAN_INFO);
          dummy11Node2GB.setReferencePatterns("Node",
              Collections.singleton(node2Name));
          dummy11Node2GB.setAttribute("Name", "dummy11");
          dummy11Node2GB.setAttribute("TargetNodes", new NodeInfo[] {nodeInfo1});
          loadAndStart(kernel2, node2Name, node2);
          loadAndStart(kernel2, dummy1Node2Name, dummy1Node2GB);
          loadAndStart(kernel2, dummy11Node2Name, dummy11Node2GB);
          dummy1Node2 = (DummyConnector) dummy1Node2GB.getTarget();
          dummy11Node2 = (DummyConnector) dummy11Node2GB.getTarget();
  
          Node node = (Node) node2.getTarget();
          // The second node joins the first one.
          node.join(nodeInfo1);
  
          // Set up the third node. It hosts no connector.
          kernel3 = new Kernel("test.kernel3", "test");
          kernel3.boot();
          
          node3Name = new ObjectName("geronimo.test:role=node3");
          GBeanMBean node3 = new GBeanMBean(NodeImpl.GBEAN_INFO);
          node3.setAttribute("NodeInfo", nodeInfo3);
          loadAndStart(kernel3, node3Name, node3);
  
          // NOTE(review): the target is cast to NodeImpl here and for the
          // fourth node, whereas the second node's target is cast to the Node
          // interface; the variable is declared as Node in all cases.
          node = (NodeImpl) node3.getTarget();
          // The third node joins the second one.
          node.join(nodeInfo2);
  
          // Set up the fourth node, with a single connector.
          kernel4 = new Kernel("test.kernel4", "test");
          kernel4.boot();
          
          node4Name = new ObjectName("geronimo.test:role=node4");
          GBeanMBean node4 = new GBeanMBean(NodeImpl.GBEAN_INFO);
          dummy1Node4Name = new ObjectName("geronimo.test:name=dummy1");
          GBeanMBean dummy1Node4GB = new GBeanMBean(DummyConnector.GBEAN_INFO);
          dummy1Node4GB.setReferencePatterns("Node",
              Collections.singleton(node4Name));
          dummy1Node4GB.setAttribute("Name", "dummy1");
          dummy1Node4GB.setAttribute("TargetNodes",
                  new NodeInfo[] {nodeInfo1, nodeInfo2});
          // Unlike for the other nodes, the NodeInfo attribute is set after
          // the connector has been configured; it is still set before the
          // node GBean is loaded.
          node4.setAttribute("NodeInfo", nodeInfo4);
          loadAndStart(kernel4, node4Name, node4);
          loadAndStart(kernel4, dummy1Node4Name, dummy1Node4GB);
          dummy1Node4 = (DummyConnector) dummy1Node4GB.getTarget();
  
          node = (NodeImpl) node4.getTarget();
          // The fourth node joins the third one.
          node.join(nodeInfo3);
  
          // Sets the topology: a chain Node1 - Node2 - Node3 - Node4 where
          // every path is built with the same PathWeight(10) passed twice.
          // Presumably one weight per direction -- TODO confirm against
          // Topology.NodePath.
          Topology topology = new Topology();
          PathWeight weight = new PathWeight(10);
          NodePath path = new NodePath(nodeInfo1, nodeInfo2, weight, weight);
          topology.addPath(path);
          path = new NodePath(nodeInfo2, nodeInfo3, weight, weight);
          topology.addPath(path);
          path = new NodePath(nodeInfo3, nodeInfo4, weight, weight);
          topology.addPath(path);
          
          // The same Topology instance is pushed to every node through the
          // "Topology" attribute of its GBean.
          kernel1.setAttribute(node1Name, "Topology", topology);
          kernel2.setAttribute(node2Name, "Topology", topology);
          kernel3.setAttribute(node3Name, "Topology", topology);
          kernel4.setAttribute(node4Name, "Topology", topology);
      }
  
      /**
       * Stops and unloads every GBean registered by the fixture and shuts the
       * four kernels down.
       * <p>
       * Each kernel is shut down even when unloading one of its GBeans fails,
       * and a failure on one kernel does not prevent the following kernels
       * from being torn down. Without this, a single failure would leave the
       * remaining kernels running for the tests executed afterwards.
       * When no failure occurs, the GBeans are stopped and unloaded in the
       * same order as before: the node first, then its connectors.
       *
       * @throws Exception the last failure raised while tearing down, if any.
       */
      protected void tearDown() throws Exception {
          try {
              unloadAllAndShutdown(kernel1, new ObjectName[] {
                  node1Name, dummy1Node1Name, dummy11Node1Name});
          } finally {
              try {
                  unloadAllAndShutdown(kernel2, new ObjectName[] {
                      node2Name, dummy1Node2Name, dummy11Node2Name});
              } finally {
                  try {
                      unloadAllAndShutdown(kernel3,
                          new ObjectName[] {node3Name});
                  } finally {
                      unloadAllAndShutdown(kernel4, new ObjectName[] {
                          node4Name, dummy1Node4Name});
                  }
              }
          }
      }

      /**
       * Stops and unloads the named GBeans, in array order, and then shuts the
       * kernel down. The kernel is shut down even if a GBean fails to stop or
       * unload.
       *
       * @param kernel Kernel to shut down.
       * @param names Names of the GBeans to stop and unload beforehand.
       * @throws Exception if a GBean cannot be stopped or unloaded, or if the
       * kernel shutdown fails.
       */
      private void unloadAllAndShutdown(Kernel kernel, ObjectName[] names)
          throws Exception {
          try {
              for (int i = 0; i < names.length; i++) {
                  unloadAndStop(kernel, names[i]);
              }
          } finally {
              kernel.shutdown();
          }
      }
      
      /**
       * Sends one object through the "dummy1" connector of the first node and
       * checks that the "dummy1" connectors of the second and fourth nodes
       * have each received exactly that object. Each received object is
       * removed from the list it is read from.
       *
       * @throws Exception if the send operation fails.
       */
      public void testMulticast() throws Exception {
          final String payload = "Test1";
          dummy1Node1.sendRawObject(payload);

          // Second node.
          List receivedByNode2 = dummy1Node2.getReceived();
          assertEquals(1, receivedByNode2.size());
          assertEquals(payload, receivedByNode2.remove(0));

          // Fourth node.
          List receivedByNode4 = dummy1Node4.getReceived();
          assertEquals(1, receivedByNode4.size());
          assertEquals(payload, receivedByNode4.remove(0));
      }
  
      /**
       * Manual driver: builds the fixture once and then runs
       * {@link #testSendRawPerformance()} in an endless loop. The loop has no
       * exit condition, so this method only terminates when a call throws;
       * tearDown is never invoked.
       *
       * @param args Ignored.
       * @throws Exception if the set-up or one of the runs fails.
       */
      public static void main(String[] args) throws Exception {
          NodeTest driver = new NodeTest();
          driver.setUp();
          for (;;) {
              driver.testSendRawPerformance();
          }
      }
      
      /**
       * Sends 1000 null objects, one at a time, through the "dummy11"
       * connector of the first node. After each send, checks that the
       * "dummy11" connector of the second node holds exactly one received
       * object and removes it. The whole loop must complete in less than
       * 3000 milliseconds.
       * <p>
       * The list of received objects is fetched only once, before the loop:
       * the test relies on it reflecting the objects received afterwards.
       *
       * @throws Exception if a send operation fails.
       */
      public void testSendRawPerformance() throws Exception {
          final int nbCalls = 1000;
          List received = dummy11Node2.getReceived();

          long start = System.currentTimeMillis();
          int call = 0;
          while (call < nbCalls) {
              dummy11Node1.sendRawObject(null);
              assertEquals(1, received.size());
              received.remove(0);
              call++;
          }
          long elapsed = System.currentTimeMillis() - start;

          System.out.println("#calls={" + nbCalls + "}; Time={" + elapsed + "}");
          // TODO update when compression is implemented.
          assertTrue(elapsed < 3000);
      }
      
      /**
       * Compares the time needed to read a one-megabyte stream locally with
       * the time needed to read the same amount of data from the InputStream
       * received by the "dummy11" connector of the second node, after it has
       * been sent through the "dummy11" connector of the first node. The
       * second read must be less than 100 times slower than the baseline.
       * <p>
       * Both durations are measured in whole milliseconds, so the baseline
       * can legitimately be measured as 0. It is therefore clamped to 1
       * millisecond for the comparison: with a baseline of 0 the check
       * {@code baseLine * 100 > time} could never succeed, whatever the
       * actual performance. The printed figures are the raw measurements.
       *
       * @throws Exception if the send operation or a read fails.
       */
      public void testInputStreamPerformance() throws Exception {
          long nbBytes = 1024 * 1024;
          InputStream in = new DummyInputStream(nbBytes);
          long baseLine = timeRead(in);
          
          in = new DummyInputStream(nbBytes);
          dummy11Node1.sendRawObject(in);
          List list = dummy11Node2.getReceived();
          assertEquals(1, list.size());
          in = (InputStream) list.remove(0);
          long time = timeRead(in);
          System.out.println("#bytes={" + nbBytes +
              "}; Baseline={" + baseLine + "}; Time={" + time + "}");
          // TODO update when compression is implemented.
          // A baseline below the clock resolution must not fail the test.
          long allowed = Math.max(baseLine, 1L) * 100;
          assertTrue(allowed > time);
      }
  
      /**
       * Reads the provided stream byte by byte until its end and returns the
       * elapsed wall-clock time. The stream is not closed.
       *
       * @param anIn Stream to drain.
       * @return Number of milliseconds spent reading, as measured with
       * System.currentTimeMillis().
       * @throws Exception if a read fails.
       */
      private long timeRead(InputStream anIn) throws Exception {
          final long begin = System.currentTimeMillis();
          int next;
          do {
              next = anIn.read();
          } while (next != -1);
          return System.currentTimeMillis() - begin;
      }
      
      /**
       * InputStream producing a fixed number of bytes, all equal to 1, and
       * then reporting the end of the stream.
       */
      private static class DummyInputStream extends InputStream {
          /** Number of bytes produced before the end of the stream. */
          private final long length;
          /** Number of read() calls made so far. */
          private long position = 0;

          private DummyInputStream(long aSize) {
              this.length = aSize;
          }

          /**
           * @return 1 while fewer than the configured number of bytes have
           * been read; -1 afterwards.
           */
          public int read() throws IOException {
              // The counter is bumped on every call, including the ones made
              // once the end has been reached.
              final boolean hasMore = position < length;
              position++;
              return hasMore ? 1 : -1;
          }
      }
      
  }
  
  
  
  1.3       +88 -30    incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java
  
  Index: ReplicationTest.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ReplicationTest.java	11 Mar 2004 15:36:15 -0000	1.2
  +++ ReplicationTest.java	24 Mar 2004 11:37:06 -0000	1.3
  @@ -22,60 +22,118 @@
   import java.util.HashMap;
   import java.util.Map;
   
  +import javax.management.ObjectName;
  +
   import junit.framework.TestCase;
   
  +import org.apache.geronimo.datastore.impl.remote.messaging.Node;
   import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
  +import org.apache.geronimo.datastore.impl.remote.messaging.NodeImpl;
   import org.apache.geronimo.datastore.impl.remote.messaging.Topology;
  -import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
   import org.apache.geronimo.datastore.impl.remote.messaging.Topology.NodePath;
   import org.apache.geronimo.datastore.impl.remote.messaging.Topology.PathWeight;
  +import org.apache.geronimo.gbean.jmx.GBeanMBean;
  +import org.apache.geronimo.kernel.Kernel;
   
   /**
    *
    * @version $Revision$ $Date$
    */
   public class ReplicationTest extends TestCase {
  -
  -    SimpleReplicatedMap replicant1;
  -    ReplicationMember replication1;
  -    ReplicationMember replication2;
       
  +    private Kernel kernel1;
  +    private ObjectName node1Name;
  +    private ObjectName repNode1Name;
  +    private ReplicationMember repNode1; 
  +    
  +    private Kernel kernel2;
  +    private ObjectName node2Name;
  +    private ObjectName repNode2Name;
  +    private ReplicationMember repNode2; 
  +    
  +    private void loadAndStart(Kernel kernel, ObjectName name, GBeanMBean instance)
  +        throws Exception {
  +        kernel.loadGBean(name, instance);
  +        kernel.startGBean(name);
  +    }
  +    
  +    private void unloadAndStop(Kernel kernel, ObjectName name)
  +        throws Exception {
  +        kernel.stopGBean(name);
  +        kernel.unloadGBean(name);
  +    }
  +
       protected void setUp() throws Exception {
           InetAddress address = InetAddress.getLocalHost();
  -        NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
  -        NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
  +        NodeInfo primaryNode = new NodeInfo("Primary", address, 8080);
  +        NodeInfo secondaryNode = new NodeInfo("Secondary", address, 8082);
           
  -        replicant1 = new SimpleReplicatedMap();
  -        replication1 = new ReplicationMember("Replication1",
  -            new NodeInfo[] {nodeInfo2});
  -        ServerNode server1 = new ServerNode(nodeInfo1,
  -            Collections.singleton(replication1));
  -        server1.doStart();
  -        replication1.doStart();
  -
  -        replication2 = new ReplicationMember("Replication1",
  -            new NodeInfo[] {nodeInfo1});
  -        ServerNode server2 = new ServerNode(nodeInfo2,
  -            Collections.singleton(replication2));
  -        server2.doStart();
  -        replication2.doStart();
  +
  +        // Set-up the first ServerNode.
  +        kernel1 = new Kernel("test.kernel1", "test");
  +        kernel1.boot();
  +
  +        node1Name = new ObjectName("geronimo.test:role=node1");
  +        GBeanMBean node1GB = new GBeanMBean(NodeImpl.GBEAN_INFO);
  +        node1GB.setAttribute("NodeInfo", primaryNode);
  +        repNode1Name = new ObjectName("geronimo.test:role=replication");
  +        GBeanMBean repNode1GB = new GBeanMBean(ReplicationMemberImpl.GBEAN_INFO);
  +        repNode1GB.setReferencePatterns("Node",
  +            Collections.singleton(node1Name));
  +        repNode1GB.setAttribute("Name", "Replication");
  +        repNode1GB.setAttribute("TargetNodes", new NodeInfo[] {secondaryNode});
  +        loadAndStart(kernel1, repNode1Name, repNode1GB);
  +        loadAndStart(kernel1, node1Name, node1GB);
  +        repNode1 = (ReplicationMember) repNode1GB.getTarget();
  +        
  +        // Set-up the second ServerNode.
  +        kernel2 = new Kernel("test.kernel2", "test");
  +        kernel2.boot();
  +        
  +        node2Name = new ObjectName("geronimo.test:role=node2");
  +        GBeanMBean node2GB = new GBeanMBean(NodeImpl.GBEAN_INFO);
  +        node2GB.setAttribute("NodeInfo", secondaryNode);
  +        repNode2Name = new ObjectName("geronimo.test:role=replication");
  +        GBeanMBean repNode2GB = new GBeanMBean(ReplicationMemberImpl.GBEAN_INFO);
  +        repNode2GB.setReferencePatterns("Node",
  +            Collections.singleton(node2Name));
  +        repNode2GB.setAttribute("Name", "Replication");
  +        repNode2GB.setAttribute("TargetNodes", new NodeInfo[] {primaryNode});
  +        loadAndStart(kernel2, repNode2Name, repNode2GB);
  +        loadAndStart(kernel2, node2Name, node2GB);
  +        repNode2 = (ReplicationMember) repNode2GB.getTarget();
           
  -        server2.join(nodeInfo1);
  +        Node node = (Node) node2GB.getTarget();
  +        // The second ServerNode joins the first one.
  +        node.join(primaryNode);
           
  +        // Sets the topology.
           Topology topology = new Topology();
           PathWeight weight = new PathWeight(10);
  -        NodePath path = new NodePath(nodeInfo1, nodeInfo2, weight, weight);
  +        NodePath path = new NodePath(primaryNode, secondaryNode, weight, weight);
           topology.addPath(path);
  -        server2.setTopology(topology);
  -        server1.setTopology(topology);
  +
  +        kernel1.setAttribute(node1Name, "Topology", topology);
  +        kernel2.setAttribute(node2Name, "Topology", topology);
       }
   
  -    public void testUseCase() {
  +    protected void tearDown() throws Exception {
  +        unloadAndStop(kernel1, repNode1Name);
  +        unloadAndStop(kernel1, node1Name);
  +        kernel1.shutdown();
  +
  +        unloadAndStop(kernel2, repNode2Name);
  +        unloadAndStop(kernel2, node2Name);
  +        kernel2.shutdown();
  +    }
  +    
  +    public void testUseCase() throws Exception {
  +        SimpleReplicatedMap replicant1 = new SimpleReplicatedMap();
           replicant1.put("test1", "value1");
  -        replication1.registerReplicantCapable(replicant1);
  +        repNode1.registerReplicantCapable(replicant1);
           Object id = replicant1.getID();
  -        SimpleReplicatedMap replicant2 =
  -            (SimpleReplicatedMap) replication2.retrieveReplicantCapable(id);
  +        SimpleReplicatedMap replicant2 = (SimpleReplicatedMap)
  +            repNode2.retrieveReplicantCapable(id);
           assertNotNull("Not been registered", replicant2);
           assertEquals("value1", replicant2.get("test1"));
           replicant1.put("test2", "value2");
  
  
  

Mime
View raw message