activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-731 properly handle AMQP recreate durable subscription queue
Date Tue, 13 Sep 2016 18:14:08 GMT
ARTEMIS-731 properly handle AMQP recreate durable subscription queue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/afd6bb7a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/afd6bb7a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/afd6bb7a

Branch: refs/heads/master
Commit: afd6bb7aa6fb831e78cd0276ccd4b516ff47a883
Parents: 59a5ec8
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Sep 12 14:29:41 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Sep 13 14:10:26 2016 -0400

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 63 +++++++---------
 .../org/proton/plug/AMQPSessionCallback.java    | 12 ++-
 .../server/ProtonServerSenderContext.java       | 78 +++++++++++++-------
 .../test/minimalserver/MinimalSessionSPI.java   |  6 +-
 4 files changed, 95 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/afd6bb7a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index e422a34..8986aba 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton.plug;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -58,6 +61,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
 import org.proton.plug.SASLResult;
 import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
 import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.proton.plug.sasl.PlainSASLResult;
 
@@ -204,56 +208,39 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
    }
 
    @Override
-   public boolean queueQuery(String queueName) throws Exception {
-      boolean queryResult = false;
+   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception
{
+      QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
 
-      QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
-
-      if (queueQuery.isExists()) {
-         queryResult = true;
-      }
-      else {
-         if (queueQuery.isAutoCreateJmsQueues()) {
+      if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateJmsQueues()
&& autoCreate) {
+         try {
             serverSession.createQueue(new SimpleString(queueName), new SimpleString(queueName),
null, false, true);
-            queryResult = true;
+            queueQueryResult = new QueueQueryResult(queueQueryResult.getName(), queueQueryResult.getAddress(),
queueQueryResult.isDurable(), queueQueryResult.isTemporary(), queueQueryResult.getFilterString(),
queueQueryResult.getConsumerCount(), queueQueryResult.getMessageCount(), queueQueryResult.isAutoCreateJmsQueues(),
true);
          }
-         else {
-            queryResult = false;
+         catch (ActiveMQQueueExistsException e) {
+            // The queue may have been created by another thread in the meantime.  Catch and do nothing.
          }
       }
-
-      return queryResult;
+      return queueQueryResult;
    }
 
    @Override
    public boolean bindingQuery(String address) throws Exception {
-      boolean queryResult = false;
-
       BindingQueryResult queueQuery = serverSession.executeBindingQuery(SimpleString.toSimpleString(address));
-
-      if (queueQuery.isExists()) {
-         queryResult = true;
-      }
-      else {
-         if (queueQuery.isAutoCreateJmsQueues()) {
-            serverSession.createQueue(new SimpleString(address), new SimpleString(address),
null, false, true);
-            queryResult = true;
-         }
-         else {
-            queryResult = false;
-         }
-      }
-
-      return queryResult;
+      return queueQuery.isExists();
    }
 
    @Override
    public void closeSender(final Object brokerConsumer) throws Exception {
+
+      final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
+      final CountDownLatch latch = new CountDownLatch(1);
+
       Runnable runnable = new Runnable() {
          @Override
          public void run() {
             try {
-               ((ServerConsumer) brokerConsumer).close(false);
+               consumer.close(false);
+               latch.countDown();
             }
             catch (Exception e) {
             }
@@ -271,6 +258,13 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
       else {
          runnable.run();
       }
+
+      try {
+         latch.await(10, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e) {
+         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue:
" + consumer.getQueue());
+      }
    }
 
    @Override
@@ -459,8 +453,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
    }
 
    @Override
-   public void deleteQueue(String address) throws Exception {
-      manager.getServer().destroyQueue(new SimpleString(address));
+   public void deleteQueue(String queueName) throws Exception {
+      manager.getServer().destroyQueue(new SimpleString(queueName));
    }
 
    private void resetContext() {
@@ -540,5 +534,4 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
          return false;
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/afd6bb7a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index 8406431..b6acd3f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -17,6 +17,7 @@
 package org.proton.plug;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -48,7 +49,16 @@ public interface AMQPSessionCallback {
 
    void deleteQueue(String address) throws Exception;
 
-   boolean queueQuery(String queueName) throws Exception;
+   /**
+    * Returns the query result for the queue with the matching name. If the queue does not exist, autoCreate=true and autoCreateJMSQueues is switched on then
+    * this method will auto create the queue, with name=queueName, address=queueName, filter=null.
+    *
+    * @param queueName
+    * @param autoCreate
+    * @return
+    * @throws Exception
+    */
+   QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception;
 
    boolean bindingQuery(String address) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/afd6bb7a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 739f8e8..2d91f37 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -17,7 +17,10 @@
 package org.proton.plug.context.server;
 
 import java.util.Map;
+import java.util.Objects;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -47,6 +50,7 @@ import org.proton.plug.context.AbstractProtonContextSender;
 import org.proton.plug.context.AbstractProtonSessionContext;
 import org.proton.plug.context.ProtonPlugSender;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
+import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
 import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
 import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException;
 import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
@@ -116,8 +120,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
 
       String selector = null;
 
-      String noLocalFilter = null;
-
       /*
       * even though the filter is a map it will only return a single filter unless a nolocal is also provided
       * */
@@ -134,11 +136,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
                return;
             }
          }
-
-         if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
-            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-            noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'"
+ remoteContainerId + "'";
-         }
       }
 
       /*
@@ -147,12 +144,25 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
       * */
       boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
 
+      if (isPubSub) {
+         if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
+            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
+            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString()
+ "<>'" + remoteContainerId + "'";
+            if (selector != null) {
+               selector += " AND " + noLocalFilter;
+            }
+            else {
+               selector = noLocalFilter;
+            }
+         }
+      }
+
       if (source == null) {
          // Attempt to recover a previous subscription happens when a link reattach happens
on a subscription queue
          String clientId = connection.getRemoteContainer();
          String pubId = sender.getName();
          queue = clientId + ":" + pubId;
-         boolean exists = sessionSPI.queueQuery(queue);
+         boolean exists = sessionSPI.queueQuery(queue, false).isExists();
 
          /*
          * If it exists then we know it is a subscription so we set the capabilities on the
source so we can delete on a
@@ -188,8 +198,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
          else {
             //if not dynamic then we use the target's address as the address to forward the messages to, however there has to
             //be a queue bound to it so we need to check this.
-
-
             if (isPubSub) {
                // if we are a subscription and durable create a durable queue using the container
id and link name
                if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
@@ -197,23 +205,39 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
                   String clientId = connection.getRemoteContainer();
                   String pubId = sender.getName();
                   queue = clientId + ":" + pubId;
-                  boolean exists = sessionSPI.queueQuery(queue);
-                  if (!exists) {
-                     sessionSPI.createDurableQueue(source.getAddress(), queue, noLocalFilter);
+                  QueueQueryResult result = sessionSPI.queueQuery(queue, false);
+
+                  if (result.isExists()) {
+                     // If a client reattaches to a durable subscription with a different
no-local filter value, selector
+                     // or address then we must recreate the queue (JMS semantics).
+
+                     if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector))
||
+                        (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString())))
{
+                        if (result.getConsumerCount() == 0) {
+                           sessionSPI.deleteQueue(queue);
+                           sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                        }
+                        else {
+                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate
subscription, consumers already exist");
+                        }
+                     }
                   }
+                  else {
+                     sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
+                  }
+                  source.setAddress(queue);
                }
                //otherwise we are a volatile subscription
                else {
                   queue = java.util.UUID.randomUUID().toString();
                   try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, noLocalFilter);
+                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
                   }
                   catch (Exception e) {
                      throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                   }
                   source.setAddress(queue);
                }
-
             }
             else {
                queue = source.getAddress();
@@ -223,7 +247,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
             }
 
             try {
-               if (!sessionSPI.queueQuery(queue)) {
+               if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
             }
@@ -237,7 +261,7 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
 
          boolean browseOnly = !isPubSub && source.getDistributionMode() != null &&
source.getDistributionMode().equals(COPY);
          try {
-            brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly);
+            brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector,
browseOnly);
          }
          catch (Exception e) {
             throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
@@ -250,7 +274,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
       return source != null && pubSubPrefix != null && source.getAddress()
!= null && source.getAddress().startsWith(pubSubPrefix);
    }
 
-
    /*
    * close the session
    * */
@@ -276,20 +299,23 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender
imple
          sessionSPI.closeSender(brokerConsumer);
          //if this is a link close rather than a connection close or detach, we need to delete
any durable resources for
          // say pub subs
-         if (remoteLinkClose ) {
-            Source source = (Source)sender.getSource();
+         if (remoteLinkClose) {
+            Source source = (Source) sender.getSource();
             if (source != null && source.getAddress() != null && hasCapabilities(TOPIC,
source)) {
-               String address = source.getAddress();
-               boolean exists = sessionSPI.queueQuery(address);
-               if (exists) {
-                  sessionSPI.deleteQueue(address);
+               String queueName = source.getAddress();
+               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
+               if (result.isExists() && source.getDynamic()) {
+                  sessionSPI.deleteQueue(queueName);
                }
                else {
                   String clientId = connection.getRemoteContainer();
                   String pubId = sender.getName();
                   String queue = clientId + ":" + pubId;
-                  exists = sessionSPI.queueQuery(queue);
-                  if (exists) {
+                  result = sessionSPI.queueQuery(queue, false);
+                  if (result.isExists()) {
+                     if (result.getConsumerCount() > 0) {
+                        System.out.println("error");
+                     }
                      sessionSPI.deleteQueue(queue);
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/afd6bb7a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index 0701b17..f9a3533 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -100,8 +102,8 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
-   public boolean queueQuery(String queueName) {
-      return true;
+   public QueueQueryResult queueQuery(String queueName, boolean autoCreate) {
+      return new QueueQueryResult(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName),
false, false, null, 0, 0, false);
    }
 
    @Override


Mime
View raw message