activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-922 implement purge semantics
Date Wed, 01 Feb 2017 14:50:07 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 35415510d -> 57038ff47


ARTEMIS-922 implement purge semantics


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17528141
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17528141
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17528141

Branch: refs/heads/master
Commit: 1752814197549e7e91aeb8fe72488736ec1fb91a
Parents: 3541551
Author: Justin Bertram <jbertram@apache.org>
Authored: Thu Jan 19 10:36:26 2017 -0600
Committer: Justin Bertram <jbertram@apache.org>
Committed: Tue Jan 31 08:37:05 2017 -0600

----------------------------------------------------------------------
 .../core/server/ActiveMQServerLogger.java       |  4 +
 .../core/server/AutoCreatedQueueManager.java    | 25 ------
 .../artemis/core/server/QueueManager.java       | 25 ++++++
 .../core/server/impl/ActiveMQServerImpl.java    |  4 +-
 .../impl/AutoCreatedQueueManagerImpl.java       | 77 ------------------
 .../server/impl/PostOfficeJournalLoader.java    |  2 +-
 .../artemis/core/server/impl/QueueImpl.java     |  8 +-
 .../core/server/impl/QueueManagerImpl.java      | 86 ++++++++++++++++++++
 .../integration/addressing/AddressingTest.java  | 35 ++++----
 .../amqp/ClientDefinedMultiConsumerTest.java    | 22 ++++-
 10 files changed, 152 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index b52ed24..c365b7d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1558,4 +1558,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format =
Message.Format.MESSAGE_FORMAT)
    void invalidMessageCounterPeriod(long value);
 
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
+   void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java
deleted file mode 100644
index 5af5c0e..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/AutoCreatedQueueManager.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.utils.ReferenceCounter;
-
-public interface AutoCreatedQueueManager extends ReferenceCounter {
-
-   SimpleString getQueueName();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java
new file mode 100644
index 0000000..a847757
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.utils.ReferenceCounter;
+
+public interface QueueManager extends ReferenceCounter {
+
+   SimpleString getQueueName();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 5c392cc..7575100 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2472,8 +2472,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       if (transientQueue) {
          queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
-      } else if (queue.isAutoCreated()) {
-         queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
+      } else {
+         queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
       }
 
       final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue,
nodeManager.getNodeId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
deleted file mode 100644
index fd89a94..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.impl;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
-
-public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
-
-   private final SimpleString queueName;
-
-   private final ActiveMQServer server;
-
-   private final Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-         Queue queue = server.locateQueue(queueName);
-         SimpleString address = queue.getAddress();
-         AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
-         long consumerCount = queue.getConsumerCount();
-         long messageCount = queue.getMessageCount();
-
-         if (((queue.isAutoCreated() && settings.isAutoDeleteQueues()) || queue.isPurgeOnNoConsumers())
&& queue.getMessageCount() == 0) {
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created
" : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount
= " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
-            }
-
-            try {
-               server.destroyQueue(queueName, null, true, false);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
-            }
-         }
-      }
-   };
-
-   private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
-
-   public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
-      this.server = server;
-      this.queueName = queueName;
-   }
-
-   @Override
-   public int increment() {
-      return referenceCounterUtil.increment();
-   }
-
-   @Override
-   public int decrement() {
-      return referenceCounterUtil.decrement();
-   }
-
-   @Override
-   public SimpleString getQueueName() {
-      return queueName;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 65de0c9..a8f2d85 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -153,7 +153,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
             .maxConsumers(queueBindingInfo.getMaxConsumers())
             .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
          final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
-         queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(),
queueBindingInfo.getQueueName()));
+         queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(),
queueBindingInfo.getQueueName()));
 
          if (queueBindingInfo.getQueueStatusEncodings() != null) {
             for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings())
{

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 6834bb4..8242760 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -851,13 +851,7 @@ public class QueueImpl implements Queue {
             refCountForConsumers.decrement();
          }
 
-         if (noConsumers.decrementAndGet() == 0 && purgeOnNoConsumers) {
-            try {
-               deleteQueue();
-            } catch (Exception e) {
-               logger.error("Error deleting queue on no consumers.  " + this.toString(),
e);
-            }
-         }
+         noConsumers.decrementAndGet();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
new file mode 100644
index 0000000..692eba7
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.QueueManager;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
+
+public class QueueManagerImpl implements QueueManager {
+
+   private final SimpleString queueName;
+
+   private final ActiveMQServer server;
+
+   private final Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+         Queue queue = server.locateQueue(queueName);
+         SimpleString address = queue.getAddress();
+         AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+         long consumerCount = queue.getConsumerCount();
+         long messageCount = queue.getMessageCount();
+
+         if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount()
== 0) {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+               ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created
" : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount
= " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
+            }
+
+            try {
+               server.destroyQueue(queueName, null, true, false);
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
+            }
+         } else if (queue.isPurgeOnNoConsumers()) {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+               ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount
= " + consumerCount + "; messageCount = " + messageCount);
+            }
+            try {
+               queue.deleteAllReferences();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
+            }
+         }
+      }
+   };
+
+   private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
+
+   public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
+      this.server = server;
+      this.queueName = queueName;
+   }
+
+   @Override
+   public int increment() {
+      return referenceCounterUtil.increment();
+   }
+
+   @Override
+   public int decrement() {
+      return referenceCounterUtil.decrement();
+   }
+
+   @Override
+   public SimpleString getQueueName() {
+      return queueName;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 98957e2..0eb5f32 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -224,37 +224,30 @@ public class AddressingTest extends ActiveMQTestBase {
 
    @Test
    public void testPurgeOnNoConsumersTrue() throws Exception {
-
       SimpleString address = new SimpleString("test.address");
       SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
-      // For each address, create 2 Queues with the same address, assert both queues receive
message
-      boolean purgeOnNoConsumers = true;
-      Queue q1 = server.createQueue(address, RoutingType.MULTICAST, queueName, null, true,
false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
-
+      server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false,
false, false, false, 1, true, true);
+      assertNotNull(server.locateQueue(queueName));
       ClientSession session = sessionFactory.createSession();
-      session.start();
-
-      ClientConsumer consumer1 = session.createConsumer(q1.getName());
-      consumer1.close();
-
-      assertFalse(server.queueQuery(queueName).isExists());
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(true));
+      session.createConsumer(queueName).close();
+      assertNotNull(server.locateQueue(queueName));
+      assertEquals(0, server.locateQueue(queueName).getMessageCount());
    }
 
    @Test
    public void testPurgeOnNoConsumersFalse() throws Exception {
       SimpleString address = new SimpleString("test.address");
       SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
-      // For each address, create 2 Queues with the same address, assert both queues receive
message
-      boolean purgeOnNoConsumers = false;
-      Queue q1 = server.createQueue(address,RoutingType.MULTICAST, queueName, null, true,
false, Queue.MAX_CONSUMERS_UNLIMITED, purgeOnNoConsumers, true);
-
+      server.createQueue(address, RoutingType.ANYCAST, queueName, null, null, true, false,
false, false, false, 1, false, true);
+      assertNotNull(server.locateQueue(queueName));
       ClientSession session = sessionFactory.createSession();
-      session.start();
-
-      ClientConsumer consumer1 = session.createConsumer(q1.getName());
-      consumer1.close();
-
-      assertTrue(server.queueQuery(queueName).isExists());
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(true));
+      session.createConsumer(queueName).close();
+      assertNotNull(server.locateQueue(queueName));
+      assertEquals(1, server.locateQueue(queueName).getMessageCount());
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17528141/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
index 9406295..2823983 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -61,7 +62,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       receiver2.close();
       //check its been deleted
-      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))
== null;
+         }
+      }, 1000);
       connection.close();
    }
 
@@ -117,7 +123,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
       //check its been deleted
       connection.close();
-      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))
== null;
+         }
+      }, 1000);
    }
 
    @Test(timeout = 60000)
@@ -144,7 +155,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport
 {
       assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
       receiver2.close();
       //check its been deleted
-      assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))
== null;
+         }
+      }, 1000);
       connection.close();
    }
 


Mime
View raw message