activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r358551 - in /incubator/activemq/trunk/activecluster/src: java/org/activecluster/ java/org/activecluster/impl/ test/org/activecluster/ test/org/activecluster/group/
Date Thu, 22 Dec 2005 10:28:16 GMT
Author: rajdavies
Date: Thu Dec 22 02:28:04 2005
New Revision: 358551

URL: http://svn.apache.org/viewcvs?rev=358551&view=rev
Log:
Revert to using javax.jms.Destination instead of Strings, and add support
for a DestinationMarshaller

Modified:
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
    incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
    incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
    incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
    incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
    incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
    incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/Cluster.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,15 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
 package org.activecluster;
 
 import java.io.Serializable;
@@ -46,7 +48,7 @@
      *
      * @return the destination to send messages to all members of the cluster
      */
-    public String getDestination();
+    public Destination getDestination();
 
     /**
      * A snapshot of the nodes in the cluster indexed by the Destination
@@ -94,18 +96,10 @@
      * @param message     the message to be sent
      * @throws JMSException
      */
-    public void send(String destination, Message message) throws JMSException;
+    public void send(Destination destination, Message message) throws JMSException;
 
     
     /**
-     * Utility method for sending back replies in message exchanges
-     *
-     * @param replyTo the replyTo JMS Destination on a Message
-     * @param message     the message to be sent
-     * @throws JMSException
-     */
-    public void send(Destination replyTo, Message message) throws JMSException;
-    /**
      * Creates a consumer of all the messags sent to the given destination,
      * including messages sent via the send() messages
      *
@@ -113,7 +107,7 @@
      * @return a newly  created message consumer
      * @throws JMSException
      */
-    public MessageConsumer createConsumer(String destination) throws JMSException;
+    public MessageConsumer createConsumer(Destination destination) throws JMSException;
 
     /**
      * Creates a consumer of all message sent to the given destination,
@@ -125,7 +119,7 @@
      * @return a newly  created message consumer
      * @throws JMSException
      */
-    public MessageConsumer createConsumer(String destination, String selector) throws JMSException;
+    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException;
 
     /**
      * Creates a consumer of all message sent to the given destination,
@@ -139,7 +133,7 @@
      * @return a newly  created message consumer
      * @throws JMSException
      */
-    public MessageConsumer createConsumer(String destination, String selector, boolean noLocal) throws JMSException;
+    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException;
 
 
     // Message factory methods

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/ClusterFactory.java Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 
 package org.activecluster;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 
 
@@ -30,19 +32,62 @@
     /**
      * Creates a new cluster connection using the given  local name and destination name
      * @param localName 
-     * @param destination 
+     * @param destinationName 
+     *
+     * @return Cluster
+     * @throws JMSException
+     */
+    public Cluster createCluster(String localName,String destinationName) throws JMSException;
+    
+    /**
+     * Creates a new cluster connection using the given  local name and destination name
+     * @param localName 
+     * @param destinationName 
+     * @param marshaller 
+     *
+     * @return Cluster
+     * @throws JMSException
+     */
+    public Cluster createCluster(String localName,String destinationName,DestinationMarshaller marshaller) throws JMSException;
+
+
+
+    /**
+     * Creates a new cluster connection - generating the localName automatically
+     * @param destinationName
+     * @return the Cluster
+     * @throws JMSException
+     */
+    public Cluster createCluster(String destinationName) throws  JMSException;
+    
+    /**
+     * Creates a new cluster connection using the given  local name and destination name
+     * @param localName 
+     * @param destination
      *
      * @return Cluster
      * @throws JMSException
      */
-    public Cluster createCluster(String localName,String destination) throws JMSException;
+    public Cluster createCluster(String localName,Destination destination) throws JMSException;
+    
+    /**
+     * Creates a new cluster connection using the given  local name and destination name
+     * @param localName 
+     * @param destination
+     * @param marshaller 
+     *
+     * @return Cluster
+     * @throws JMSException
+     */
+    public Cluster createCluster(String localName,Destination destination, DestinationMarshaller marshaller) throws JMSException;
+
 
 
     /**
      * Creates a new cluster connection - generating the localName automatically
      * @param destination
-     * @return
+     * @return the Cluster
      * @throws JMSException
      */
-    public Cluster createCluster(String destination) throws  JMSException;
+    public Cluster createCluster(Destination destination) throws  JMSException;
 }

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/Node.java Thu Dec 22 02:28:04 2005
@@ -7,17 +7,18 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster;
 
-import java.io.Serializable;
 import java.util.Map;
+import javax.jms.Destination;
 
 
 /**
@@ -25,14 +26,14 @@
  *
  * @version $Revision: 1.3 $
  */
-public interface Node extends Serializable {
+public interface Node {
 
     /**
      * Access to the queue to send messages direct to this node.
      *
      * @return the destination to send messages to this node while its available
      */
-    public String getDestination();
+    public Destination getDestination();
 
     /**
      * @return an immutable map of the nodes state

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultCluster.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import java.io.Serializable;
@@ -33,6 +34,7 @@
 import javax.jms.TextMessage;
 import org.activecluster.Cluster;
 import org.activecluster.ClusterListener;
+import org.activecluster.DestinationMarshaller;
 import org.activecluster.LocalNode;
 import org.activecluster.Service;
 import org.activecluster.election.ElectionStrategy;
@@ -50,41 +52,53 @@
 
     private StateServiceImpl stateService;
     private LocalNode localNode;
-    private String destination;
+    private Destination destination;
     private Connection connection;
     private Session session;
     private MessageProducer producer;
     private MessageConsumer consumer;
     private Timer timer;
+    private DestinationMarshaller marshaller;
     private AtomicBoolean started = new AtomicBoolean(false);
     private Object clusterLock = new Object();
 
-    public DefaultCluster(final LocalNode localNode,String dataDestination, String destination, Connection connection, Session session,
-                          MessageProducer producer, Timer timer, long inactiveTime) throws JMSException {
-        this.localNode = localNode;
-        this.destination = destination;
-        this.connection = connection;
-        this.session = session;
-        this.producer = producer;
-        this.timer = timer;
-
-        if (producer == null) {
+    /**
+     * Construct this beast
+     * @param localNode
+     * @param dataDestination
+     * @param destination
+     * @param marshaller
+     * @param connection
+     * @param session
+     * @param producer
+     * @param timer
+     * @param inactiveTime
+     * @throws JMSException
+     */
+    public DefaultCluster(final LocalNode localNode,Destination dataDestination,Destination destination,
+                    DestinationMarshaller marshaller,Connection connection,Session session,MessageProducer producer,
+                    Timer timer,long inactiveTime) throws JMSException{
+        this.localNode=localNode;
+        this.destination=destination;
+        this.marshaller=marshaller;
+        this.connection=connection;
+        this.session=session;
+        this.producer=producer;
+        this.timer=timer;
+        if(producer==null){
             throw new IllegalArgumentException("No producer specified!");
         }
-
         // now lets subscribe the service to the updates from the data topic
-        consumer = session.createConsumer(createDestination(dataDestination), null, true);
-
-        log.info("Creating data consumer on topic: " + dataDestination);
-
-        this.stateService = new StateServiceImpl(this, clusterLock, new Runnable() {
-            public void run() {
-                if (localNode instanceof ReplicatedLocalNode) {
+        consumer=session.createConsumer(dataDestination,null,true);
+        log.info("Creating data consumer on topic: "+dataDestination);
+        this.stateService=new StateServiceImpl(this,clusterLock,new Runnable(){
+            public void run(){
+                if(localNode instanceof ReplicatedLocalNode){
                     ((ReplicatedLocalNode) localNode).pingRemoteNodes();
                 }
             }
-        }, timer, inactiveTime);
-        consumer.setMessageListener(new StateConsumer(stateService));
+        },timer,inactiveTime);
+        consumer.setMessageListener(new StateConsumer(stateService,marshaller));
     }
 
     public void addClusterListener(ClusterListener listener) {
@@ -95,7 +109,7 @@
         stateService.removeClusterListener(listener);
     }
 
-    public String getDestination() {
+    public Destination getDestination() {
         return destination;
     }
 
@@ -111,24 +125,21 @@
         stateService.setElectionStrategy(strategy);
     }
 
-    public void send(String destination,Message message) throws JMSException {
-        producer.send(createDestination(destination), message);
-    }
-    
+        
    public void send(Destination replyTo, Message message) throws JMSException{
        producer.send(replyTo,message);
    }
 
-    public MessageConsumer createConsumer(String destination) throws JMSException {
-        return getSession().createConsumer(createDestination(destination));
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        return getSession().createConsumer(destination);
     }
 
-    public MessageConsumer createConsumer(String destination, String selector) throws JMSException {
-        return getSession().createConsumer(createDestination(destination), selector);
+    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
+        return getSession().createConsumer(destination, selector);
     }
 
-    public MessageConsumer createConsumer(String destination, String selector, boolean noLocal) throws JMSException {
-        return getSession().createConsumer(createDestination(destination), selector, noLocal);
+    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
+        return getSession().createConsumer(destination, selector, noLocal);
     }
 
     public Message createMessage() throws JMSException {

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/DefaultClusterFactory.java Thu Dec 22 02:28:04 2005
@@ -20,6 +20,7 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -27,6 +28,7 @@
 import org.activecluster.Cluster;
 import org.activecluster.ClusterException;
 import org.activecluster.ClusterFactory;
+import org.activecluster.DestinationMarshaller;
 import org.activemq.util.IdGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,16 +63,42 @@
         this(connectionFactory, false, Session.AUTO_ACKNOWLEDGE, "ACTIVECLUSTER.DATA.", 6000L);
     }
 
-    public Cluster createCluster(String groupDestination) throws JMSException {
+    public Cluster createCluster(Destination groupDestination) throws JMSException {
         return createCluster(idGenerator.generateId(), groupDestination);
     }
 
-    public Cluster createCluster(String name,String groupDestination) throws  JMSException {
+    public Cluster createCluster(String name,Destination groupDestination) throws  JMSException {
         Connection connection = getConnectionFactory().createConnection();
         Session session = createSession(connection);
-        return createCluster(connection, session, name,groupDestination);
+        return createCluster(connection, session, name,groupDestination,new DefaultDestinationMarshaller());
+    }
+    
+    public Cluster createCluster(String name,Destination groupDestination,DestinationMarshaller marshaller) throws  JMSException {
+        Connection connection = getConnectionFactory().createConnection();
+        Session session = createSession(connection);
+        return createCluster(connection, session, name,groupDestination,marshaller);
+    }
+    
+    
+    public Cluster createCluster(String name,String groupDestinationName) throws JMSException{
+        Connection connection = getConnectionFactory().createConnection();
+        Session session = createSession(connection);
+        return createCluster(connection, session, name,session.createTopic(groupDestinationName),new DefaultDestinationMarshaller());
+    }
+    
+    public Cluster createCluster(String name,String groupDestinationName,DestinationMarshaller marshaller) throws JMSException{
+        Connection connection = getConnectionFactory().createConnection();
+        Session session = createSession(connection);
+        return createCluster(connection, session, name,session.createTopic(groupDestinationName),marshaller);
     }
 
+
+    
+    public Cluster createCluster(String groupDestinationName) throws  JMSException{
+        return createCluster(idGenerator.generateId(), groupDestinationName);
+    }
+    
+
     // Properties
     //-------------------------------------------------------------------------
     public String getDataTopicPrefix() {
@@ -134,34 +162,29 @@
 
     // Implementation methods
     //-------------------------------------------------------------------------
-    protected Cluster createCluster(Connection connection, Session session, String name,String groupDestination) throws JMSException {
-        String dataDestination = dataTopicPrefix + groupDestination;
-
-        log.info("Creating cluster group producer on topic: " + groupDestination);
-
-        MessageProducer producer = createProducer(session, null);
+    protected Cluster createCluster(Connection connection,Session session,String name,Destination groupDestination,
+                    DestinationMarshaller marshaller) throws JMSException{
+        String dataDestination=dataTopicPrefix+groupDestination;
+        log.info("Creating cluster group producer on topic: "+groupDestination);
+        MessageProducer producer=createProducer(session,null);
         producer.setDeliveryMode(deliveryMode);
-
-        log.info("Creating cluster data producer on data destination: " + dataDestination);
-
-        Topic dataTopic = session.createTopic(dataDestination);
-        MessageProducer keepAliveProducer = session.createProducer(dataTopic);
+        log.info("Creating cluster data producer on data destination: "+dataDestination);
+        Topic dataTopic=session.createTopic(dataDestination);
+        MessageProducer keepAliveProducer=session.createProducer(dataTopic);
         keepAliveProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        StateService serviceStub = new StateServiceStub(session, keepAliveProducer);
-
-        String localInboxDestination = dataDestination + "." + name;
-        
-        ReplicatedLocalNode localNode = new ReplicatedLocalNode(name,localInboxDestination, serviceStub);
-        Timer timer = new Timer();
-        DefaultCluster answer = new DefaultCluster(localNode, dataDestination, groupDestination, connection, session, producer, timer, inactiveTime);
+        StateService serviceStub=new StateServiceStub(session,keepAliveProducer,marshaller);
+        Destination localInboxDestination=session.createTopic(dataDestination+"."+name);
+        ReplicatedLocalNode localNode=new ReplicatedLocalNode(name,localInboxDestination,serviceStub);
+        Timer timer=new Timer();
+        DefaultCluster answer=new DefaultCluster(localNode,dataTopic,groupDestination,marshaller,connection,session,
+                        producer,timer,inactiveTime);
         return answer;
     }
 
     /*
-     protected Cluster createInternalCluster(Session session, Topic dataDestination) {
-         MessageProducer producer = createProducer(session);
-         return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection, session, producer);
-     }
+     * protected Cluster createInternalCluster(Session session, Topic dataDestination) { MessageProducer producer =
+     * createProducer(session); return new DefaultCluster(new NonReplicatedLocalNode(), dataDestination, connection,
+     * session, producer); }
      */
     
     protected MessageProducer createProducer(Session session, Topic groupDestination) throws JMSException {

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NodeImpl.java Thu Dec 22 02:28:04 2005
@@ -7,17 +7,23 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.HashMap;
 import java.util.Map;
+import javax.jms.Destination;
+import org.activecluster.DestinationMarshaller;
 import org.activecluster.Node;
 
 
@@ -26,13 +32,22 @@
  *
  * @version $Revision: 1.3 $
  */
-public class NodeImpl implements Node {
+public class NodeImpl implements Node{
     private static final long serialVersionUID=-3909792803360045064L;
     private String name;
-    private String destination;
+    private Destination destination;
     protected Map state;
     protected boolean coordinator;
-
+    
+    
+    /**
+     * Construct an Node from a NodeState
+     * @param nodeState
+     * @param marshaller
+     */
+    public NodeImpl(NodeState nodeState,DestinationMarshaller marshaller){
+        this(nodeState.getName(),marshaller.getDestination(nodeState.getDestinationName()),nodeState.getState());
+    }
     /**
      * Allow a node to be copied for sending it as a message
      *
@@ -47,7 +62,7 @@
      * @param name 
      * @param destination
      */
-    public NodeImpl(String name,String destination) {
+    public NodeImpl(String name,Destination destination) {
         this(name,destination, new HashMap());
     }
 
@@ -57,7 +72,7 @@
      * @param destination
      * @param state
      */
-    public NodeImpl(String name,String destination, Map state) {
+    public NodeImpl(String name,Destination destination, Map state) {
         this.name = name;
         this.destination = destination;
         this.state = state;
@@ -80,7 +95,7 @@
     /**
      * @return the destination of the node
      */
-    public String getDestination() {
+    public Destination getDestination() {
         return destination;
     }
 
@@ -117,5 +132,15 @@
 
     protected void setCoordinator(boolean value) {
         coordinator = value;
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException{
+        // TODO Auto-generated method stub
+        
+    }
+
+    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
+        // TODO Auto-generated method stub
+        
     }
 }

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/NonReplicatedLocalNode.java Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import java.util.Map;
+import javax.jms.Destination;
 import org.activecluster.LocalNode;
 
 /**
@@ -33,7 +35,7 @@
      * @param name
      * @param destination
      */
-    public NonReplicatedLocalNode(String name, String destination) {
+    public NonReplicatedLocalNode(String name, Destination destination) {
         super(name,destination);
     }
 

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/ReplicatedLocalNode.java Thu Dec 22 02:28:04 2005
@@ -7,16 +7,18 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import java.util.Map;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import org.activecluster.LocalNode;
 import org.activecluster.Service;
@@ -34,7 +36,7 @@
      * 
      */
     private static final long serialVersionUID=4626381612145333540L;
-    private StateService serviceStub;
+    private transient StateService serviceStub;
 
     /**
      * Create ReplicatedLocalNode
@@ -42,7 +44,7 @@
      * @param destination
      * @param serviceStub
      */
-    public ReplicatedLocalNode(String name,String destination, StateService serviceStub) {
+    public ReplicatedLocalNode(String name,Destination destination, StateService serviceStub) {
         super(name,destination);
         this.serviceStub = serviceStub;
     }

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateConsumer.java Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.activecluster.DestinationMarshaller;
 import org.activecluster.Node;
 
 import javax.jms.Message;
@@ -36,12 +38,14 @@
     private final static Log log = LogFactory.getLog(StateConsumer.class);
 
     private StateService stateService;
+    private DestinationMarshaller marshaller;
 
-    public StateConsumer(StateService stateService) {
+    public StateConsumer(StateService stateService,DestinationMarshaller marshaller) {
         if (stateService == null) {
             throw new IllegalArgumentException("Must specify a valid StateService implementation");
         }
         this.stateService = stateService;
+        this.marshaller = marshaller;
     }
 
     public void onMessage(Message message) {
@@ -52,7 +56,8 @@
         if (message instanceof ObjectMessage) {
             ObjectMessage objectMessage = (ObjectMessage) message;
             try {
-                Node node = (Node) objectMessage.getObject();
+                NodeState nodeState = (NodeState) objectMessage.getObject();
+                Node node = new NodeImpl(nodeState,marshaller);
                 String type = objectMessage.getJMSType();
                 if (type != null && type.equals("shutdown")) {
                     stateService.shutdown(node);

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceImpl.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import java.util.HashMap;
@@ -23,6 +24,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Map.Entry;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import org.activecluster.Cluster;
 import org.activecluster.ClusterEvent;
@@ -49,7 +51,7 @@
     private Map nodes = new ConcurrentHashMap();
     private long inactiveTime;
     private List listeners =  new CopyOnWriteArrayList();
-    private String localDestination;
+    private Destination localDestination;
     private Runnable localNodePing;
     private NodeImpl coordinator;
     private ElectionStrategy electionStrategy;
@@ -133,7 +135,7 @@
      * @param node 
      */
     public void keepAlive(Node node) {
-        String key = node.getDestination();
+        Object key = node.getDestination();
         if (key != null && !localDestination.equals(key)) {
             NodeEntry entry = (NodeEntry) nodes.get(key);
             if (entry == null) {
@@ -163,7 +165,7 @@
      * shutdown the node
      */
     public void shutdown(Node node){
-        String key=node.getDestination();
+        Object key=node.getDestination();
         if(key!=null){
             nodes.remove(key);
             ClusterEvent event=new ClusterEvent(cluster,node,ClusterEvent.ADD_NODE);

Modified: incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java (original)
+++ incubator/activemq/trunk/activecluster/src/java/org/activecluster/impl/StateServiceStub.java Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.impl;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.activecluster.DestinationMarshaller;
 import org.activecluster.Node;
 
 import javax.jms.JMSException;
@@ -38,10 +40,12 @@
 
     private Session session;
     private MessageProducer producer;
+    private DestinationMarshaller marshaller;
 
-    public StateServiceStub(Session session, MessageProducer producer) {
+    public StateServiceStub(Session session, MessageProducer producer,DestinationMarshaller marshaller) {
         this.session = session;
         this.producer = producer;
+        this.marshaller = marshaller;
     }
 
     public void keepAlive(Node node) {
@@ -50,7 +54,7 @@
                 log.debug("Sending cluster data message: " + node);
             }
 
-            Message message = session.createObjectMessage(new NodeImpl(node));
+            Message message = session.createObjectMessage(new NodeState(node,marshaller));
             producer.send(message);
         }
         catch (JMSException e) {
@@ -64,7 +68,7 @@
                 log.debug("Sending shutdown message: " + node);
             }
 
-            Message message = session.createObjectMessage(new NodeImpl(node));
+            Message message = session.createObjectMessage(new NodeState(node,marshaller));
             message.setJMSType("shutdown");
             producer.send(message);
         }

Modified: incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java (original)
+++ incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterFunctionTest.java Thu Dec 22 02:28:04 2005
@@ -2,17 +2,18 @@
  *
  * Copyright 2004 The Apache Software Foundation
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
+ implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
 package org.activecluster;
@@ -150,7 +151,7 @@
             try {
                 System.out.println("request received");
                 ObjectMessage om = cluster.createObjectMessage();
-                om.setJMSReplyTo(cluster.createDestination(cluster.getLocalNode().getDestination()));
+                om.setJMSReplyTo(cluster.getLocalNode().getDestination());
                 om.setObject(new Response());
                 System.out.println("sending response");
                 cluster.send(om2.getJMSReplyTo(), om);
@@ -192,7 +193,7 @@
         // 1->1 messages
         _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
         ObjectMessage om = _cluster0.createObjectMessage();
-        om.setJMSReplyTo(_cluster0.createDestination(_cluster0.getLocalNode().getDestination()));
+        om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
         om.setObject(new Request());
         testResponsePassed = false;
         _cluster0.send(_cluster0.getLocalNode().getDestination(), om);

Modified: incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java (original)
+++ incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTest.java Thu Dec 22 02:28:04 2005
@@ -7,17 +7,19 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster;
 
 import java.util.List;
 import java.util.Map;
+import javax.jms.Destination;
 import javax.jms.Message;
 
 /**
@@ -27,14 +29,14 @@
 
     protected int count = 2;
 
-    public void xtestCluster() throws Exception {
+    public void testCluster() throws Exception {
         cluster = createCluster();
 
         subscribeToCluster();
 
         cluster.start();
 
-        String destination = cluster.getDestination();
+        Destination destination = cluster.getDestination();
         Message message = cluster.createTextMessage("abcdef");
         cluster.send(destination, message);
 

Modified: incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java (original)
+++ incubator/activemq/trunk/activecluster/src/test/org/activecluster/ClusterTestSupport.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster;
 
 import org.activecluster.impl.ActiveMQClusterFactory;
@@ -49,13 +50,13 @@
     protected void subscribeToCluster() throws Exception {
 
         // listen to cluster messages
-        String clusterDestination = cluster.getDestination();
+        Destination clusterDestination = cluster.getDestination();
         assertTrue("Local destination must not be null", clusterDestination != null);
         clusterConsumer = cluster.createConsumer(clusterDestination);
         clusterConsumer.setMessageListener(clusterListener);
 
         // listen to inbox messages (individual messages)
-        String localDestination = cluster.getLocalNode().getDestination();
+        Destination localDestination = cluster.getLocalNode().getDestination();
         assertTrue("Local destination must not be null", localDestination != null);
 
         System.out.println("Consuming from local destination: " + localDestination);

Modified: incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java (original)
+++ incubator/activemq/trunk/activecluster/src/test/org/activecluster/TestSupport.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster;
 
 import junit.framework.TestCase;
@@ -51,7 +52,7 @@
     }
 
     protected Cluster createCluster(String name) throws JMSException, ClusterException {
-        Cluster cluster = createCluster();
-        return cluster;
+        ClusterFactory factory = new ActiveMQClusterFactory();
+        return factory.createCluster(name,"ORG.CODEHAUS.ACTIVEMQ.TEST.CLUSTER");
     }
 }

Modified: incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java?rev=358551&r1=358550&r2=358551&view=diff
==============================================================================
--- incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java (original)
+++ incubator/activemq/trunk/activecluster/src/test/org/activecluster/group/GroupTestSupport.java Thu Dec 22 02:28:04 2005
@@ -7,13 +7,14 @@
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
 package org.activecluster.group;
 
 import java.util.HashMap;
@@ -22,7 +23,9 @@
 import org.activecluster.Cluster;
 import org.activecluster.ClusterEvent;
 import org.activecluster.ClusterListener;
+import org.activecluster.DestinationMarshaller;
 import org.activecluster.Node;
+import org.activecluster.impl.DefaultDestinationMarshaller;
 import org.activecluster.impl.NodeImpl;
 
 /**
@@ -36,6 +39,7 @@
     private ClusterListener listener;
     private Cluster cluster;
     private Map nodes = new HashMap();
+    private DestinationMarshaller marshaller = new DefaultDestinationMarshaller();
 
     protected void addNodes(String[] nodeNames) {
         for (int i = 0; i < nodeNames.length; i++) {
@@ -45,7 +49,8 @@
     }
 
     protected void addNode(String nodeName) {
-        Node node = new NodeImpl(nodeName,nodeName);
+        
+        Node node = new NodeImpl(nodeName,marshaller.getDestination(nodeName));
         nodes.put(nodeName, node);
         listener.onNodeAdd(new ClusterEvent(cluster, node, ClusterEvent.ADD_NODE));
     }



Mime
View raw message