activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1087330 - in /activemq/trunk: activemq-camel/src/main/java/org/apache/activemq/camel/ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/...
Date Thu, 31 Mar 2011 15:11:10 GMT
Author: gtully
Date: Thu Mar 31 15:11:09 2011
New Revision: 1087330

URL: http://svn.apache.org/viewvc?rev=1087330&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network
without advisories. Allow the connection id generator prefix to be set via a connection factory
such that temp identies can be configured such that they are suitable for inclusion in a network
list of statically included destintions. Allow auto recreation of temp destinations by a new
producer and tie lifecycle to the producers connection. This allows configurable support for
request reply with temps in a network with advisory support disabled

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
  (with props)
Modified:
    activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
    activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java

Modified: activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
(original)
+++ activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnection.java
Thu Mar 31 15:11:09 2011
@@ -30,8 +30,9 @@ public class CamelConnection extends Act
 
     private CamelContext camelContext;
 
-    protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
-        super(transport, clientIdGenerator, factoryStats);
+    protected CamelConnection(Transport transport, IdGenerator clientIdGenerator,
+                              IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats)
throws Exception {
+        super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
     }
 
     public CamelContext getCamelContext() {

Modified: activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
(original)
+++ activemq/trunk/activemq-camel/src/main/java/org/apache/activemq/camel/CamelConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -45,7 +45,7 @@ public class CamelConnectionFactory exte
     // Implementation methods
     //-----------------------------------------------------------------------
     protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl
stats) throws Exception {
-        CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(),
stats);
+        CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(),
getConnectionIdGenerator(), stats);
         CamelContext context = getCamelContext();
         if (context != null) {
             connection.setCamelContext(context);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Mar 31 15:11:09 2011
@@ -200,7 +200,7 @@ public class ActiveMQConnection implemen
      * @param factoryStats
      * @throws Exception
      */
-    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator,
JMSStatsImpl factoryStats) throws Exception {
+    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
 
         this.transport = transport;
         this.clientIdGenerator = clientIdGenerator;
@@ -216,7 +216,7 @@ public class ActiveMQConnection implemen
             }
         });
         // asyncConnectionThread.allowCoreThreadTimeOut(true);
-        String uniqueId = CONNECTION_ID_GENERATOR.generateId();
+        String uniqueId = connectionIdGenerator.generateId();
         this.info = new ConnectionInfo(new ConnectionId(uniqueId));
         this.info.setManageable(true);
         this.info.setFaultTolerant(transport.isFaultTolerant());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -83,6 +83,8 @@ public class ActiveMQConnectionFactory e
 
     private IdGenerator clientIdGenerator;
     private String clientIDPrefix;
+    private IdGenerator connectionIdGenerator;
+    private String connectionIDPrefix;
 
     // client policies
     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
@@ -288,7 +290,8 @@ public class ActiveMQConnectionFactory e
     }
 
     protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl
stats) throws Exception {
-        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
stats);
+        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
+                getConnectionIdGenerator(), stats);
         return connection;
     }
 
@@ -844,6 +847,29 @@ public class ActiveMQConnectionFactory e
     }
 
     /**
+     * Sets the prefix used by connection id generator
+     * @param connectionIDPrefix
+     */
+    public void setConnectionIDPrefix(String connectionIDPrefix) {
+        this.connectionIDPrefix = connectionIDPrefix;
+    }
+
+    protected synchronized IdGenerator getConnectionIdGenerator() {
+        if (connectionIdGenerator == null) {
+            if (connectionIDPrefix != null) {
+                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+            } else {
+                connectionIdGenerator = new IdGenerator();
+            }
+        }
+        return connectionIdGenerator;
+    }
+
+    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
+        this.connectionIdGenerator = connectionIdGenerator;
+    }
+
+    /**
      * @return the statsEnabled
      */
     public boolean isStatsEnabled() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
Thu Mar 31 15:11:09 2011
@@ -50,8 +50,9 @@ import org.apache.activemq.util.IdGenera
  */
 public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection,
XAQueueConnection, XAConnection {
 
-    protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl
factoryStats) throws Exception {
-        super(transport, clientIdGenerator, factoryStats);
+    protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator,
+                                   IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats)
throws Exception {
+        super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
     }
 
     public XASession createXASession() throws JMSException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
Thu Mar 31 15:11:09 2011
@@ -80,7 +80,7 @@ public class ActiveMQXAConnectionFactory
     }
 
     protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl
stats) throws Exception {
-        ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(),
stats);
+        ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(),
getConnectionIdGenerator(), stats);
         return connection;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Mar 31 15:11:09 2011
@@ -297,10 +297,6 @@ public class BrokerService implements Se
      * @throws Exception
      */
     public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
-        if (!isAdvisorySupport()) {
-            throw new javax.jms.IllegalStateException(
-                    "Networks require advisory messages to function - advisories are currently
disabled");
-        }
         NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
         return addNetworkConnector(connector);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Thu Mar 31 15:11:09 2011
@@ -28,10 +28,12 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.broker.ConsumerBrokerExchange;
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -130,6 +132,18 @@ public abstract class AbstractRegion imp
                     destinations.put(destination, dest);
                     destinationMap.put(destination, dest);
                     addSubscriptionsForDestination(context, dest);
+                    if (destination.isTemporary()) {
+                        // need to associate with the connection so it can get removed
+                        if (context.getConnection() instanceof TransportConnection) {
+                            TransportConnection transportConnection = (TransportConnection)
context.getConnection();
+                            DestinationInfo info = new DestinationInfo(context.getConnectionId(),
+                                    DestinationInfo.ADD_OPERATION_TYPE,
+                                    destination);
+                            transportConnection.processAddDestination(info);
+                            LOG.debug("assigning ownership of auto created temp : " + destination
+ " to connection:"
+                                    + context.getConnectionId());
+                        }
+                    }
                 }
                 if (dest == null) {
                     throw new JMSException("The destination " + destination + " does not
exist.");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 31 15:11:09 2011
@@ -834,7 +834,7 @@ public class Queue extends BaseDestinati
         }finally {
             messagesLock.readLock().unlock();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions="
+ consumers.size()
+        return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
                 + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in
flight groups="
                 + messageGroupOwners;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Thu Mar 31 15:11:09 2011
@@ -393,7 +393,7 @@ public class RegionBroker extends EmptyB
 
                 // This seems to cause the destination to be added but without
                 // advisories firing...
-                context.getBroker().addDestination(context, destination, false);
+                context.getBroker().addDestination(context, destination, true);
                 switch (destination.getDestinationType()) {
                 case ActiveMQDestination.QUEUE_TYPE:
                     queueRegion.addProducer(context, info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
 /**
+ * @org.apache.xbean.XBean element="tempQueue" description="An ActiveMQ Temporary Queue Destination"
  * @openwire:marshaller code="102"
  * 
  */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
Thu Mar 31 15:11:09 2011
@@ -20,6 +20,7 @@ import javax.jms.JMSException;
 import javax.jms.TemporaryTopic;
 
 /**
+ * @org.apache.xbean.XBean element="tempTopic" description="An ActiveMQ Temporary Topic Destination"
  * @openwire:marshaller code="103"
  * 
  */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Thu Mar 31 15:11:09 2011
@@ -127,19 +127,21 @@ public class SimpleDiscoveryAgent implem
                         event.connectFailures++;
 
                         if (maxReconnectAttempts > 0 && event.connectFailures
>= maxReconnectAttempts) {
-                            LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+"
tries.  Reconnecting has been disabled.");
+                            LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+"
tries.  Reconnecting has been disabled.");
                             return;
                         }
 
                         synchronized (sleepMutex) {
                             try {
                                 if (!running.get()) {
+                                    LOG.debug("Reconnecting disabled: stopped");
                                     return;
                                 }
 
                                 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting
to reconnect.");
                                 sleepMutex.wait(event.reconnectDelay);
                             } catch (InterruptedException ie) {
+                                LOG.debug("Reconnecting disabled: " + ie);
                                 Thread.currentThread().interrupt();
                                 return;
                             }
@@ -161,6 +163,7 @@ public class SimpleDiscoveryAgent implem
                     }
 
                     if (!running.get()) {
+                        LOG.debug("Reconnecting disabled: stopped");
                         return;
                     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1087330&r1=1087329&r2=1087330&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Thu Mar 31 15:11:09 2011
@@ -52,11 +52,14 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.MessageIdList;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.core.io.Resource;
 
 /**
@@ -66,6 +69,7 @@ import org.springframework.core.io.Resou
  * 
  */
 public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMultipleBrokersTestSupport.class);
     public static final String AUTO_ASSIGN_TRANSPORT = "tcp://localhost:0";
     public static int maxSetupTime = 5000;
 
@@ -170,7 +174,14 @@ public class JmsMultipleBrokersTestSuppo
         if (!broker.getNetworkConnectors().isEmpty()) {
             result = Wait.waitFor(new Wait.Condition() {
                 public boolean isSatisified() throws Exception {
-                    return (broker.getNetworkConnectors().get(bridgeIndex).activeBridges().size()
>= min);
+                    int activeCount = 0;
+                    for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges())
{
+                        if (bridge.getRemoteBrokerName() != null) {
+                            LOG.info("found bridge to " + bridge.getRemoteBrokerName() +
" on broker :" + broker.getBrokerName());
+                            activeCount++;
+                        }
+                    }
+                    return activeCount >= min;
                 }}, wait);
         }
         return result;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1087330&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
Thu Mar 31 15:11:09 2011
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.net.URLStreamHandlerFactory;
+import java.util.Map;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);
+
+    BrokerService a, b;
+    ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");
+    static final String connectionIdMarker = "ID:marker.";
+    ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue(connectionIdMarker + ">");
+    private long receiveTimeout = 30000;
+
+    public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception {
+        final String xmlConfigString = new String(
+                "<beans" +
+                " xmlns=\"http://www.springframework.org/schema/beans\"" +
+                " xmlns:amq=\"http://activemq.apache.org/schema/core\"" +
+                " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"" +
+                " xsi:schemaLocation=\"http://www.springframework.org/schema/beans" +
+                " http://www.springframework.org/schema/beans/spring-beans-2.0.xsd" +
+                " http://activemq.apache.org/schema/core" +
+                " http://activemq.apache.org/schema/core/activemq-core.xsd\">" +
+                "  <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\""
+
+                "    brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\"
useJmx=\"false\" >" +
+                "   <destinationPolicy>" +
+                "    <policyMap>" +
+                "     <policyEntries>" +
+                "      <policyEntry optimizedDispatch=\"true\">"+
+                "       <destination>"+
+                "        <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName()
+ "\"/>" +
+                "       </destination>" +
+                "      </policyEntry>" +
+                "     </policyEntries>" +
+                "    </policyMap>" +
+                "   </destinationPolicy>" +
+                "   <networkConnectors>" +
+                "    <networkConnector uri=\"multicast://default\">" +
+                "     <staticallyIncludedDestinations>" +
+                "      <queue physicalName=\"" + sendQ.getPhysicalName() + "\"/>" +
+                "      <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName()
+ "\"/>" +
+                "     </staticallyIncludedDestinations>" +
+                "    </networkConnector>" +
+                "   </networkConnectors>" +
+                "   <transportConnectors>" +
+                "     <transportConnector uri=\"tcp://0.0.0.0:0\" discoveryUri=\"multicast://default\"
/>" +
+                "   </transportConnectors>" +
+                "  </broker>" +
+                "</beans>");
+        final String localProtocolScheme = "inline";
+        URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
+            @Override
+            public URLStreamHandler createURLStreamHandler(String protocol) {
+                if (localProtocolScheme.equalsIgnoreCase(protocol)) {
+                    return new URLStreamHandler() {
+                        @Override
+                        protected URLConnection openConnection(URL u) throws IOException
{
+                            return new URLConnection(u) {
+                                @Override
+                                public void connect() throws IOException {
+                                }
+                                @Override
+                                public InputStream getInputStream() throws IOException {
+                                    return new ByteArrayInputStream(xmlConfigString.replace("%HOST%",
url.getFile()).getBytes("UTF-8"));
+                                }
+                            };
+                        }
+                    };
+                }
+                return null;
+            }
+        });
+        a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme
+ ":A"));
+        b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme
+ ":B"));
+
+        doTestNonAdvisoryNetworkRequestReply();
+    }
+
+    public void testNonAdvisoryNetworkRequestReply() throws Exception {
+        createBridgeAndStartBrokers();
+        doTestNonAdvisoryNetworkRequestReply();
+    }
+
+    public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
+
+        waitForBridgeFormation(a, 1, 0);
+        waitForBridgeFormation(b, 1, 0);
+
+        ActiveMQConnectionFactory sendFactory = createConnectionFactory(a);
+        ActiveMQConnection sendConnection = createConnection(sendFactory);
+
+        ActiveMQSession sendSession = (ActiveMQSession)sendConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = sendSession.createProducer(sendQ);
+        ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue) sendSession.createTemporaryQueue();
+        TextMessage message = sendSession.createTextMessage("1");
+        message.setJMSReplyTo(realReplyQ);
+        producer.send(message);
+
+        // responder
+        ActiveMQConnectionFactory consumerFactory = createConnectionFactory(b);
+        ActiveMQConnection consumerConnection = createConnection(consumerFactory);
+
+        ActiveMQSession consumerSession = (ActiveMQSession)consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(sendQ);
+        TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
+        assertNotNull(received);
+
+        LOG.info("got request, sending reply");
+
+        MessageProducer consumerProducer = consumerSession.createProducer(received.getJMSReplyTo());
+        consumerProducer.send(consumerSession.createTextMessage("got " + received.getText()));
+        // temp dest on reply broker tied to this connection, setOptimizedDispatch=true ensures
+        // message gets delivered before destination is removed
+        consumerConnection.close();
+
+        // reply consumer
+        MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ);
+        TextMessage reply = (TextMessage) replyConsumer.receive(receiveTimeout);
+        assertNotNull("expected reply message", reply);
+        assertEquals("text is as expected", "got 1", reply.getText());
+        sendConnection.close();
+
+        verifyAllTempQueuesAreGone();
+    }
+
+    private void verifyAllTempQueuesAreGone() throws Exception {
+        for (BrokerService brokerService : new BrokerService[]{a, b}) {
+            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+            Map temps = regionBroker.getTempTopicRegion().getDestinationMap();
+            assertTrue("no temp topics on " + brokerService + ", " + temps, temps.isEmpty());
+            temps = regionBroker.getTempQueueRegion().getDestinationMap();
+            assertTrue("no temp queues on " + brokerService + ", " + temps, temps.isEmpty());
+        }
+    }
+
+    private ActiveMQConnection createConnection(ActiveMQConnectionFactory factory) throws
Exception {
+        ActiveMQConnection c =(ActiveMQConnection) factory.createConnection();
+        c.start();
+        return c;
+    }
+
+    private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService)
throws Exception {
+        String target = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+        ActiveMQConnectionFactory factory =
+                new ActiveMQConnectionFactory(target);
+        factory.setWatchTopicAdvisories(false);
+        factory.setConnectionIDPrefix(connectionIdMarker + brokerService.getBrokerName());
+        return factory;
+    }
+
+    public void createBridgeAndStartBrokers() throws Exception {
+        a = configureBroker("A");
+        b = configureBroker("B");
+        bridge(a, b);
+        bridge(b, a);
+        a.start();
+        b.start();
+    }
+
+    public void tearDown() throws Exception {
+        stop(a);
+        stop(b);
+    }
+
+    private void stop(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private void bridge(BrokerService from, BrokerService to) throws Exception {
+        TransportConnector toConnector = to.addConnector("tcp://localhost:0");
+        NetworkConnector bridge =
+                from.addNetworkConnector("static://" + toConnector.getPublishableConnectString());
+        bridge.addStaticallyIncludedDestination(sendQ);
+        bridge.addStaticallyIncludedDestination(replyQWildcard);
+    }
+
+    private BrokerService configureBroker(String brokerName) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setAdvisorySupport(false);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry tempReplyQPolicy = new PolicyEntry();
+        tempReplyQPolicy.setOptimizedDispatch(true);
+        map.put(replyQWildcard, tempReplyQPolicy);
+        broker.setDestinationPolicy(map);
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message