activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-866 - replication improvements
Date Tue, 07 Feb 2017 13:49:21 GMT
ARTEMIS-866 - replication improvements

add functionality to allow live to vote for quorum on failure

Also allow the quorum size to be configurable.

https://issues.apache.org/jira/browse/ARTEMIS-866


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/43a92764
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/43a92764
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/43a92764

Branch: refs/heads/master
Commit: 43a92764846d47b32065704c88b925bcc8d72f70
Parents: 7e5ada8
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Mon Feb 6 16:01:09 2017 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Feb 7 13:47:03 2017 +0000

----------------------------------------------------------------------
 .../config/ActiveMQDefaultConfiguration.java    |  11 ++
 .../artemis/core/config/ConfigurationUtils.java |   4 +-
 .../config/ha/ReplicaPolicyConfiguration.java   |  20 +++
 .../ha/ReplicatedPolicyConfiguration.java       |  20 +++
 .../deployers/impl/FileConfigurationParser.java |   8 +
 .../core/server/cluster/ha/ReplicaPolicy.java   |  34 +++-
 .../server/cluster/ha/ReplicatedPolicy.java     |  39 ++++-
 .../qourum/SharedNothingBackupQuorum.java       |   8 +-
 .../impl/SharedNothingBackupActivation.java     |   2 +-
 .../impl/SharedNothingLiveActivation.java       |  47 +++++-
 .../resources/schema/artemis-configuration.xsd  |  30 ++++
 docs/user-manual/en/network-isolation.md        |  59 ++++++-
 .../failover/ReplicatedVotingFailoverTest.java  | 162 +++++++++++++++++++
 13 files changed, 427 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 8fce7ea..a26fd1d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -467,6 +467,10 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal.";
 
+   public static final boolean DEFAULT_VOTE_ON_REPLICATION_FAILURE = false;
+
+   public static final int DEFAULT_QUORUM_SIZE = -1;
+
    /**
     * If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that
are in available on the classpath. If false then only the core protocol will be available,
unless in Embedded mode where users can inject their own Protocol Managers.
     */
@@ -1260,4 +1264,11 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_NETWORK_CHECK_NIC;
    }
 
+   public static boolean getDefaultVoteOnReplicationFailure() {
+      return DEFAULT_VOTE_ON_REPLICATION_FAILURE;
+   }
+
+   public static int getDefaultQuorumSize() {
+      return DEFAULT_QUORUM_SIZE;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index beeb8da..eefd9b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -65,11 +65,11 @@ public final class ConfigurationUtils {
          }
          case REPLICATED: {
             ReplicatedPolicyConfiguration pc = (ReplicatedPolicyConfiguration) conf;
-            return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(),
pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck());
+            return new ReplicatedPolicy(pc.isCheckForLiveServer(), pc.getGroupName(), pc.getClusterName(),
pc.getInitialReplicationSyncTimeout(), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(),
pc.getQuorumSize());
          }
          case REPLICA: {
             ReplicaPolicyConfiguration pc = (ReplicaPolicyConfiguration) conf;
-            return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(),
pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(),
getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck());
+            return new ReplicaPolicy(pc.getClusterName(), pc.getMaxSavedReplicatedJournalsSize(),
pc.getGroupName(), pc.isRestartBackup(), pc.isAllowFailBack(), pc.getInitialReplicationSyncTimeout(),
getScaleDownPolicy(pc.getScaleDownConfiguration()), server.getNetworkHealthCheck(), pc.getVoteOnReplicationFailure(),
pc.getQuorumSize());
          }
          case SHARED_STORE_MASTER: {
             SharedStoreMasterPolicyConfiguration pc = (SharedStoreMasterPolicyConfiguration)
conf;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
index 17c83d4..0b50882 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicaPolicyConfiguration.java
@@ -39,6 +39,10 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration
{
 
    private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
 
+   private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
+
+   private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
+
    public ReplicaPolicyConfiguration() {
    }
 
@@ -119,4 +123,20 @@ public class ReplicaPolicyConfiguration implements HAPolicyConfiguration
{
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       return this;
    }
+
+   public boolean getVoteOnReplicationFailure() {
+      return voteOnReplicationFailure;
+   }
+
+   public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
+   }
+
+   public int getQuorumSize() {
+      return quorumSize;
+   }
+
+   public void setQuorumSize(int quorumSize) {
+      this.quorumSize = quorumSize;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
index 3b84bb7..9072822 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ha/ReplicatedPolicyConfiguration.java
@@ -29,6 +29,10 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration
{
 
    private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
 
+   private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
+
+   private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
+
    public ReplicatedPolicyConfiguration() {
    }
 
@@ -71,4 +75,20 @@ public class ReplicatedPolicyConfiguration implements HAPolicyConfiguration
{
    public void setInitialReplicationSyncTimeout(long initialReplicationSyncTimeout) {
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
    }
+
+   public boolean getVoteOnReplicationFailure() {
+      return voteOnReplicationFailure;
+   }
+
+   public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
+   }
+
+   public int getQuorumSize() {
+      return quorumSize;
+   }
+
+   public void setQuorumSize(int quorumSize) {
+      this.quorumSize = quorumSize;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6211a1b..2e40e6a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1146,6 +1146,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
{
 
       configuration.setInitialReplicationSyncTimeout(getLong(policyNode, "initial-replication-sync-timeout",
configuration.getInitialReplicationSyncTimeout(), Validators.GT_ZERO));
 
+      configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure",
configuration.getVoteOnReplicationFailure()));
+
+      configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(),
Validators.MINUS_ONE_OR_GT_ZERO));
+
       return configuration;
    }
 
@@ -1166,6 +1170,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
{
 
       configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
 
+      configuration.setVoteOnReplicationFailure(getBoolean(policyNode, "vote-on-replication-failure",
configuration.getVoteOnReplicationFailure()));
+
+      configuration.setQuorumSize(getInteger(policyNode, "quorum-size", configuration.getQuorumSize(),
Validators.MINUS_ONE_OR_GT_ZERO));
+
       return configuration;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
index 68db06e..2339610 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java
@@ -39,6 +39,16 @@ public class ReplicaPolicy extends BackupPolicy {
 
    private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();
 
+   /*
+   * what quorum size to use for voting; -1 means use the current cluster size
+   * */
+   private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
+
+   /*
+   * whether this broker, if it becomes live after a failover, should vote to remain live
+   * */
+   private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
+
    private ReplicatedPolicy replicatedPolicy;
 
    private final NetworkHealthCheck networkHealthCheck;
@@ -60,15 +70,19 @@ public class ReplicaPolicy extends BackupPolicy {
                         boolean allowFailback,
                         long initialReplicationSyncTimeout,
                         ScaleDownPolicy scaleDownPolicy,
-                        NetworkHealthCheck networkHealthCheck) {
+                        NetworkHealthCheck networkHealthCheck,
+                        boolean voteOnReplicationFailure,
+                        int quorumSize) {
       this.clusterName = clusterName;
       this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize;
       this.groupName = groupName;
       this.restartBackup = restartBackup;
       this.allowFailback = allowFailback;
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
+      this.quorumSize = quorumSize;
       this.scaleDownPolicy = scaleDownPolicy;
       this.networkHealthCheck = networkHealthCheck;
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
    }
 
    public ReplicaPolicy(String clusterName,
@@ -101,7 +115,7 @@ public class ReplicaPolicy extends BackupPolicy {
 
    public ReplicatedPolicy getReplicatedPolicy() {
       if (replicatedPolicy == null) {
-         replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout,
groupName, clusterName, this, networkHealthCheck);
+         replicatedPolicy = new ReplicatedPolicy(false, allowFailback, initialReplicationSyncTimeout,
groupName, clusterName, this, networkHealthCheck, voteOnReplicationFailure, quorumSize);
       }
       return replicatedPolicy;
    }
@@ -180,4 +194,20 @@ public class ReplicaPolicy extends BackupPolicy {
       backupActivation.init();
       return backupActivation;
    }
+
+   public void setQuorumSize(int quorumSize) {
+      this.quorumSize = quorumSize;
+   }
+
+   public int getQuorumSize() {
+      return quorumSize;
+   }
+
+   public void setVoteOnReplicationFailure(boolean voteOnReplicationFailure) {
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
+   }
+
+   public boolean isVoteOnReplicationFailure() {
+      return voteOnReplicationFailure;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index 82df79c..f8892af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -41,6 +41,16 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
    private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();
 
    /*
+   * whether or not this live broker should vote to remain live if replication fails
+   * */
+   private boolean voteOnReplicationFailure = ActiveMQDefaultConfiguration.getDefaultVoteOnReplicationFailure();
+
+   /*
+   * what quorum size to use for voting; -1 means use the current cluster size
+   * */
+   private int quorumSize = ActiveMQDefaultConfiguration.getDefaultQuorumSize();
+
+   /*
    * this are only used as the policy when the server is started as a live after a failover
    * */
    private ReplicaPolicy replicaPolicy;
@@ -56,15 +66,16 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
                            String groupName,
                            String clusterName,
                            long initialReplicationSyncTimeout,
-                           NetworkHealthCheck networkHealthCheck) {
+                           NetworkHealthCheck networkHealthCheck,
+                           boolean voteOnReplicationFailure,
+                           int quorumSize) {
       this.checkForLiveServer = checkForLiveServer;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.networkHealthCheck = networkHealthCheck;
-      /*
-      * we create this with sensible defaults in case we start after a failover
-      * */
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
+      this.quorumSize = quorumSize;
    }
 
    public ReplicatedPolicy(boolean checkForLiveServer,
@@ -73,7 +84,9 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
                            String groupName,
                            String clusterName,
                            ReplicaPolicy replicaPolicy,
-                           NetworkHealthCheck networkHealthCheck) {
+                           NetworkHealthCheck networkHealthCheck,
+                           boolean voteOnReplicationFailure,
+                           int quorumSize) {
       this.checkForLiveServer = checkForLiveServer;
       this.clusterName = clusterName;
       this.groupName = groupName;
@@ -81,6 +94,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicaPolicy = replicaPolicy;
       this.networkHealthCheck = networkHealthCheck;
+      this.voteOnReplicationFailure = voteOnReplicationFailure;
+      this.quorumSize = quorumSize;
    }
 
    public boolean isCheckForLiveServer() {
@@ -123,6 +138,8 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
    public ReplicaPolicy getReplicaPolicy() {
       if (replicaPolicy == null) {
          replicaPolicy = new ReplicaPolicy(networkHealthCheck, this);
+         replicaPolicy.setQuorumSize(quorumSize);
+         replicaPolicy.setVoteOnReplicationFailure(voteOnReplicationFailure);
          if (clusterName != null && clusterName.length() > 0) {
             replicaPolicy.setClusterName(clusterName);
          }
@@ -182,6 +199,10 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
       this.allowAutoFailBack = allowAutoFailBack;
    }
 
+   public boolean isVoteOnReplicationFailure() {
+      return voteOnReplicationFailure;
+   }
+
    @Override
    public LiveActivation createActivation(ActiveMQServerImpl server,
                                           boolean wasLive,
@@ -189,4 +210,12 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation>
{
                                           ActiveMQServerImpl.ShutdownOnCriticalErrorListener
shutdownOnCriticalIO) {
       return new SharedNothingLiveActivation(server, this);
    }
+
+   public int getQuorumSize() {
+      return quorumSize;
+   }
+
+   public void setQuorumSize(int quorumSize) {
+      this.quorumSize = quorumSize;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
index b3e9c32..000552a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
@@ -45,6 +45,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
 
    private final StorageManager storageManager;
    private final ScheduledExecutorService scheduledPool;
+   private final int quorumSize;
 
    private CountDownLatch latch;
 
@@ -66,9 +67,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
    public SharedNothingBackupQuorum(StorageManager storageManager,
                                     NodeManager nodeManager,
                                     ScheduledExecutorService scheduledPool,
-                                    NetworkHealthCheck networkHealthCheck) {
+                                    NetworkHealthCheck networkHealthCheck,
+                                    int quorumSize) {
       this.storageManager = storageManager;
       this.scheduledPool = scheduledPool;
+      this.quorumSize = quorumSize;
       this.latch = new CountDownLatch(1);
       this.nodeManager = nodeManager;
       this.networkHealthCheck = networkHealthCheck;
@@ -257,8 +260,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
     * @return the voting decision
     */
    private boolean isLiveDown() {
-      // we use 1 less than the max cluste size as we arent bothered about the replicated
live node
-      int size = quorumManager.getMaxClusterSize();
+      int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;
 
       QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, storageManager);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index cb8c971..1f47f91 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -121,7 +121,7 @@ public final class SharedNothingBackupActivation extends Activation {
          synchronized (this) {
             if (closed)
                return;
-            backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(),
activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck);
+            backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(),
activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
             activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index c984ae2..ce05638 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -52,6 +52,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
+import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
+import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.jboss.logging.Logger;
 
@@ -215,7 +217,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
 
       @Override
       public void connectionFailed(ActiveMQException exception, boolean failedOver) {
-         connectionClosed();
+         handleClose(true);
       }
 
       @Override
@@ -225,6 +227,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
 
       @Override
       public void connectionClosed() {
+         handleClose(false);
+      }
+
+      private void handleClose(boolean failed) {
          ExecutorService executorService = activeMQServer.getThreadPool();
          if (executorService != null) {
             executorService.execute(new Runnable() {
@@ -234,6 +240,45 @@ public class SharedNothingLiveActivation extends LiveActivation {
                      if (replicationManager != null) {
                         activeMQServer.getStorageManager().stopReplication();
                         replicationManager = null;
+
+                        if (failed && replicatedPolicy.isVoteOnReplicationFailure()) {
+                           QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
+                           int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
+
+                           QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getStorageManager());
+
+                           quorumManager.vote(quorumVote);
+
+                           try {
+                              quorumVote.await(5, TimeUnit.SECONDS);
+                           } catch (InterruptedException interruption) {
+                              Thread.currentThread().interrupt(); // keep the interrupt status; the vote falls back to the latest result it has
+                           }
+
+                           quorumManager.voteComplete(quorumVote);
+
+                           if (!quorumVote.getDecision()) {
+                              try {
+                                 Thread stopThread = new Thread(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                       try {
+                                          if (logger.isTraceEnabled()) {
+                                             logger.trace("Calling activeMQServer.stop() to stop the server");
+                                          }
+                                          activeMQServer.stop();
+                                       } catch (Exception e) {
+                                          logger.warn("Failed to stop the live server after losing the quorum vote", e);
+                                       }
+                                    }
+                                 });
+                                 stopThread.start();
+                                 stopThread.join();
+                              } catch (Exception e) {
+                                 logger.warn("Error while waiting for the live server to stop after losing the quorum vote", e);
+                              }
+                           }
+                        }
                      }
                   }
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index bc9363f..66d2b2b 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1983,6 +1983,20 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="vote-on-replication-failure" type="xsd:boolean" default="false"
minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Whether or not this live broker should vote to remain as live if replication
is lost.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="quorum-size" type="xsd:integer" default="-1" minOccurs="0"
maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The quorum size used for voting after replication loss, -1 means use the
current cluster size
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:all>
    </xsd:complexType>
    <xsd:complexType name="replicaPolicyType">
@@ -2055,6 +2069,22 @@
                </xsd:documentation>
             </xsd:annotation>
          </xsd:element>
+         <xsd:element name="vote-on-replication-failure" type="xsd:boolean" default="false"
minOccurs="0" maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  If we have to start as a replicated server decide whether or not this live
broker should vote to remain
+                  as live if replication is lost.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+         <xsd:element name="quorum-size" type="xsd:integer" default="-1" minOccurs="0"
maxOccurs="1">
+            <xsd:annotation>
+               <xsd:documentation>
+                  If we have to start as a replicated server or we are a backup and lose
connection to live, the quorum size
+                  used for voting after replication loss, -1 means use the current cluster
size
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:all>
    </xsd:complexType>
    <xsd:complexType name="colocatedReplicaPolicyType">

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/docs/user-manual/en/network-isolation.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/network-isolation.md b/docs/user-manual/en/network-isolation.md
index 38b0d36..1ab5e62 100644
--- a/docs/user-manual/en/network-isolation.md
+++ b/docs/user-manual/en/network-isolation.md
@@ -1,8 +1,61 @@
-# Network Isolation
+# Network Isolation (Split Brain)
 
-In case the server is isolated, say for a network failure, the server will be isolated for
its peers on a network of brokers. If you are playing with replication the backup may think
the backup failed and you may endup with two live nodes, what is called the split brain.
+If a replicated live or backup server becomes isolated on the network, failover may occur and you can end up
+with 2 live servers serving messages in the cluster; this is called split brain. There are several configurations you can choose
+from that help mitigate this problem.
 
-# Pinging the network
+## Quorum Voting
+
+Quorum voting is used by both the live and the backup to decide what to do if a replication connection is lost.
+Basically, the server asks each live server in the cluster to vote on whether it thinks the server it is replicating
+to or from is still alive. Because of this, the minimum number of live/backup pairs needed is 3. If fewer than 3 pairs
+are used, the only options are to use a Network Pinger, which is explained later in this chapter, or to choose how you want each server to
+react, as detailed in the following sections:
+ 
+### Backup Voting
+
+By default, if a replica loses its replication connection to the live broker, it decides whether or not to start
+based on a quorum vote. This of course requires that there be at least 3 pairs of live/backup nodes in the cluster. For a 3 node
+cluster it will start if it gets 2 votes back saying that its live server is no longer available; for 4 nodes this would be
+3 votes, and so on.
+
+It's also possible to statically set the quorum size for the case where the cluster size is known up front;
+this is done on the Replica Policy like so:
+
+```xml
+<ha-policy>
+  <replication>
+    <slave>
+       <quorum-size>2</quorum-size>
+    </slave>
+  </replication>
+</ha-policy>
+```
+
+In this example the quorum size is set to 2, so if you were using a single pair and the backup lost connectivity, it would
+never start.
+
+### Live Voting
+
+By default, if the live server loses its replication connection, it will just carry on and wait for a backup to reconnect
+and start replicating again. In a possible split brain scenario this may mean that the live stays live even though
+the backup has been activated. It is possible to configure the live server to hold a quorum vote if this happens; in this way,
+if the live server does not receive a majority vote, it will shut down. This is done by setting _vote-on-replication-failure_
+to true.
+
+```xml
+<ha-policy>
+  <replication>
+    <master>
+       <vote-on-replication-failure>true</vote-on-replication-failure>
+       <quorum-size>2</quorum-size>
+    </master>
+  </replication>
+</ha-policy>
+```
+As with the backup policy, it is also possible to statically configure the quorum size.
+
+## Pinging the network
 
 You may configure one more addresses on the broker.xml that are part of your network topology,
that will be pinged through the life cycle of the server.
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/43a92764/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java
new file mode 100644
index 0000000..bf52835
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedVotingFailoverTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+/**
+ * Failover tests for a replicated live/backup pair in which the live server is configured
+ * ({@code setVoteOnReplicationFailure(true)}) to hold a quorum vote when its replication
+ * connection to the backup is lost.
+ * <p>
+ * {@code testBackupFailsVoteSuccess} expects the live server to remain active after the backup
+ * is stopped; {@code testBackupFailsVoteFails} additionally sets the live quorum size to 2 and
+ * expects the live server to stop.
+ */
+public class ReplicatedVotingFailoverTest extends FailoverTestBase {
+
+   // Set before each test to whether that test is testBackupFailsVoteFails; read by
+   // setupHAPolicyConfiguration to decide whether to configure a static quorum size.
+   boolean testBackupFailsVoteFails = false;
+   // Captures the name of the test about to run so the HA configuration can vary per test.
+   // NOTE(review): this relies on the base class calling setupHAPolicyConfiguration during its
+   // @Before set-up, which JUnit runs after TestWatcher.starting() — confirm in FailoverTestBase.
+   @Rule
+   public TestRule watcher = new TestWatcher() {
+      @Override
+      protected void starting(Description description) {
+         testBackupFailsVoteFails = description.getMethodName().equals("testBackupFailsVoteFails");
+      }
+
+   };
+
+   /**
+    * Hook invoked by each test before waiting for backup synchronization; empty here,
+    * presumably so subclasses can override it.
+    */
+   protected void beforeWaitForRemoteBackupSynchronization() {
+   }
+
+   /** Uses an in-VM acceptor for the live or backup server. */
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   /** Uses an in-VM connector for the live or backup server. */
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+   /**
+    * Stops the backup once it is synchronized, then checks that the live server still accepts a
+    * durable message and remains active (no static quorum size is configured for this test).
+    */
+   @Test
+   public void testBackupFailsVoteSuccess() throws Exception {
+      try {
+         beforeWaitForRemoteBackupSynchronization();
+
+         waitForRemoteBackupSynchronization(backupServer.getServer());
+
+         // Drop the replication connection by stopping the backup; the live should then vote.
+         backupServer.stop();
+
+         ServerLocator locator = createInVMLocator(0);
+         ClientSessionFactory sessionFactory = locator.createSessionFactory();
+         ClientSession session = sessionFactory.createSession();
+         addClientSession(session);
+         ClientProducer producer = session.createProducer("testAddress");
+         // Durable send to prove the live server is still serving clients.
+         producer.send(session.createMessage(true));
+         assertTrue(liveServer.isActive());
+
+
+      } finally {
+         // Best-effort cleanup: either server may already be stopped.
+         try {
+            liveServer.getServer().stop();
+         } catch (Throwable ignored) {
+         }
+         try {
+            backupServer.getServer().stop();
+         } catch (Throwable ignored) {
+         }
+      }
+   }
+
+   /**
+    * With the live quorum size set to 2 (see setupHAPolicyConfiguration), stopping the backup
+    * should cause the live server to lose its vote and stop itself.
+    */
+   @Test
+   public void testBackupFailsVoteFails() throws Exception {
+      try {
+         beforeWaitForRemoteBackupSynchronization();
+
+         waitForRemoteBackupSynchronization(backupServer.getServer());
+
+         backupServer.stop();
+
+         // The client operations may fail if the live is already shutting down; any exception
+         // is tolerated because only the live server's final state is asserted.
+         try {
+            ServerLocator locator = createInVMLocator(0);
+            ClientSessionFactory sessionFactory = locator.createSessionFactory();
+            ClientSession session = sessionFactory.createSession();
+            addClientSession(session);
+            ClientProducer producer = session.createProducer("testAddress");
+            producer.send(session.createMessage(true));
+         } catch (Exception e) {
+            //expected
+         }
+         waitForServerToStop(liveServer.getServer());
+         assertFalse(liveServer.isStarted());
+
+
+      } finally {
+         // Best-effort cleanup: either server may already be stopped.
+         try {
+            liveServer.getServer().stop();
+         } catch (Throwable ignored) {
+         }
+         try {
+            backupServer.getServer().stop();
+         } catch (Throwable ignored) {
+         }
+      }
+   }
+
+   /** Builds replicated (shared-nothing) live and backup configurations. */
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+   }
+
+   /**
+    * Configures the live to check for an existing live server and to vote on replication
+    * failure, and configures the backup's saved-journal limit, fail-back and restart behavior.
+    * Only for testBackupFailsVoteFails is the live quorum size fixed at 2, a size a single
+    * live/backup pair is expected not to satisfy.
+    */
+   @Override
+   protected void setupHAPolicyConfiguration() {
+      ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
+      ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setVoteOnReplicationFailure(true);
+      ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
+      ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
+      if (testBackupFailsVoteFails) {
+         ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setQuorumSize(2);
+      }
+   }
+
+   /**
+    * Waits for the remote backup (per session factory, or with a null factory when no sessions
+    * are given) before delegating to the base crash.
+    * NOTE(review): the literal 5 is presumably a timeout in seconds — confirm against
+    * waitForRemoteBackup's signature.
+    */
+   @Override
+   protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception
{
+      if (sessions.length > 0) {
+         for (ClientSession session : sessions) {
+            waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
+         }
+      } else {
+         waitForRemoteBackup(null, 5, true, backupServer.getServer());
+      }
+      super.crash(waitFailure, sessions);
+   }
+
+   /**
+    * Same pre-crash backup wait as the two-argument overload, then delegates to the base crash.
+    */
+   @Override
+   protected void crash(ClientSession... sessions) throws Exception {
+      if (sessions.length > 0) {
+         for (ClientSession session : sessions) {
+            waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
+         }
+      } else {
+         waitForRemoteBackup(null, 5, true, backupServer.getServer());
+      }
+      super.crash(sessions);
+   }
+}


Mime
View raw message