activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5631
Date Tue, 03 Mar 2015 20:56:17 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2ec586f26 -> ad57cc6fc


https://issues.apache.org/jira/browse/AMQ-5631

Support for temporary topic delete

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ad57cc6f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ad57cc6f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ad57cc6f

Branch: refs/heads/master
Commit: ad57cc6fcb9c5b5e663e02abd17b56dea12fa581
Parents: 2ec586f
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Mar 3 15:55:28 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Mar 3 15:55:28 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 115 ++++++++++++++-----
 .../activemq/transport/amqp/JMSClientTest.java  |   3 +-
 2 files changed, 87 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ad57cc6f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 6800854..a2a5ee5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,6 +39,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.Command;
@@ -517,12 +519,21 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     static abstract class AmqpDeliveryListener {
 
+        protected ActiveMQDestination destination;
+        protected List<Runnable> closeActions = new ArrayList<Runnable>();
+
         abstract public void onDelivery(Delivery delivery) throws Exception;
 
         public void onDetach() throws Exception {
         }
 
         public void onClose() throws Exception {
+
+            for (Runnable action : closeActions) {
+                action.run();
+            }
+
+            closeActions.clear();
         }
 
         public void drainCheck() {
@@ -531,6 +542,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         abstract void doCommit() throws Exception;
 
         abstract void doRollback() throws Exception;
+
+        public void addCloseAction(Runnable action) {
+            closeActions.add(action);
+        }
+
+        public ActiveMQDestination getDestination() {
+            return destination;
+        }
+
+        public void setDestination(ActiveMQDestination destination) {
+            this.destination = destination;
+        }
     }
 
     private void onConnectionOpen() throws AmqpProtocolException {
@@ -683,14 +706,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     class ProducerContext extends BaseProducerContext {
         private final ProducerId producerId;
         private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
-        private final ActiveMQDestination destination;
         private boolean closed;
-        private final boolean anonymous;
+        private boolean anonymous;
 
-        public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean
anonymous) {
+        public ProducerContext(ProducerId producerId) {
             this.producerId = producerId;
-            this.destination = destination;
-            this.anonymous = anonymous;
         }
 
         @Override
@@ -797,6 +817,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             if (!closed) {
                 sendToActiveMQ(new RemoveInfo(producerId), null);
             }
+
+            super.onClose();
         }
 
         public void close() {
@@ -914,6 +936,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
         int flow = producerCredit;
+
         try {
             if (remoteTarget instanceof Coordinator) {
                 pumpProtonToSocket();
@@ -924,29 +947,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             } else {
                 Target target = (Target) remoteTarget;
                 ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+                final ProducerContext producerContext = new ProducerContext(producerId);
                 ActiveMQDestination destination = null;
-                boolean anonymous = false;
                 String targetNodeName = target.getAddress();
 
                 if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic())
{
-                    anonymous = true;
+                    producerContext.anonymous = true;
                 } else if (target.getDynamic()) {
                     destination = createTemporaryDestination(receiver, target.getCapabilities());
                     Target actualTarget = new Target();
                     actualTarget.setAddress(destination.getQualifiedName());
                     actualTarget.setDynamic(true);
                     receiver.setTarget(actualTarget);
+                    producerContext.addCloseAction(new Runnable() {
+
+                        @Override
+                        public void run() {
+                            deleteTemporaryDestination((ActiveMQTempDestination) producerContext.getDestination());
+                        }
+                    });
                 } else {
                     destination = createDestination(remoteTarget);
                 }
 
-                final ProducerContext producerContext = new ProducerContext(producerId, destination,
anonymous);
-
                 receiver.setContext(producerContext);
                 receiver.flow(flow);
 
                 ProducerInfo producerInfo = new ProducerInfo(producerId);
                 producerInfo.setDestination(destination);
+                producerContext.setDestination(destination);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
                     @Override
                     public void onResponse(IAmqpProtocolConverter converter, Response response)
throws IOException {
@@ -1005,7 +1034,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private boolean closed;
         public ConsumerInfo info;
         private boolean endOfBrowse = false;
-        public ActiveMQDestination destination;
         public int credit;
         public int consumerPrefetch = 0;
         private long lastDeliveredSequenceId;
@@ -1068,28 +1096,32 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         @Override
         public void onClose() throws Exception {
-            if (!closed) {
-                closed = true;
-                sender.setContext(null);
-                subscriptionsByConsumerId.remove(consumerId);
-
-                AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
-                if (session != null) {
-                    session.consumers.remove(info.getConsumerId());
-                }
+            try {
+                if (!closed) {
+                    closed = true;
+                    sender.setContext(null);
+                    subscriptionsByConsumerId.remove(consumerId);
+
+                    AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
+                    if (session != null) {
+                        session.consumers.remove(info.getConsumerId());
+                    }
 
-                RemoveInfo removeCommand = new RemoveInfo(consumerId);
-                removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-                sendToActiveMQ(removeCommand, null);
+                    RemoveInfo removeCommand = new RemoveInfo(consumerId);
+                    removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+                    sendToActiveMQ(removeCommand, null);
 
-                if (info.isDurable()) {
-                    RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                    rsi.setConnectionId(connectionId);
-                    rsi.setSubscriptionName(sender.getName());
-                    rsi.setClientId(connectionInfo.getClientId());
+                    if (info.isDurable()) {
+                        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                        rsi.setConnectionId(connectionId);
+                        rsi.setSubscriptionName(sender.getName());
+                        rsi.setClientId(connectionInfo.getClientId());
 
-                    sendToActiveMQ(rsi, null);
+                        sendToActiveMQ(rsi, null);
+                    }
                 }
+            } finally {
+                super.onClose();
             }
         }
 
@@ -1415,6 +1447,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 source.setAddress(destination.getQualifiedName());
                 source.setDynamic(true);
                 sender.setSource(source);
+                consumerContext.addCloseAction(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        deleteTemporaryDestination((ActiveMQTempDestination) consumerContext.getDestination());
+                    }
+                });
             } else {
                 destination = createDestination(source);
             }
@@ -1425,7 +1464,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(destination);
-            consumerContext.destination = destination;
+            consumerContext.setDestination(destination);
             int senderCredit = sender.getRemoteCredit();
             if (prefetch != 0) {
                 // use the value configured on the transport connector
@@ -1551,6 +1590,24 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         return rc;
     }
 
+    private void deleteTemporaryDestination(ActiveMQTempDestination destination) {
+        DestinationInfo info = new DestinationInfo();
+        info.setConnectionId(connectionId);
+        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        info.setDestination(destination);
+
+        sendToActiveMQ(info, new ResponseHandler() {
+
+            @Override
+            public void onResponse(IAmqpProtocolConverter converter, Response response) throws
IOException {
+                if (response.isException()) {
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    LOG.debug("Error during temp destination removeal: {}", exception.getMessage());
+                }
+            }
+        });
+    }
+
     private boolean contains(Symbol[] symbols, Symbol key) {
         if (symbols == null) {
             return false;

http://git-wip-us.apache.org/repos/asf/activemq/blob/ad57cc6f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 87553e2..552d828 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -1067,7 +1067,6 @@ public class JMSClientTest extends JMSClientTestSupport {
         }
     }
 
-    @Ignore("Broker cannot currently tell if it should delete a temp destination")
     @Test(timeout=30000)
     public void testDeleteTemporaryQueue() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();
@@ -1112,7 +1111,7 @@ public class JMSClientTest extends JMSClientTestSupport {
         }
     }
 
-    @Ignore("Broker cannot currently tell if it should delete a temp destination")
+    @Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
     @Test(timeout=30000)
     public void testDeleteTemporaryTopic() throws Exception {
         ActiveMQAdmin.enableJMSFrameTracing();


Mime
View raw message