activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [24/48] activemq-artemis git commit: Fix OpenWire queue auto-creation failure
Date Wed, 23 Nov 2016 17:43:25 GMT
Fix OpenWire queue auto-creation failure


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fccea4aa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fccea4aa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fccea4aa

Branch: refs/heads/ARTEMIS-780
Commit: fccea4aa45e5f32f9baca1126fa492ed73c4967d
Parents: 493e999
Author: Howard Gao <howard.gao@gmail.com>
Authored: Fri Nov 11 19:40:36 2016 +0800
Committer: jbertram <jbertram@apache.com>
Committed: Wed Nov 23 09:04:34 2016 -0600

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 19 ++++++++--
 .../core/protocol/openwire/amq/AMQConsumer.java |  2 +-
 .../core/protocol/openwire/amq/AMQSession.java  | 40 ++++++++++++++++----
 .../protocol/openwire/util/OpenWireUtil.java    | 17 ---------
 4 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 8dc0b34..cdc62fd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -73,6 +73,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -177,6 +178,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private boolean useKeepAlive;
    private long maxInactivityDuration;
 
+   private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
+
    // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
@@ -707,7 +710,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void addDestination(DestinationInfo info) throws Exception {
       ActiveMQDestination dest = info.getDestination();
       if (dest.isQueue()) {
-         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+         SimpleString qName = new SimpleString(dest.getPhysicalName());
          QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
          if (binding == null) {
             if (dest.isTemporary()) {
@@ -789,6 +792,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       checkInactivity();
    }
 
+   public void addKnownDestination(final SimpleString address) {
+      knownDestinations.add(address);
+   }
+
+   public boolean containsKnownDestination(final SimpleString address) {
+      return knownDestinations.contains(address);
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override
@@ -845,7 +856,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void removeDestination(ActiveMQDestination dest) throws Exception {
       if (dest.isQueue()) {
          try {
-            server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
+            server.destroyQueue(new SimpleString(dest.getPhysicalName()));
          } catch (ActiveMQNonExistentQueueException neq) {
             //this is ok, ActiveMQ 5 allows this and will actually do it quite often
             ActiveMQServerLogger.LOGGER.debug("queue never existed");
@@ -853,7 +864,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
 
       } else {
-         Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest));
+         Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(dest.getPhysicalName()));
 
          for (Binding binding : bindings.getBindings()) {
             Queue b = (Queue) binding.getBindable();
@@ -883,7 +894,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
     */
    private void validateDestination(ActiveMQDestination destination) throws Exception {
       if (destination.isQueue()) {
-         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+         SimpleString physicalName = new SimpleString(destination.getPhysicalName());
          BindingQueryResult result = server.bindingQuery(physicalName);
          if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 5603cb8..2f05f45 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -95,7 +95,7 @@ public class AMQConsumer {
          serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
       } else {
-         SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
+         SimpleString queueName = new SimpleString(openwireDestination.getPhysicalName());
          try {
             session.getCoreServer().createQueue(queueName, queueName, null, true, false);
          } catch (ActiveMQQueueExistsException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 5cab686..35fd733 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,16 +23,16 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -145,11 +145,10 @@ public class AMQSession implements SessionCallback {
 
       for (ActiveMQDestination openWireDest : dests) {
          if (openWireDest.isQueue()) {
-            SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
-            try {
-               getCoreServer().createQueue(queueName, queueName, null, true, false);
-            } catch (ActiveMQQueueExistsException e) {
-               // ignore
+            SimpleString queueName = new SimpleString(openWireDest.getPhysicalName());
+
+            if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
+               throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
             }
          }
          AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
@@ -162,6 +161,27 @@ public class AMQSession implements SessionCallback {
       return consumersList;
    }
 
+   private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
+      boolean hasQueue = true;
+      if (!connection.containsKnownDestination(queueName)) {
+
+         BindingQueryResult bindingQuery = server.bindingQuery(queueName);
+         QueueQueryResult queueBinding = server.queueQuery(queueName);
+
+         boolean isAutoCreate = bindingQuery.isExists() ? bindingQuery.isAutoCreateJmsQueues() : true;
+
+         if (!queueBinding.isExists()) {
+            if (isAutoCreate) {
+               server.createQueue(queueName, queueName, null, true, isTemporary);
+               connection.addKnownDestination(queueName);
+            } else {
+               hasQueue = false;
+            }
+         }
+      }
+      return hasQueue;
+   }
+
    public void start() {
 
       coreSession.start();
@@ -338,7 +358,7 @@ public class AMQSession implements SessionCallback {
       // We fillup addresses, pagingStores and we will throw failure if that's the case
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
-         addresses[i] = OpenWireUtil.toCoreAddress(dest);
+         addresses[i] = new SimpleString(dest.getPhysicalName());
          pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
          if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
             throw new ResourceAllocationException("Queue is full");
@@ -357,6 +377,10 @@ public class AMQSession implements SessionCallback {
             connection.getTransportConnection().setAutoRead(false);
          }
 
+         if (actualDestinations[i].isQueue()) {
+            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+         }
+
          RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
 
          if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fccea4aa/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index a6e7292..04bd6a3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -37,22 +36,6 @@ public class OpenWireUtil {
       return buffer;
    }
 
-   public static SimpleString toCoreAddress(ActiveMQDestination dest) {
-      if (dest.isQueue()) {
-         if (dest.isTemporary()) {
-            return new SimpleString(dest.getPhysicalName());
-         } else {
-            return new SimpleString(dest.getPhysicalName());
-         }
-      } else {
-         if (dest.isTemporary()) {
-            return new SimpleString(dest.getPhysicalName());
-         } else {
-            return new SimpleString(dest.getPhysicalName());
-         }
-      }
-   }
-
    /**
     * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
     * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was


Mime
View raw message