activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1423834 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
Date Wed, 19 Dec 2012 11:41:18 GMT
Author: dejanb
Date: Wed Dec 19 11:41:17 2012
New Revision: 1423834

URL: http://svn.apache.org/viewvc?rev=1423834&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4222 - remove region reference for all mutable producer
exchanges (and some more refactorings)

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1423834&r1=1423833&r2=1423834&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Dec 19 11:41:17 2012
@@ -125,16 +125,9 @@ public class RegionBroker extends EmptyB
 
     @Override
     public Set <Destination> getDestinations(ActiveMQDestination destination) {
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            return queueRegion.getDestinations(destination);
-        case ActiveMQDestination.TOPIC_TYPE:
-            return topicRegion.getDestinations(destination);
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            return tempQueueRegion.getDestinations(destination);
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            return tempTopicRegion.getDestinations(destination);
-        default:
+        try {
+            return getRegion(destination).getDestinations(destination);
+        } catch (JMSException jmse) {
             return Collections.emptySet();
         }
     }
@@ -278,22 +271,10 @@ public class RegionBroker extends EmptyB
             return answer;
         }
 
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            answer = queueRegion.addDestination(context, destination,true);
-            break;
-        case ActiveMQDestination.TOPIC_TYPE:
-            answer = topicRegion.addDestination(context, destination,true);
-            break;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            answer = tempQueueRegion.addDestination(context, destination, createIfTemp);
-            break;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            answer = tempTopicRegion.addDestination(context, destination, createIfTemp);
-            break;
-        default:
-            throw createUnknownDestinationTypeException(destination);
-        }
+        boolean create = true;
+        if (destination.isTemporary())
+            create = createIfTemp;
+        answer = getRegion(destination).addDestination(context, destination, create);
 
         destinations.put(destination, answer);
         return answer;
@@ -303,28 +284,10 @@ public class RegionBroker extends EmptyB
 
     @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination,
long timeout) throws Exception {
-
         if (destinations.containsKey(destination)) {
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                queueRegion.removeDestination(context, destination, timeout);
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                topicRegion.removeDestination(context, destination, timeout);
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                tempQueueRegion.removeDestination(context, destination, timeout);
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                tempTopicRegion.removeDestination(context, destination, timeout);
-                break;
-            default:
-                throw createUnknownDestinationTypeException(destination);
-            }
+            getRegion(destination).removeDestination(context, destination, timeout);
             destinations.remove(destination);
-
         }
-
     }
 
     @Override
@@ -359,20 +322,7 @@ public class RegionBroker extends EmptyB
                 // This seems to cause the destination to be added but without
                 // advisories firing...
                 context.getBroker().addDestination(context, destination, isAllowTempAutoCreationOnSend());
-                switch (destination.getDestinationType()) {
-                case ActiveMQDestination.QUEUE_TYPE:
-                    queueRegion.addProducer(context, info);
-                    break;
-                case ActiveMQDestination.TOPIC_TYPE:
-                    topicRegion.addProducer(context, info);
-                    break;
-                case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                    tempQueueRegion.addProducer(context, info);
-                    break;
-                case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                    tempTopicRegion.addProducer(context, info);
-                    break;
-                }
+                getRegion(destination).addProducer(context, info);
             } finally {
                 inactiveDestinationsPurgeLock.readLock().unlock();
             }
@@ -385,20 +335,7 @@ public class RegionBroker extends EmptyB
         if (destination != null) {
             inactiveDestinationsPurgeLock.readLock().lock();
             try {
-                switch (destination.getDestinationType()) {
-                case ActiveMQDestination.QUEUE_TYPE:
-                    queueRegion.removeProducer(context, info);
-                    break;
-                case ActiveMQDestination.TOPIC_TYPE:
-                    topicRegion.removeProducer(context, info);
-                    break;
-                case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                    tempQueueRegion.removeProducer(context, info);
-                    break;
-                case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                    tempTopicRegion.removeProducer(context, info);
-                    break;
-                }
+                getRegion(destination).removeProducer(context, info);
             } finally {
                 inactiveDestinationsPurgeLock.readLock().unlock();
             }
@@ -413,22 +350,7 @@ public class RegionBroker extends EmptyB
         }
         inactiveDestinationsPurgeLock.readLock().lock();
         try {
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                return queueRegion.addConsumer(context, info);
-
-            case ActiveMQDestination.TOPIC_TYPE:
-                return topicRegion.addConsumer(context, info);
-
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                return tempQueueRegion.addConsumer(context, info);
-
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                return tempTopicRegion.addConsumer(context, info);
-
-            default:
-                throw createUnknownDestinationTypeException(destination);
-            }
+            return getRegion(destination).addConsumer(context, info);
         } finally {
             inactiveDestinationsPurgeLock.readLock().unlock();
         }
@@ -439,23 +361,7 @@ public class RegionBroker extends EmptyB
         ActiveMQDestination destination = info.getDestination();
         inactiveDestinationsPurgeLock.readLock().lock();
         try {
-            switch (destination.getDestinationType()) {
-
-            case ActiveMQDestination.QUEUE_TYPE:
-                queueRegion.removeConsumer(context, info);
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                topicRegion.removeConsumer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                tempQueueRegion.removeConsumer(context, info);
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                tempTopicRegion.removeConsumer(context, info);
-                break;
-            default:
-                throw createUnknownDestinationTypeException(destination);
-            }
+            getRegion(destination).removeConsumer(context, info);
         } finally {
             inactiveDestinationsPurgeLock.readLock().unlock();
         }
@@ -479,24 +385,7 @@ public class RegionBroker extends EmptyB
                 || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed()))
{
             // ensure the destination is registered with the RegionBroker
             producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(),
destination, isAllowTempAutoCreationOnSend());
-            Region region;
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                region = queueRegion;
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                region = topicRegion;
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                region = tempQueueRegion;
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                region = tempTopicRegion;
-                break;
-            default:
-                throw createUnknownDestinationTypeException(destination);
-            }
-            producerExchange.setRegion(region);
+            producerExchange.setRegion(getRegion(destination));
             producerExchange.setRegionDestination(null);
         }
 
@@ -504,7 +393,7 @@ public class RegionBroker extends EmptyB
 
         // clean up so these references aren't kept (possible leak) in the producer exchange
         // especially since temps are transitory
-        if (destination.isTemporary()) {
+        if (producerExchange.isMutable()) {
             producerExchange.setRegionDestination(null);
             producerExchange.setRegion(null);
         }
@@ -514,49 +403,33 @@ public class RegionBroker extends EmptyB
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws
Exception {
         if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
             ActiveMQDestination destination = ack.getDestination();
-            Region region;
-            switch (destination.getDestinationType()) {
-            case ActiveMQDestination.QUEUE_TYPE:
-                region = queueRegion;
-                break;
-            case ActiveMQDestination.TOPIC_TYPE:
-                region = topicRegion;
-                break;
-            case ActiveMQDestination.TEMP_QUEUE_TYPE:
-                region = tempQueueRegion;
-                break;
-            case ActiveMQDestination.TEMP_TOPIC_TYPE:
-                region = tempTopicRegion;
-                break;
-            default:
-                throw createUnknownDestinationTypeException(destination);
-            }
-            consumerExchange.setRegion(region);
+            consumerExchange.setRegion(getRegion(destination));
         }
         consumerExchange.getRegion().acknowledge(consumerExchange, ack);
     }
 
-    @Override
-    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
{
-        ActiveMQDestination destination = pull.getDestination();
+    protected Region getRegion(ActiveMQDestination destination) throws JMSException {
         switch (destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
-            return queueRegion.messagePull(context, pull);
-
+            return queueRegion;
         case ActiveMQDestination.TOPIC_TYPE:
-            return topicRegion.messagePull(context, pull);
-
+            return topicRegion;
         case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            return tempQueueRegion.messagePull(context, pull);
-
+            return tempQueueRegion;
         case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            return tempTopicRegion.messagePull(context, pull);
+            return tempTopicRegion;
         default:
             throw createUnknownDestinationTypeException(destination);
         }
     }
 
     @Override
+    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
{
+        ActiveMQDestination destination = pull.getDestination();
+        return getRegion(destination).messagePull(context, pull);
+    }
+
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception
{
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
@@ -684,22 +557,7 @@ public class RegionBroker extends EmptyB
     @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
         ActiveMQDestination destination = messageDispatchNotification.getDestination();
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            queueRegion.processDispatchNotification(messageDispatchNotification);
-            break;
-        case ActiveMQDestination.TOPIC_TYPE:
-            topicRegion.processDispatchNotification(messageDispatchNotification);
-            break;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            tempQueueRegion.processDispatchNotification(messageDispatchNotification);
-            break;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            tempTopicRegion.processDispatchNotification(messageDispatchNotification);
-            break;
-        default:
-            throw createUnknownDestinationTypeException(destination);
-        }
+        getRegion(destination).processDispatchNotification(messageDispatchNotification);
     }
 
     @Override
@@ -879,24 +737,9 @@ public class RegionBroker extends EmptyB
     @Override
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl
control) {
         ActiveMQDestination destination = control.getDestination();
-        switch (destination.getDestinationType()) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            queueRegion.processConsumerControl(consumerExchange, control);
-            break;
-
-        case ActiveMQDestination.TOPIC_TYPE:
-            topicRegion.processConsumerControl(consumerExchange, control);
-            break;
-
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            tempQueueRegion.processConsumerControl(consumerExchange, control);
-            break;
-
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            tempTopicRegion.processConsumerControl(consumerExchange, control);
-            break;
-
-        default:
+        try {
+            getRegion(destination).processConsumerControl(consumerExchange, control);
+        } catch (JMSException jmse) {
             LOG.warn("unmatched destination: " + destination + ", in consumerControl: " 
+ control);
         }
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java?rev=1423834&r1=1423833&r2=1423834&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java Wed
Dec 19 11:41:17 2012
@@ -20,12 +20,12 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.*;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.util.Wait;
 
-import javax.jms.*;
 import javax.jms.Connection;
+import javax.jms.*;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Map;
@@ -135,9 +135,14 @@ public class AMQ4222Test extends TestSup
         // still have a reference in his producerBrokerExchange.. this will keep the destination
         // from being reclaimed by GC if there is never another send that producer makes...
         // let's see if that reference is there...
-        TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
+        final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
         assertNotNull(connector);
-        assertEquals(1, connector.getConnections().size());
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return connector.getConnections().size() == 1;
+            }
+        }));
         TransportConnection transportConnection = connector.getConnections().get(0);
         Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
         assertEquals(1, exchanges.size());



Mime
View raw message