activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-552 Replication target being finished can lead to instability on live
Date Mon, 06 Jun 2016 21:41:29 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master ee6176c67 -> 50d83fb63


ARTEMIS-552 Replication target being finished can lead to instability on live

https://issues.apache.org/jira/browse/ARTEMIS-552


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e658654
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e658654
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e658654

Branch: refs/heads/master
Commit: 2e6586548b3a0d69f1ce2079a2da243af16a28c6
Parents: ee6176c
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jun 1 15:34:57 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jun 6 16:28:51 2016 -0400

----------------------------------------------------------------------
 .../core/persistence/OperationContext.java      | 12 +++
 .../core/persistence/StorageManager.java        |  2 +
 .../journal/AbstractJournalStorageManager.java  |  9 +++
 .../impl/journal/DummyOperationContext.java     |  7 ++
 .../impl/journal/JournalStorageManager.java     |  4 +
 .../impl/journal/OperationContextImpl.java      | 46 ++++++++++--
 .../impl/nullpm/NullStorageManager.java         | 10 +++
 .../postoffice/impl/DuplicateIDCacheImpl.java   |  2 +-
 .../server/impl/RemotingServiceImpl.java        | 12 ++-
 .../core/replication/ReplicationManager.java    | 72 +++++++++++++-----
 .../core/server/ActiveMQServerLogger.java       |  5 ++
 .../artemis/core/transaction/Transaction.java   |  5 ++
 .../core/transaction/impl/TransactionImpl.java  | 78 ++++++++++++++++++--
 .../transaction/impl/TransactionImplTest.java   |  5 ++
 .../extras/byteman/OrphanedConsumerTest.java    |  4 -
 .../tests/integration/client/PagingTest.java    |  5 ++
 .../persistence/DuplicateCacheTest.java         |  2 +-
 .../core/postoffice/impl/BindingsImplTest.java  |  5 ++
 18 files changed, 246 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
index 6d64eb8..e893a10 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
@@ -29,9 +29,21 @@ public interface OperationContext extends IOCompletion {
    /**
     * Execute the task when all IO operations are complete,
     * Or execute it immediately if nothing is pending.
+    * @param runnable the task to be executed.
+    * @param storeOnly There are tasks that won't need to wait on replication or paging and will need to
+    *                  be completed as soon as the response from the journal is received. An example would be the
+    *                  DuplicateCache
+    */
+   void executeOnCompletion(IOCallback runnable, boolean storeOnly);
+
+   /**
+    * Execute the task when all IO operations are complete,
+    * Or execute it immediately if nothing is pending.
+    * @param runnable the task to be executed.
     */
    void executeOnCompletion(IOCallback runnable);
 
+
    void replicationLineUp();
 
    void replicationDone();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a0a5200..f92d0d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -100,6 +100,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
{
 
    void afterCompleteOperations(IOCallback run);
 
+   /** This is similar to afterComplete, however this only cares about the journal part. */
+   void afterStoreOperations(IOCallback run);
    /**
     * Block until the operations are done.
     * Warning: Don't use it inside an ordered executor, otherwise the system may lock up

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index ff21fe2..ed2e1f4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -292,6 +292,10 @@ public abstract class AbstractJournalStorageManager implements StorageManager
{
       getContext().executeOnCompletion(run);
    }
 
+   public void afterStoreOperations(IOCallback run) {
+      getContext().executeOnCompletion(run, true);
+   }
+
    @Override
    public long generateID() {
       return idGenerator.generateID();
@@ -1789,6 +1793,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager
{
       }
 
       @Override
+      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+         executeOnCompletion(runnable);
+      }
+
+      @Override
       public void replicationDone() {
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
index 1ae7524..6fd95ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -35,6 +35,13 @@ final class DummyOperationContext implements OperationContext {
    }
 
    @Override
+   public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+      // There are no executeOnCompletion calls while using the DummyOperationContext
+      // However we keep the code here for correctness
+      runnable.done();
+   }
+
+   @Override
    public void replicationDone() {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 1379308..157306e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -61,8 +61,10 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.jboss.logging.Logger;
 
 public class JournalStorageManager extends AbstractJournalStorageManager {
+   private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
 
    private SequentialFileFactory journalFF;
 
@@ -569,6 +571,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
          }
       }
       catch (Exception e) {
+         logger.warn(e.getMessage(), e);
          stopReplication();
          throw e;
       }
@@ -681,6 +684,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
     */
    @Override
    public void stopReplication() {
+      logger.trace("stopReplication()");
       storageManagerLock.writeLock().lock();
       try {
          if (replicator == null)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index acd75b1..06e07f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -72,6 +72,7 @@ public class OperationContextImpl implements OperationContext {
    }
 
    private List<TaskHolder> tasks;
+   private List<TaskHolder> storeOnlyTasks;
 
    private long minimalStore = Long.MAX_VALUE;
    private long minimalReplicated = Long.MAX_VALUE;
@@ -126,7 +127,12 @@ public class OperationContextImpl implements OperationContext {
    }
 
    @Override
-   public void executeOnCompletion(final IOCallback completion) {
+   public void executeOnCompletion(IOCallback runnable) {
+      executeOnCompletion(runnable, false);
+   }
+
+   @Override
+   public void executeOnCompletion(final IOCallback completion, final boolean storeOnly)
{
       if (errorCode != -1) {
          completion.onError(errorCode, errorMessage);
          return;
@@ -135,11 +141,18 @@ public class OperationContextImpl implements OperationContext {
       boolean executeNow = false;
 
       synchronized (this) {
-         if (tasks == null) {
-            tasks = new LinkedList<>();
-            minimalReplicated = replicationLineUp.intValue();
-            minimalStore = storeLineUp.intValue();
-            minimalPage = pageLineUp.intValue();
+         if (storeOnly) {
+            if (storeOnlyTasks == null) {
+               storeOnlyTasks = new LinkedList<>();
+            }
+         }
+         else {
+            if (tasks == null) {
+               tasks = new LinkedList<>();
+               minimalReplicated = replicationLineUp.intValue();
+               minimalStore = storeLineUp.intValue();
+               minimalPage = pageLineUp.intValue();
+            }
          }
 
          // On this case, we can just execute the context directly
@@ -159,7 +172,12 @@ public class OperationContextImpl implements OperationContext {
             }
          }
          else {
-            tasks.add(new TaskHolder(completion));
+            if (storeOnly) {
+               storeOnlyTasks.add(new TaskHolder(completion));
+            }
+            else {
+               tasks.add(new TaskHolder(completion));
+            }
          }
       }
 
@@ -177,6 +195,20 @@ public class OperationContextImpl implements OperationContext {
    }
 
    private void checkTasks() {
+
+      if (storeOnlyTasks != null) {
+         Iterator<TaskHolder> iter = storeOnlyTasks.iterator();
+         while (iter.hasNext()) {
+            TaskHolder holder = iter.next();
+            if (stored >= holder.storeLined) {
+               // If set, we use an executor to avoid the server being single threaded
+               execute(holder.task);
+
+               iter.remove();
+            }
+         }
+      }
+
       if (stored >= minimalStore && replicated >= minimalReplicated &&
paged >= minimalPage) {
          Iterator<TaskHolder> iter = tasks.iterator();
          while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 39c5de5..21a9fd9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -94,6 +94,11 @@ public class NullStorageManager implements StorageManager {
       }
 
       @Override
+      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+         runnable.done();
+      }
+
+      @Override
       public void storeLineUp() {
       }
 
@@ -339,6 +344,11 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public void afterStoreOperations(IOCallback run) {
+      run.done();
+   }
+
+   @Override
    public void waitOnOperations() throws Exception {
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 7f35638..28896c3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -226,7 +226,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
             }
             // For a tx, it's important that the entry is not added to the cache until commit
             // since if the client fails then resends them tx we don't want it to get rejected
-            tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
+            tx.afterStore(new AddDuplicateIDOperation(duplID, recordID));
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 3672fe2..3a073e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -690,9 +690,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
                }
 
                for (Object id : idsToRemove) {
-                  RemotingConnection conn = getConnection(id);
+                  final RemotingConnection conn = getConnection(id);
                   if (conn != null) {
-                     conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+                     // In certain cases (replicationManager for instance) calling fail could take some time.
+                     // We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
+                     // missing pings
+                     flushExecutor.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                           conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+                        }
+                     });
                      removeConnection(id);
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 1abd9c6..58102d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -265,13 +265,14 @@ public final class ReplicationManager implements ActiveMQComponent,
ReadyListene
    }
 
    @Override
-   public synchronized void stop() throws Exception {
-      if (!started) {
-         return;
+   public void stop() throws Exception {
+      synchronized (this) {
+         if (!started) {
+            logger.trace("Stopping being ignored as it hasn't been started");
+            return;
+         }
       }
 
-      enabled = false;
-
       // This is to avoid the write holding a lock while we are trying to close it
       if (replicatingChannel != null) {
          replicatingChannel.close();
@@ -279,6 +280,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       }
 
       synchronized (replicationLock) {
+         enabled = false;
          writable.set(true);
          replicationLock.notifyAll();
          clearReplicationTokens();
@@ -299,9 +301,12 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     * backup crashing).
     */
    public void clearReplicationTokens() {
+      logger.trace("clearReplicationTokens initiating");
       synchronized (replicationLock) {
+         logger.trace("clearReplicationTokens entered the lock");
          while (!pendingTokens.isEmpty()) {
             OperationContext ctx = pendingTokens.poll();
+            logger.trace("Calling ctx.replicationDone()");
             try {
                ctx.replicationDone();
             }
@@ -310,6 +315,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             }
          }
       }
+      logger.trace("clearReplicationTokens finished");
    }
 
    /**
@@ -347,20 +353,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       synchronized (replicationLock) {
          if (enabled) {
             pendingTokens.add(repliToken);
-            if (!replicatingChannel.getConnection().isWritable(this)) {
-               try {
-                  writable.set(false);
-                  //don't wait for ever as this may hang tests etc, we've probably been closed
anyway
-                  long now = System.currentTimeMillis();
-                  long deadline = now + 5000;
-                  while (!writable.get() && now < deadline)  {
-                     replicationLock.wait(deadline - now);
-                     now = System.currentTimeMillis();
-                  }
-               }
-               catch (InterruptedException e) {
-                  throw new ActiveMQInterruptedException(e);
-               }
+            if (!flowControl()) {
+               return repliToken;
             }
             replicatingChannel.send(packet);
          }
@@ -379,6 +373,43 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       return repliToken;
    }
 
+   /** This was written as a refactoring of sendReplicatePacket.
+    *  In case you refactor this in any way, the caller of this method must hold the lock on replicationLock. */
+   private boolean flowControl() {
+      // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
+      // future maintainers of this code, please be aware that the intention here is to hold the lock on replicationLock
+      if (!replicatingChannel.getConnection().isWritable(this)) {
+         try {
+            logger.trace("flowControl waiting on writable");
+            writable.set(false);
+            //don't wait for ever as this may hang tests etc, we've probably been closed
anyway
+            long now = System.currentTimeMillis();
+            long deadline = now + 5000;
+            while (!writable.get() && now < deadline)  {
+               replicationLock.wait(deadline - now);
+               now = System.currentTimeMillis();
+            }
+            logger.trace("flow control done");
+
+            if (!writable.get()) {
+               ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+               logger.tracef("There was no response from replication backup after %s seconds,
server being stopped now", System.currentTimeMillis() - now);
+               try {
+                  stop();
+               }
+               catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+               return false;
+            }
+         }
+         catch (InterruptedException e) {
+            throw new ActiveMQInterruptedException(e);
+         }
+      }
+      return true;
+   }
+
    @Override
    public void readyForWriting() {
       synchronized (replicationLock) {
@@ -591,6 +622,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
          sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
          try {
             if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout))
{
+               logger.trace("sendSynchronizationDone wasn't finished in time");
                throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
             }
          }
@@ -598,6 +630,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             logger.debug(e);
          }
          inSync = false;
+
+         logger.trace("sendSynchronizationDone finished");
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6679008..3428a2f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1209,6 +1209,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 222206, value = "Connection limit of {0} reached. Refusing connection from
{1}.", format = Message.Format.MESSAGE_FORMAT)
    void connectionLimitReached(long connectionsAllowed, String address);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222207, value = "The backup server is not responding promptly introducing
latency beyond the limit. Replication server being disconnected now.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void slowReplicationResponse();
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index da87cbf..33c1eea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -67,6 +67,11 @@ public interface Transaction {
 
    void addOperation(TransactionOperation sync);
 
+   /** This is an operation that will be called right after the storage is completed.
+    *  Operations added through addOperation only run after paging and replication, while these operations
+    *  only wait on the storage. */
+   void afterStore(TransactionOperation sync);
+
    List<TransactionOperation> getAllOperations();
 
    boolean hasTimedOut(long currentTime, int defaultTimeout);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 0a91562..185bfb2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.transaction.impl;
 import javax.transaction.xa.Xid;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -38,6 +39,8 @@ public class TransactionImpl implements Transaction {
 
    private List<TransactionOperation> operations;
 
+   private List<TransactionOperation> storeOperations;
+
    private static final int INITIAL_NUM_PROPERTIES = 10;
 
    private Object[] properties = new Object[TransactionImpl.INITIAL_NUM_PROPERTIES];
@@ -301,6 +304,24 @@ public class TransactionImpl implements Transaction {
             }
          });
 
+         final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
+         this.storeOperations = null;
+
+         if (storeOperationsToComplete != null) {
+            storageManager.afterStoreOperations(new IOCallback() {
+
+               @Override
+               public void onError(final int errorCode, final String errorMessage) {
+                  ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
+               }
+
+               @Override
+               public void done() {
+                  afterCommit(storeOperationsToComplete);
+               }
+            });
+         }
+
       }
    }
 
@@ -365,6 +386,9 @@ public class TransactionImpl implements Transaction {
       final List<TransactionOperation> operationsToComplete = this.operations;
       this.operations = null;
 
+      final List<TransactionOperation> storeOperationsToComplete = this.storeOperations;
+      this.storeOperations = null;
+
       // We use the Callback even for non persistence
       // If we are using non-persistence with replication, the replication manager will have
       // to execute this runnable in the correct order
@@ -380,6 +404,21 @@ public class TransactionImpl implements Transaction {
             afterRollback(operationsToComplete);
          }
       });
+
+      if (storeOperationsToComplete != null) {
+         storageManager.afterStoreOperations(new IOCallback() {
+
+            @Override
+            public void onError(final int errorCode, final String errorMessage) {
+               ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
+            }
+
+            @Override
+            public void done() {
+               afterRollback(storeOperationsToComplete);
+            }
+         });
+      }
    }
 
    @Override
@@ -445,6 +484,15 @@ public class TransactionImpl implements Transaction {
       operations.add(operation);
    }
 
+
+   @Override
+   public synchronized void afterStore(TransactionOperation sync) {
+      if (storeOperations == null) {
+         storeOperations = new LinkedList<>();
+      }
+      storeOperations.add(sync);
+   }
+
    private int getOperationsCount() {
       checkCreateOperations();
 
@@ -491,7 +539,7 @@ public class TransactionImpl implements Transaction {
 
    private void checkCreateOperations() {
       if (operations == null) {
-         operations = new ArrayList<>();
+         operations = new LinkedList<>();
       }
    }
 
@@ -505,13 +553,13 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-   private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete)
{
-      if (oeprationsToComplete != null) {
-         for (TransactionOperation operation : oeprationsToComplete) {
+   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete)
{
+      if (operationsToComplete != null) {
+         for (TransactionOperation operation : operationsToComplete) {
             operation.afterRollback(this);
          }
          // Help out GC here
-         oeprationsToComplete.clear();
+         operationsToComplete.clear();
       }
    }
 
@@ -521,6 +569,11 @@ public class TransactionImpl implements Transaction {
             operation.beforeCommit(this);
          }
       }
+      if (storeOperations != null) {
+         for (TransactionOperation operation : storeOperations) {
+            operation.beforeCommit(this);
+         }
+      }
    }
 
    private synchronized void beforePrepare() throws Exception {
@@ -529,6 +582,11 @@ public class TransactionImpl implements Transaction {
             operation.beforePrepare(this);
          }
       }
+      if (storeOperations != null) {
+         for (TransactionOperation operation : storeOperations) {
+            operation.beforePrepare(this);
+         }
+      }
    }
 
    private synchronized void beforeRollback() throws Exception {
@@ -537,6 +595,11 @@ public class TransactionImpl implements Transaction {
             operation.beforeRollback(this);
          }
       }
+      if (storeOperations != null) {
+         for (TransactionOperation operation : storeOperations) {
+            operation.beforeRollback(this);
+         }
+      }
    }
 
    private synchronized void afterPrepare() {
@@ -545,6 +608,11 @@ public class TransactionImpl implements Transaction {
             operation.afterPrepare(this);
          }
       }
+      if (storeOperations != null) {
+         for (TransactionOperation operation : storeOperations) {
+            operation.afterPrepare(this);
+         }
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 6c5cfe5..9a66610 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -253,6 +253,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void afterStoreOperations(IOCallback run) {
+         run.done();
+      }
+
+      @Override
       public boolean waitOnOperations(long timeout) throws Exception {
          return false;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
index 211aee5..a95cbaa 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java
@@ -67,10 +67,6 @@ public class OrphanedConsumerTest extends ActiveMQTestBase {
     * This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()}
     */
    public static void leavingCloseOnTestCountersWhileClosing() {
-      if (staticServer.getConnectionCount() == 0) {
-         verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!");
-      }
-
       if (staticServer.getSessions().size() == 0) {
          verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!");
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
index f658fae..41467b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
@@ -5730,7 +5730,12 @@ public class PagingTest extends ActiveMQTestBase {
 
       @Override
       public void executeOnCompletion(IOCallback runnable) {
+         runnable.done();
+      }
 
+      @Override
+      public void executeOnCompletion(IOCallback runnable, boolean storeOnly) {
+         runnable.done();
       }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
index 38be202..299022c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java
@@ -90,7 +90,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase {
          public void onError(int errorCode, String errorMessage) {
 
          }
-      });
+      }, true);
 
 
       Assert.assertTrue(latch.await(1, TimeUnit.MINUTES));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 805a6f5..44b5d82 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -115,6 +115,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void afterStore(TransactionOperation sync) {
+
+      }
+
+      @Override
       public void addOperation(final TransactionOperation sync) {
 
       }


Mime
View raw message