activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1269 Fixing blocked replication
Date Fri, 07 Jul 2017 03:39:57 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master f3cc555ab -> 15c2f09bc


ARTEMIS-1269 Fixing blocked replication

If replication blocked anything on the journal
the processing from clients would be blocked
and nothing would work.

As part of this fix I am using an executor on ServerSessionPacketHandler
which will also scale better as the reader from Netty would be feed immediately.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89e84e13
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89e84e13
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89e84e13

Branch: refs/heads/master
Commit: 89e84e1320145c5b81996afa8251fa967f8881ff
Parents: f3cc555
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jul 5 13:28:01 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Jul 6 19:04:48 2017 -0400

----------------------------------------------------------------------
 .../artemis/utils/OrderedExecutorFactory.java   | 17 +++++
 .../core/journal/impl/FileWrapperJournal.java   | 74 ++++++++++++--------
 .../core/ServerSessionPacketHandler.java        | 42 ++++++++++-
 .../core/impl/ActiveMQPacketHandler.java        |  2 +-
 .../core/replication/ReplicationEndpoint.java   | 45 ++++--------
 .../core/server/cluster/ClusterController.java  |  3 +
 .../core/server/impl/ActiveMQServerImpl.java    |  4 +-
 .../reattach/MultiThreadRandomReattachTest.java |  3 +
 .../NettyMultiThreadRandomReattachTest.java     |  3 +
 9 files changed, 127 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index 24fa5e7..65cb08f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.utils;
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -33,6 +35,21 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
 
    private final Executor parent;
 
+
+   public static boolean flushExecutor(Executor executor) {
+      return flushExecutor(executor, 30, TimeUnit.SECONDS);
+   }
+
+   public static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit) {
+      final CountDownLatch latch = new CountDownLatch(1);
+      executor.execute(latch::countDown);
+      try {
+         return latch.await(timeout, unit);
+      } catch (Exception e) {
+         return false;
+      }
+   }
+
    /**
     * Construct a new instance delegating to the given parent executor.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 5ef240a..9dafd4b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -96,7 +96,7 @@ public final class FileWrapperJournal extends JournalBase {
                                IOCompletion callback) throws Exception {
       JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister,
record);
 
-      writeRecord(addRecord, sync, callback);
+      writeRecord(addRecord, false, -1, false, callback);
    }
 
    @Override
@@ -107,7 +107,9 @@ public final class FileWrapperJournal extends JournalBase {
     * Write the record to the current file.
     */
    private void writeRecord(JournalInternalRecord encoder,
-                            final boolean sync,
+                            final boolean tx,
+                            final long txID,
+                            final boolean removeTX,
                             final IOCompletion callback) throws Exception {
 
       lockAppend.lock();
@@ -115,30 +117,54 @@ public final class FileWrapperJournal extends JournalBase {
          if (callback != null) {
             callback.storeLineUp();
          }
-         currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
+         testSwitchFiles(encoder);
+         if (txID >= 0) {
+            if (tx) {
+               AtomicInteger value;
+               if (removeTX) {
+                  value = transactions.remove(txID);
+               } else {
+                  value = transactions.get(txID);
+               }
+               if (value != null) {
+                  encoder.setNumberOfRecords(value.get());
+               }
+            } else {
+               count(txID);
+            }
+         }
          encoder.setFileID(currentFile.getRecordID());
 
          if (callback != null) {
-            currentFile.getFile().write(encoder, sync, callback);
+            currentFile.getFile().write(encoder, false, callback);
          } else {
-            currentFile.getFile().write(encoder, sync);
+            currentFile.getFile().write(encoder, false);
          }
       } finally {
          lockAppend.unlock();
       }
    }
 
+   private void testSwitchFiles(JournalInternalRecord encoder) throws Exception {
+      JournalFile oldFile = currentFile;
+      currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
+      if (oldFile != currentFile) {
+         for (AtomicInteger value : transactions.values()) {
+            value.set(0);
+         }
+      }
+   }
+
    @Override
    public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception
{
       JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
-      writeRecord(deleteRecord, sync, callback);
+      writeRecord(deleteRecord, false, -1, false, callback);
    }
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record)
throws Exception {
-      count(txID);
       JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
-      writeRecord(deleteRecordTX, false, null);
+      writeRecord(deleteRecordTX, false, txID, false, null);
    }
 
    @Override
@@ -147,9 +173,8 @@ public final class FileWrapperJournal extends JournalBase {
                                             byte recordType,
                                             Persister persister,
                                             Object record) throws Exception {
-      count(txID);
       JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType,
persister, record);
-      writeRecord(addRecord, false, null);
+      writeRecord(addRecord, false, txID, false, null);
    }
 
    @Override
@@ -160,7 +185,7 @@ public final class FileWrapperJournal extends JournalBase {
                                   boolean sync,
                                   IOCompletion callback) throws Exception {
       JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister,
record);
-      writeRecord(updateRecord, sync, callback);
+      writeRecord(updateRecord, false, -1, false, callback);
    }
 
    @Override
@@ -169,9 +194,8 @@ public final class FileWrapperJournal extends JournalBase {
                                                byte recordType,
                                                Persister persister,
                                                Object record) throws Exception {
-      count(txID);
       JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType,
persister, record);
-      writeRecord(updateRecordTX, false, null);
+      writeRecord(updateRecordTX, false, txID, false, null);
    }
 
    @Override
@@ -180,12 +204,8 @@ public final class FileWrapperJournal extends JournalBase {
                                   IOCompletion callback,
                                   boolean lineUpContext) throws Exception {
       JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT,
txID, null);
-      AtomicInteger value = transactions.remove(txID);
-      if (value != null) {
-         commitRecord.setNumberOfRecords(value.get());
-      }
 
-      writeRecord(commitRecord, true, callback);
+      writeRecord(commitRecord, true, txID, true, callback);
    }
 
    @Override
@@ -194,20 +214,18 @@ public final class FileWrapperJournal extends JournalBase {
                                    boolean sync,
                                    IOCompletion callback) throws Exception {
       JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE,
txID, transactionData);
-      AtomicInteger value = transactions.get(txID);
-      if (value != null) {
-         prepareRecord.setNumberOfRecords(value.get());
-      }
-      writeRecord(prepareRecord, sync, callback);
+      writeRecord(prepareRecord, true, txID, false, callback);
    }
 
    private int count(long txID) throws ActiveMQException {
       AtomicInteger defaultValue = new AtomicInteger(1);
       AtomicInteger count = transactions.putIfAbsent(txID, defaultValue);
       if (count != null) {
-         return count.incrementAndGet();
+         count.incrementAndGet();
+      } else {
+         count = defaultValue;
       }
-      return defaultValue.get();
+      return count.intValue();
    }
 
    @Override
@@ -218,11 +236,7 @@ public final class FileWrapperJournal extends JournalBase {
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws
Exception {
       JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
-      AtomicInteger value = transactions.remove(txID);
-      if (value != null) {
-         rollbackRecord.setNumberOfRecords(value.get());
-      }
-      writeRecord(rollbackRecord, sync, callback);
+      writeRecord(rollbackRecord, true, txID, true, callback);
    }
 
    // UNSUPPORTED STUFF

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 8e3c3ed..54d1d44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -90,6 +91,9 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.apache.activemq.artemis.utils.SimpleFuture;
+import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
@@ -141,6 +145,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private volatile CoreRemotingConnection remotingConnection;
 
+   private final Executor callExecutor;
+
    private final CoreProtocolManager manager;
 
    // The current currentLargeMessage being processed
@@ -148,7 +154,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
-   public ServerSessionPacketHandler(final CoreProtocolManager manager,
+   public ServerSessionPacketHandler(final Executor callExecutor,
+                                     final CoreProtocolManager manager,
                                      final ServerSession session,
                                      final StorageManager storageManager,
                                      final Channel channel) {
@@ -166,6 +173,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       Connection conn = remotingConnection.getTransportConnection();
 
+      this.callExecutor = callExecutor;
+
       if (conn instanceof NettyConnection) {
          direct = ((NettyConnection) conn).isDirectDeliver();
       } else {
@@ -199,11 +208,18 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.errorClosingSession(e);
       }
+      flushExecutor();
 
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
+   private void flushExecutor() {
+      OrderedExecutorFactory.flushExecutor(callExecutor);
+   }
+
    public void close() {
+      flushExecutor();
+
       channel.flushConfirmations();
 
       try {
@@ -219,6 +235,11 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    @Override
    public void handlePacket(final Packet packet) {
+      channel.confirm(packet);
+      callExecutor.execute(() -> internalHandlePacket(packet));
+   }
+
+   private void internalHandlePacket(final Packet packet) {
       byte type = packet.getType();
 
       storageManager.setContext(session.getSessionContext());
@@ -653,8 +674,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                                      final boolean flush,
                                      final boolean closeChannel) {
       if (confirmPacket != null) {
-         channel.confirm(confirmPacket);
-
          if (flush) {
             channel.flushConfirmations();
          }
@@ -678,9 +697,26 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             remotingConnection.removeFailureListener((FailureListener) closeListener);
          }
       }
+
+      flushExecutor();
    }
 
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
{
+
+      SimpleFuture<Integer> future = new SimpleFutureImpl<>();
+      callExecutor.execute(() -> {
+         int value = internaltransferConnection(newConnection, lastReceivedCommandID);
+         future.set(value);
+      });
+
+      try {
+         return future.get().intValue();
+      } catch (Exception e) {
+         throw new IllegalStateException(e);
+      }
+   }
+
+   private int internaltransferConnection(final CoreRemotingConnection newConnection, final
int lastReceivedCommandID) {
       // We need to disable delivery on all the consumers while the transfer is occurring-
otherwise packets might get
       // delivered
       // after the channel has transferred but *before* packets have been replayed - this
will give the client the wrong

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 3cf2cea..765d6fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -169,7 +169,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
          ServerSession session = server.createSession(request.getName(), activeMQPrincipal
== null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null
? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(),
connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(),
request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager,
channel, connection), true, sessionOperationContext, routingTypeMap);
 
-         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager,
session, server.getStorageManager(), channel);
+         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server.getExecutorFactory().getExecutor(),
protocolManager, session, server.getStorageManager(), channel);
          channel.setHandler(handler);
 
          // TODO - where is this removed?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index a68c3f9..6683fbe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -27,9 +27,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -81,6 +79,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.jboss.logging.Logger;
 
 /**
@@ -204,9 +203,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
             ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
          }
       } catch (ActiveMQException e) {
+         logger.warn(e.getMessage(), e);
          ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
          response = new ActiveMQExceptionMessage(e);
       } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
          ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
          response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
       }
@@ -278,6 +279,12 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
          return;
       }
 
+      logger.trace("Stopping endpoint");
+
+      started = false;
+
+      OrderedExecutorFactory.flushExecutor(executor);
+
       // Channel may be null if there isn't a connection to a live server
       if (channel != null) {
          channel.close();
@@ -315,15 +322,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       pageManager.stop();
 
       pageIndex.clear();
-      final CountDownLatch latch = new CountDownLatch(1);
-      executor.execute(new Runnable() {
-
-         @Override
-         public void run() {
-            latch.countDown();
-         }
-      });
-      latch.await(30, TimeUnit.SECONDS);
 
       // Storage needs to be the last to stop
       storageManager.stop();
@@ -471,28 +469,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       if (logger.isTraceEnabled()) {
          logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
       }
+      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
+      if (!started)
+         return replicationResponseMessage;
 
       if (packet.isSynchronizationFinished()) {
-         executor.execute(() -> {
-            try {
-               // this is a long running process, we cannot block the reading thread from
netty
-               finishSynchronization(packet.getNodeID());
-               if (logger.isTraceEnabled()) {
-                  logger.trace("returning completion on synchronization catchup");
-               }
-               channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
-            } catch (Exception e) {
-               logger.warn(e.getMessage());
-               channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
-            }
-
-         });
-         // the write will happen through an executor
-         return null;
-      }
-
-      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
-      if (!started) {
+         finishSynchronization(packet.getNodeID());
+         replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
          return replicationResponseMessage;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index e88b14d..b0d0232 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -326,6 +326,9 @@ public class ClusterController implements ActiveMQComponent {
       @Override
       public void handlePacket(Packet packet) {
          if (!isStarted()) {
+            if (channelHandler != null) {
+               channelHandler.handlePacket(packet);
+            }
             return;
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index ba7bed0..be42ea6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1750,7 +1750,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       queue.deleteQueue(removeConsumers);
 
-      if (autoDeleteAddress && postOffice != null && getAddressInfo(address).isAutoCreated())
{
+      AddressInfo addressInfo = getAddressInfo(address);
+
+      if (autoDeleteAddress && postOffice != null && addressInfo != null
&& addressInfo.isAutoCreated()) {
          try {
             removeAddressInfo(address, session);
          } catch (ActiveMQDeleteAddressException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
index 31fe17e..20ae956 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/MultiThreadRandomReattachTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.reattach;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 
 /**
  * A MultiThreadRandomReattachTest
@@ -28,6 +30,7 @@ public class MultiThreadRandomReattachTest extends MultiThreadRandomReattachTest
    protected void start() throws Exception {
       Configuration liveConf = createDefaultInVMConfig();
       server = createServer(false, liveConf);
+      server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
       server.start();
       waitForServerToStart(server);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e84e13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
index 984b69e..fc5d903 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/NettyMultiThreadRandomReattachTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.reattach;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 
 public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattachTest {
 
@@ -25,6 +27,7 @@ public class NettyMultiThreadRandomReattachTest extends MultiThreadRandomReattac
    protected void start() throws Exception {
       Configuration liveConf = createDefaultNettyConfig();
       server = createServer(false, liveConf);
+      server.getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(ADDRESS.toString()).addRoutingType(RoutingType.MULTICAST));
       server.start();
       waitForServerToStart(server);
    }


Mime
View raw message