activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject activemq-artemis git commit: ARTEMIS-2046 Fixing issues with JournalStorageManager.stop in replication, JDBC and shared storage
Date Tue, 21 Aug 2018 13:30:27 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 9427d0010 -> e617f322d


ARTEMIS-2046 Fixing issues with JournalStorageManager.stop in replication, JDBC and shared
storage

(cherry picked from commit 63e6cd98f856ba8900782b7488c3ce4cf9e48257)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e617f322
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e617f322
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e617f322

Branch: refs/heads/2.6.x
Commit: e617f322dad1d6e6b044af03d6b4977afaaf08c4
Parents: 9427d00
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Fri Aug 17 16:45:07 2018 -0400
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Aug 21 14:29:33 2018 +0100

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQExceptionType.java |   6 +
 .../api/core/ActiveMQShutdownException.java     |  31 ++++
 .../core/client/impl/ClientSessionImpl.java     |   4 +-
 .../jdbc/store/journal/JDBCJournalImpl.java     |  12 +-
 .../artemis/core/journal/impl/JournalImpl.java  |  41 +++--
 .../journal/AbstractJournalStorageManager.java  |  16 +-
 .../impl/journal/JournalStorageManager.java     |  63 +++++---
 .../core/replication/ReplicationEndpoint.java   |  14 ++
 .../core/replication/ReplicationManager.java    |   9 +-
 .../failover/NettyReplicationStopTest.java      | 150 +++++++++++++++++++
 .../tests/integration/xa/BasicXaTest.java       | 100 +++++++++++++
 11 files changed, 407 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 9120d79..7cec2e4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -249,6 +249,12 @@ public enum ActiveMQExceptionType {
       public ActiveMQException createException(String msg) {
          return new ActiveMQNullRefException(msg);
       }
+   },
+   SHUTDOWN_ERROR(219) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQShutdownException(msg);
+      }
    };
 
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
new file mode 100644
index 0000000..03797a8
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQShutdownException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because an address exists on the server.
+ */
+public final class ActiveMQShutdownException extends ActiveMQException {
+
+   public ActiveMQShutdownException() {
+      super(ActiveMQExceptionType.SHUTDOWN_ERROR);
+   }
+
+   public ActiveMQShutdownException(String msg) {
+      super(ActiveMQExceptionType.SHUTDOWN_ERROR, msg);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index ab9888e..711d7ce 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1500,9 +1500,11 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
 
          XAException xaException = null;
          if (onePhase) {
+            logger.debug("Throwing oneFase RMFAIL on xid=" + xid, t);
             //we must return XA_RMFAIL
             xaException = new XAException(XAException.XAER_RMFAIL);
          } else {
+            logger.debug("Throwing twoFase Retry on xid=" + xid, t);
             // Any error on commit -> RETRY
             // We can't rollback a Prepared TX for definition
             xaException = new XAException(XAException.XA_RETRY);
@@ -1753,7 +1755,7 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
       } catch (XAException xae) {
          throw xae;
       } catch (ActiveMQException e) {
-         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT)
{
+         if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT
|| e.getType() == ActiveMQExceptionType.SHUTDOWN_ERROR) {
             // Unblocked on failover
             throw new XAException(XAException.XA_RETRY);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index e7b45ff..e450620 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -31,7 +31,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncoderPersister;
@@ -335,19 +337,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal
{
    }
 
 
-   private void checkStatus() {
+   private void checkStatus() throws Exception {
       checkStatus(null);
    }
 
-   private void checkStatus(IOCompletion callback) {
+   private void checkStatus(IOCompletion callback) throws Exception {
       if (!started) {
          if (callback != null) callback.onError(-1, "JDBC Journal is not loaded");
-         throw new IllegalStateException("JDBCJournal is not loaded");
+         throw new ActiveMQShutdownException("JDBCJournal is not loaded");
       }
 
       if (failed.get()) {
          if (callback != null) callback.onError(-1, "JDBC Journal failed");
-         throw new IllegalStateException("JDBCJournal Failed");
+         throw new ActiveMQException("JDBCJournal Failed");
       }
    }
 
@@ -389,7 +391,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal
{
       if (callback != null) callback.waitCompletion();
    }
 
-   private synchronized void addTxRecord(JDBCJournalRecord record) {
+   private synchronized void addTxRecord(JDBCJournalRecord record) throws Exception {
 
       if (logger.isTraceEnabled()) {
          logger.trace("addTxRecord " + record + ", started=" + started + ", failed=" + failed);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 55b92c5..30ed6e3 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -823,6 +824,9 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                                              usedFile);
                }
                result.set(true);
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendPrepareRecord:" + e, e);
             } catch (Throwable e) {
                result.fail(e);
                setErrorCondition(callback, null, e);
@@ -882,7 +886,10 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                }
 
                result.set(true);
-            } catch (Exception e) {
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendUpdateRecord:" + e, e);
+            } catch (Throwable e) {
                result.fail(e);
                setErrorCondition(callback, null, e);
                logger.error("appendUpdateRecord:" + e, e);
@@ -933,7 +940,10 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                   record.delete(usedFile);
                }
                result.set(true);
-            } catch (Exception e) {
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendDeleteRecord:" + e, e);
+            } catch (Throwable e) {
                result.fail(e);
                logger.error("appendDeleteRecord:" + e, e);
             } finally {
@@ -993,7 +1003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                }
 
                tx.addPositive(usedFile, id, addRecord.getEncodeSize());
-            } catch (Exception e) {
+            } catch (Throwable e) {
                logger.error("appendAddRecordTransactional:" + e, e);
                setErrorCondition(null, tx, e);
             } finally {
@@ -1031,9 +1041,9 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
       }
    }
 
-   private void checkJournalIsLoaded() {
+   private void checkJournalIsLoaded() throws Exception {
       if (state != JournalState.LOADED && state != JournalState.SYNCING) {
-         throw new IllegalStateException("Journal must be in state=" + JournalState.LOADED
+ ", was [" + state + "]");
+         throw new ActiveMQShutdownException("Journal must be in state=" + JournalState.LOADED
+ ", was [" + state + "]");
       }
    }
 
@@ -1085,7 +1095,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                }
 
                tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
-            } catch ( Exception e ) {
+            } catch (Throwable e ) {
                logger.error("appendUpdateRecordTransactional:" +  e.getMessage(), e );
                setErrorCondition(null, tx, e );
             } finally {
@@ -1132,7 +1142,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                }
 
                tx.addNegative(usedFile, id);
-            } catch (Exception e) {
+            } catch (Throwable e) {
                logger.error("appendDeleteRecordTransactional:" + e, e);
                setErrorCondition(null, tx, e);
             } finally {
@@ -1185,7 +1195,10 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                }
 
                tx.prepare(usedFile);
-            } catch (Exception e) {
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendPrepareRecord:" + e, e);
+            } catch (Throwable e) {
                result.fail(e);
                logger.error("appendPrepareRecord:" + e, e);
                setErrorCondition(callback, tx, e);
@@ -1267,6 +1280,9 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
 
 
                tx.commit(usedFile);
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendCommitRecord:" + e, e);
             } catch (Throwable e) {
                result.fail(e);
                logger.error("appendCommitRecord:" + e, e);
@@ -1317,6 +1333,9 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
 
                tx.rollback(usedFile);
+            } catch (ActiveMQShutdownException e) {
+               result.fail(e);
+               logger.error("appendRollbackRecord:" + e, e);
             } catch (Throwable e) {
                result.fail(e);
                logger.error("appendRollbackRecord:" + e, e);
@@ -2360,10 +2379,10 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
          return;
       }
 
-      setJournalState(JournalState.STOPPED);
-
       flush();
 
+      setJournalState(JournalState.STOPPED);
+
       if (providedIOThreadPool == null) {
          threadPool.shutdown();
 
@@ -2681,6 +2700,8 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                                     final JournalTransaction tx,
                                     final IOCallback parameterCallback) throws Exception
{
 
+      checkJournalIsLoaded();
+
       final IOCallback callback;
 
       final int size = encoder.getEncodeSize();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index dc2e20b..8e11f57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -126,8 +126,10 @@ import org.jboss.logging.Logger;
  */
 public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements
StorageManager {
 
-   private static final int CRITICAL_PATHS = 1;
-   private static final int CRITICAL_STORE = 0;
+   protected static final int CRITICAL_PATHS = 3;
+   protected static final int CRITICAL_STORE = 0;
+   protected static final int CRITICAL_STOP = 1;
+   protected static final int CRITICAL_STOP_2 = 2;
 
    private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
 
@@ -405,6 +407,16 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       leaveCritical(CRITICAL_STORE);
    }
 
+   /** for internal use and testsuite, don't use it outside of tests */
+   public void writeLock() {
+      storageManagerLock.writeLock().lock();
+   }
+
+   /** for internal use and testsuite, don't use it outside of tests */
+   public void writeUnlock() {
+      storageManagerLock.writeLock().unlock();
+   }
+
    @Override
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
       readLock();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index dd8bb22..867f2d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -229,9 +229,21 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
    }
 
    @Override
-   public synchronized void stop(boolean ioCriticalError, boolean sendFailover) throws Exception
{
+   public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
+      try {
+         enterCritical(CRITICAL_STOP);
+         synchronized (this) {
+            if (internalStop(ioCriticalError, sendFailover))
+               return;
+         }
+      } finally {
+         leaveCritical(CRITICAL_STOP);
+      }
+   }
+
+   private boolean internalStop(boolean ioCriticalError, boolean sendFailover) throws Exception
{
       if (!started) {
-         return;
+         return true;
       }
 
       if (!ioCriticalError) {
@@ -255,30 +267,41 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
          // that's ok
       }
 
-      // We cache the variable as the replicator could be changed between here and the time
we call stop
-      // since sendLiveIsStopping may issue a close back from the channel
-      // and we want to ensure a stop here just in case
-      ReplicationManager replicatorInUse = replicator;
-      if (replicatorInUse != null) {
-         if (sendFailover) {
-            final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
-            if (token != null) {
-               try {
-                  token.waitCompletion(5000);
-               } catch (Exception e) {
-                  // ignore it
+      enterCritical(CRITICAL_STOP_2);
+      storageManagerLock.writeLock().lock();
+      try {
+
+         // We cache the variable as the replicator could be changed between here and the
time we call stop
+         // since sendLiveIsStopping may issue a close back from the channel
+         // and we want to ensure a stop here just in case
+         ReplicationManager replicatorInUse = replicator;
+         if (replicatorInUse != null) {
+            if (sendFailover) {
+               final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
+               if (token != null) {
+                  try {
+                     token.waitCompletion(5000);
+                  } catch (Exception e) {
+                     // ignore it
+                  }
                }
             }
+            // we cannot clear replication tokens, otherwise clients will eventually be informed
of completion during a server's shutdown
+            // while the backup will never receive then
+            replicatorInUse.stop(false);
          }
-         replicatorInUse.stop();
-      }
-      bindingsJournal.stop();
+         bindingsJournal.stop();
 
-      messageJournal.stop();
+         messageJournal.stop();
 
-      journalLoaded = false;
+         journalLoaded = false;
 
-      started = false;
+         started = false;
+      } finally {
+         storageManagerLock.writeLock().unlock();
+         leaveCritical(CRITICAL_STOP_2);
+      }
+      return false;
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 15d5311..998bbcf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -150,6 +150,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       journals[id] = journal;
    }
 
+   /**
+    * This is for tests basically, do not use it as its API is not guaranteed for future
usage.
+    */
+   public void pause() {
+      started = false;
+   }
+
+   /**
+    * This is for tests basically, do not use it as its API is not guaranteed for future
usage.
+    */
+   public void resume() {
+      started = true;
+   }
+
    @Override
    public void handlePacket(final Packet packet) {
       if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index fbf7c6c..6973706 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -282,6 +282,10 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    @Override
    public void stop() throws Exception {
+      stop(true);
+   }
+
+   public void stop(boolean clearTokens) throws Exception {
       synchronized (this) {
          if (!started) {
             logger.trace("Stopping being ignored as it hasn't been started");
@@ -297,7 +301,10 @@ public final class ReplicationManager implements ActiveMQComponent {
 
       enabled = false;
       writable.set(true);
-      clearReplicationTokens();
+
+      if (clearTokens) {
+         clearReplicationTokens();
+      }
 
       RemotingConnection toStop = remotingConnection;
       if (toStop != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
new file mode 100644
index 0000000..64343e3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyReplicationStopTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
+import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyReplicationStopTest extends FailoverTestBase {
+
+   @Override
+   protected TestableServer createTestableServer(Configuration config) {
+      return new SameProcessActiveMQServer(createServer(true, config));
+   }
+
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+   }
+
+   @Override
+   protected NodeManager createNodeManager() throws Exception {
+      return new InVMNodeManager(false);
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   @Override
+   protected final void crash(boolean waitFailure, ClientSession... sessions) throws Exception
{
+      if (sessions.length > 0) {
+         for (ClientSession session : sessions) {
+            waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
+         }
+      } else {
+         waitForRemoteBackup(null, 5, true, backupServer.getServer());
+      }
+      super.crash(waitFailure, sessions);
+   }
+
+   @Override
+   protected final void crash(ClientSession... sessions) throws Exception {
+      crash(true, sessions);
+   }
+
+   @Test
+   public void testReplicaStop() throws Exception {
+
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(15);
+
+      final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+
+      ClientSession session = sf.createSession(true, true);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 10;
+
+      ReplicationEndpoint endpoint = backupServer.getServer().getReplicationEndpoint();
+
+      endpoint.pause();
+
+      ArrayList<Thread> threads = new ArrayList<>();
+      final ArrayList<Integer> codesSent = new ArrayList<>();
+
+      CountDownLatch alignedOnSend = new CountDownLatch(10);
+
+      for (int i = 0; i < numMessages; i++) {
+         final int code = i;
+         Thread t = new Thread("WillSend " + code) {
+            @Override
+            public void run() {
+               try {
+                  ClientSession session = sf.createSession(true, true);
+
+                  ClientProducer producer = session.createProducer(ADDRESS);
+
+                  ClientMessage message = session.createMessage(true).putIntProperty("i",
code);
+                  alignedOnSend.countDown();
+                  System.out.println("blocking!!");
+                  producer.send(message);
+                  codesSent.add(code);
+
+                  System.out.println("Sent!");
+
+               } catch (Exception e) {
+                  // that's ok;
+                  e.printStackTrace(); // logging just for debug & reference
+               }
+            }
+         };
+
+         t.start();
+
+         threads.add(t);
+      }
+
+      Assert.assertTrue(alignedOnSend.await(10, TimeUnit.SECONDS));
+      liveServer.stop();
+
+      Assert.assertEquals(0, codesSent.size());
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e617f322/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
index 04fd1a9..d0d8ce3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -365,6 +366,105 @@ public class BasicXaTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testPrepareError() throws Exception {
+      Xid xid = newXID();
+
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
+      ClientProducer clientProducer = clientSession2.createProducer(atestq);
+      ClientMessage m1 = createTextMessage(clientSession2, "m1");
+      ClientMessage m2 = createTextMessage(clientSession2, "m2");
+      ClientMessage m3 = createTextMessage(clientSession2, "m3");
+      ClientMessage m4 = createTextMessage(clientSession2, "m4");
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+      ClientMessage m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+      clientSession.end(xid, XAResource.TMSUCCESS);
+
+      StorageManager journalStorageManager = messagingService.getStorageManager();
+
+      clientSession.prepare(xid);
+
+      journalStorageManager.getMessageJournal().stop();
+      try {
+         clientSession.commit(xid, false);
+         Assert.fail("Exception exptected");
+      } catch (XAException e) {
+         Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
+      }
+   }
+
+
+   @Test
+   public void testRollbackError() throws Exception {
+      Xid xid = newXID();
+
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
+      ClientProducer clientProducer = clientSession2.createProducer(atestq);
+      ClientMessage m1 = createTextMessage(clientSession2, "m1");
+      ClientMessage m2 = createTextMessage(clientSession2, "m2");
+      ClientMessage m3 = createTextMessage(clientSession2, "m3");
+      ClientMessage m4 = createTextMessage(clientSession2, "m4");
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
+      ClientMessage m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
+      m = clientConsumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
+      clientSession.end(xid, XAResource.TMSUCCESS);
+
+      StorageManager journalStorageManager = messagingService.getStorageManager();
+
+      clientSession.prepare(xid);
+
+      journalStorageManager.getMessageJournal().stop();
+      try {
+         clientSession.rollback(xid);
+         Assert.fail("Exception exptected");
+      } catch (XAException e) {
+         Assert.assertTrue(e.errorCode == XAException.XA_RETRY);
+      }
+   }
+
+   @Test
    public void testReceiveRollback() throws Exception {
       int numSessions = 100;
       ClientSession clientSession2 = sessionFactory.createSession(false, true, true);


Mime
View raw message