activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [01/44] activemq-artemis git commit: ARTEMIS-350 fix for potential race [Forced Update!]
Date Tue, 23 Feb 2016 19:38:57 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/refactor-openwire 040706357 -> 0e545541b (forced update)


ARTEMIS-350 fix for potential race

It's possible for the latch used for flow control here to get out of sync. In
other words, multiple count-downs can occur between count-ups so that the latch
always has a count > 0. When this situation arises then every single packet
sent to the replica is delayed by 5 seconds.

The solution here essentially is to eliminate the latch completely and use a
condition/wait/notify pattern.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2fcd474b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2fcd474b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2fcd474b

Branch: refs/heads/refactor-openwire
Commit: 2fcd474bb3ea83c696e2794cd7b5951494a2b20a
Parents: 2a639cb
Author: jbertram <jbertram@apache.org>
Authored: Tue Feb 16 10:45:25 2016 -0600
Committer: jbertram <jbertram@apache.org>
Committed: Tue Feb 23 12:56:45 2016 -0600

----------------------------------------------------------------------
 .../impl/journal/JournalStorageManager.java     |  9 ++++++--
 .../core/replication/ReplicationManager.java    | 22 ++++++++++++++------
 .../core/server/ActiveMQMessageBundle.java      |  3 +++
 3 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2fcd474b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index c1ef0d3..8807f4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -584,7 +584,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName);
          if (!seqFile.exists())
             continue;
-         replicator.syncLargeMessageFile(seqFile, size, id);
+         if (replicator != null) {
+            replicator.syncLargeMessageFile(seqFile, size, id);
+         }
+         else {
+            throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
+         }
       }
    }
 
@@ -711,4 +716,4 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          readUnLock();
       }
    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2fcd474b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index e84257f..8f0774e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -110,9 +110,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    private volatile boolean enabled;
 
+   private final AtomicBoolean writable = new AtomicBoolean(false);
+
    private final Object replicationLock = new Object();
 
-   private final ReusableLatch latch = new ReusableLatch();
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
 
    private final ExecutorFactory executorFactory;
@@ -271,10 +272,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       if (replicatingChannel != null) {
          replicatingChannel.close();
          replicatingChannel.getConnection().getTransportConnection().fireReady(true);
-         latch.setCount(0);
       }
 
       synchronized (replicationLock) {
+         writable.set(true);
+         replicationLock.notifyAll();
          clearReplicationTokens();
       }
 
@@ -342,10 +344,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
          if (enabled) {
             pendingTokens.add(repliToken);
             if (!replicatingChannel.getConnection().isWritable(this)) {
-               latch.countUp();
                try {
                   //don't wait for ever as this may hang tests etc, we've probably been closed anyway
-                  latch.await(5, TimeUnit.SECONDS);
+                  long now = System.currentTimeMillis();
+                  long deadline = now + 5000;
+                  while (!writable.get() && now < deadline)  {
+                     replicationLock.wait(deadline - now);
+                     now = System.currentTimeMillis();
+                  }
+                  writable.set(false);
                }
                catch (InterruptedException e) {
                   throw new ActiveMQInterruptedException(e);
@@ -370,7 +377,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    @Override
    public void readyForWriting() {
-      latch.countDown();
+      synchronized (replicationLock) {
+         writable.set(true);
+         replicationLock.notifyAll();
+      }
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2fcd474b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index da1aee4..4de94b9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -368,4 +368,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 119116, value = "Netty Acceptor unavailable", format = Message.Format.MESSAGE_FORMAT)
    IllegalStateException acceptorUnavailable();
+
+   @Message(id = 119117, value = "Replicator is null. Replication was likely terminated.")
+   ActiveMQIllegalStateException replicatorIsNull();
 }


Mime
View raw message