activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-616 Use Call timeout on replication flow control
Date Thu, 07 Jul 2016 14:58:41 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 0ab88e609 -> db578d37a


ARTEMIS-616 Use Call timeout on replication flow control


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/246d11c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/246d11c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/246d11c6

Branch: refs/heads/master
Commit: 246d11c6b19b8210f6da96cd6f71ad64b0862d50
Parents: 0ab88e6
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Wed Jul 6 17:35:23 2016 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jul 6 19:12:25 2016 -0400

----------------------------------------------------------------------
 .../activemq/artemis/core/replication/ReplicationManager.java | 7 +++++--
 .../artemis/core/server/cluster/ClusterConnection.java        | 4 ++++
 .../core/server/cluster/impl/ClusterConnectionImpl.java       | 5 +++++
 .../artemis/core/server/impl/SharedNothingLiveActivation.java | 2 +-
 .../tests/integration/replication/ReplicationTest.java        | 2 +-
 5 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 58102d4..b254d9a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -126,6 +126,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    private CoreRemotingConnection remotingConnection;
 
+   private final long timeout;
+
    private volatile boolean inSync = true;
 
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -133,10 +135,11 @@ public final class ReplicationManager implements ActiveMQComponent,
ReadyListene
    /**
     * @param remotingConnection
     */
-   public ReplicationManager(CoreRemotingConnection remotingConnection, final ExecutorFactory
executorFactory) {
+   public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout,
final ExecutorFactory executorFactory) {
       this.executorFactory = executorFactory;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
       this.remotingConnection = remotingConnection;
+      this.timeout = timeout;
    }
 
    public void appendUpdateRecord(final byte journalID,
@@ -384,7 +387,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             writable.set(false);
             //don't wait for ever as this may hang tests etc, we've probably been closed
anyway
             long now = System.currentTimeMillis();
-            long deadline = now + 5000;
+            long deadline = now + timeout;
             while (!writable.get() && now < deadline)  {
                replicationLock.wait(deadline - now);
                now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
index 7134bd3..c47ff48 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
@@ -77,4 +77,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
    void removeRecord(String targetNodeID);
 
    void disconnectRecord(String targetNodeID);
+
+   long getCallTimeout();
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 800ed5a..77f25e9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -554,6 +554,11 @@ public final class ClusterConnectionImpl implements ClusterConnection,
AfterConn
    }
 
    @Override
+   public long getCallTimeout() {
+      return callTimeout;
+   }
+
+   @Override
    public Map<String, String> getNodes() {
       synchronized (recordsGuard) {
          Map<String, String> nodes = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 6b222fb..f17bcc5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -162,7 +162,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
          ReplicationFailureListener listener = new ReplicationFailureListener();
          rc.addCloseListener(listener);
          rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, activeMQServer.getExecutorFactory());
+         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(),
activeMQServer.getExecutorFactory());
          replicationManager.start();
          Thread t = new Thread(new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/246d11c6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 4a00caa..6ff0cf0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       setupServer(false);
       try {
          ClientSessionFactory sf = createSessionFactory(locator);
-         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), factory);
+         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(),
factory);
          addActiveMQComponent(manager);
          manager.start();
          Assert.fail("Exception was expected");


Mime
View raw message