asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject asterixdb git commit: [ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology
Date Wed, 27 Sep 2017 04:14:56 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master a181e1353 -> 2c13bdcc8


[ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology

- user model changes: no
- storage format changes: no
- interface changes: yes
  Renamed IClusterStateManager addNCConfiguration/removeNCConfiguration
  methods to notifyNodeJoin/notifyNodeFailure.

Details:
- Mark node as participant when it completes its startup and
  not when it joins the cluster.
- Allow partitions to be added with pending activation state.
- Remove the use of static MetadataProperties for reporting number of nodes.
- Add test cases.

Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2029
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2c13bdcc
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2c13bdcc
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2c13bdcc

Branch: refs/heads/master
Commit: 2c13bdcc8eb5f2bc228aeafe8e4c4ce1481c68a0
Parents: a181e13
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Tue Sep 26 23:22:06 2017 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Tue Sep 26 21:13:56 2017 -0700

----------------------------------------------------------------------
 .../replication/AutoFaultToleranceStrategy.java |   4 +-
 .../bootstrap/ClusterLifecycleListener.java     |   4 +-
 .../apache/asterix/util/FaultToleranceUtil.java |   8 +-
 .../runtime/ClusterStateManagerTest.java        | 239 +++++++++++++++++++
 .../common/cluster/ClusterPartition.java        |  19 +-
 .../common/cluster/IClusterStateManager.java    |  10 +-
 .../metadata_node_recovery.cluster_state.4.adm  |   4 +
 .../metadata_node_recovery.cluster_state.7.adm  |   4 +
 .../node_failback.cluster_state.10.adm          |   4 +
 .../node_failback.cluster_state.5.adm           |   4 +
 .../runtime/utils/ClusterStateManager.java      |  64 ++---
 11 files changed, 320 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index db26c3a..80fdbd6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -186,8 +186,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
 
     private boolean addActiveReplica(String replica, ClusterPartition partition,
             Map<String, List<Integer>> partitionRecoveryPlan) {
-        Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
-        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+        final Set<String> participantNodes = clusterManager.getParticipantNodes();
+        if (participantNodes.contains(replica) && !failedNodes.contains(replica)) {
             if (!partitionRecoveryPlan.containsKey(replica)) {
                 List<Integer> replicaPartitions = new ArrayList<>();
                 replicaPartitions.add(partition.getPartitionId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0583508..84f841c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -71,7 +71,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
             LOGGER.info("NC: " + nodeId + " joined");
         }
         IClusterStateManager csm = appCtx.getClusterStateManager();
-        csm.addNCConfiguration(nodeId, ncConfiguration);
+        csm.notifyNodeJoin(nodeId, ncConfiguration);
 
         //if metadata node rejoining, we need to rebind the proxy connection when it is active again.
         if (!csm.isMetadataNodeActive()) {
@@ -101,7 +101,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
                 LOGGER.info("NC: " + deadNode + " left");
             }
             IClusterStateManager csm = appCtx.getClusterStateManager();
-            csm.removeNCConfiguration(deadNode);
+            csm.notifyNodeFailure(deadNode);
 
             //if metadata node failed, we need to rebind the proxy connection when it is active again
             if (!csm.isMetadataNodeActive()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
index 241cd65..9887c57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -20,6 +20,7 @@ package org.apache.asterix.util;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -48,15 +49,16 @@ public class FaultToleranceUtil {
         List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
                 .map(Replica::getId).collect(Collectors.toList());
         String nodeIdAddress = StringUtils.EMPTY;
-        Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+        Map<String, Map<IOption, Object>> ncConfiguration = clusterManager.getNcConfiguration();
         // In case the node joined with a new IP address, we need to send it to the other replicas
         if (event == ClusterEventType.NODE_JOIN) {
-            nodeIdAddress = (String)activeNcConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
+            nodeIdAddress = (String) ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
         }
+        final Set<String> participantNodes = clusterManager.getParticipantNodes();
         ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
         for (String replica : primaryRemoteReplicas) {
             // If the remote replica is alive, send the event
-            if (activeNcConfiguration.containsKey(replica)) {
+            if (participantNodes.contains(replica)) {
                 try {
                     messageBroker.sendApplicationMessageToNC(msg, replica);
                 } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
new file mode 100644
index 0000000..6c11139
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.replication.NoFaultToleranceStrategy;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClusterStateManagerTest {
+
+    private static final String NC1 = "NC1";
+    private static final String NC2 = "NC2";
+    private static final String NC3 = "NC3";
+    private static final String METADATA_NODE = NC1;
+
+    /**
+     * Ensures that a cluster with a fixed topology will not be active until
+     * all partitions are active.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void fixedTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccAppCtx = ccAppContext(csm);
+        // prepare fixed topology
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(0, new ClusterPartition(0, NC1, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(1, new ClusterPartition(1, NC2, 0));
+        ccAppCtx.getMetadataProperties().getClusterPartitions().put(2, new ClusterPartition(2, NC3, 0));
+        for (ClusterPartition cp : ccAppCtx.getMetadataProperties().getClusterPartitions().values()) {
+            ccAppCtx.getMetadataProperties().getNodePartitions().put(cp.getNodeId(), new ClusterPartition[] { cp });
+        }
+        csm.setCcAppCtx(ccAppCtx);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, false);
+        notifyNodeStartupCompletion(ccAppCtx, NC1);
+        // cluster should be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, false);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, false);
+        // notify NC2 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC2);
+        // cluster should still be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+        // notify NC3 completed startup
+        notifyNodeStartupCompletion(ccAppCtx, NC3);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state while
+     * new partitions are dynamically added.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyState() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        //  notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        //  notify NC3 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC2 failed
+        csm.notifyNodeFailure(NC2);
+        // cluster should now be unusable
+        Assert.assertTrue(!csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will not go into unusable state if
+     * a newly added node fails before completing its startup
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyNodeFailure() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined and completed startup
+        notifyNodeJoined(csm, NC1, 0, true);
+        notifyNodeStartupCompletion(ccApplicationContext, NC1);
+        // cluster should now be active
+        Assert.assertTrue(csm.isClusterActive());
+        // notify NC2 joined
+        notifyNodeJoined(csm, NC2, 1, true);
+        // notify NC3 joined
+        notifyNodeJoined(csm, NC3, 2, true);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        //  notify NC2 completed startup
+        notifyNodeStartupCompletion(ccApplicationContext, NC2);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+        // NC3 failed before completing startup
+        csm.notifyNodeFailure(NC3);
+        // cluster should still be active
+        Assert.assertTrue(csm.isClusterActive());
+    }
+
+    /**
+     * Ensures that a cluster with a dynamic topology will be in an unusable state
+     * if all partitions are pending activation
+     *
+     * @throws Exception
+     */
+    @Test
+    public void dynamicTopologyNoActivePartitions() throws Exception {
+        ClusterStateManager csm = new ClusterStateManager();
+        CcApplicationContext ccApplicationContext = ccAppContext(csm);
+        csm.setCcAppCtx(ccApplicationContext);
+
+        // notify NC1 joined
+        notifyNodeJoined(csm, NC1, 0, true);
+        // notify NC1 failed before completing startup
+        csm.notifyNodeFailure(NC1);
+        Assert.assertTrue(csm.getState() == ClusterState.UNUSABLE);
+    }
+
+    private void notifyNodeJoined(ClusterStateManager csm, String nodeId, int partitionId, boolean registerPartitions)
+            throws HyracksException, AsterixException {
+        csm.notifyNodeJoin(nodeId, Collections.emptyMap());
+        if (registerPartitions) {
+            csm.registerNodePartitions(nodeId, new ClusterPartition[] { new ClusterPartition(partitionId, nodeId, 0) });
+        }
+    }
+
+    private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
+            throws HyracksDataException {
+        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
+        applicationContext.getResourceIdManager().report(nodeId, 0);
+        applicationContext.getFaultToleranceStrategy().process(msg);
+    }
+
+    private CcApplicationContext ccAppContext(ClusterStateManager csm) throws HyracksDataException {
+        CcApplicationContext ccApplicationContext = Mockito.mock(CcApplicationContext.class);
+        ConfigManager configManager = new ConfigManager(null);
+        IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
+        ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+        Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+        Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+
+        NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy();
+        fts.bindTo(csm);
+        Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts);
+
+        MetadataProperties metadataProperties = mockMetadataProperties();
+        Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+
+        ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
+        Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
+
+        IMetadataBootstrap metadataBootstrap = Mockito.mock(IMetadataBootstrap.class);
+        Mockito.doNothing().when(metadataBootstrap).init();
+        Mockito.when(ccApplicationContext.getMetadataBootstrap()).thenReturn(metadataBootstrap);
+
+        IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
+        Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
+        Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+        return ccApplicationContext;
+    }
+
+    private MetadataProperties mockMetadataProperties() {
+        SortedMap<Integer, ClusterPartition> clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
+        Map<String, ClusterPartition[]> nodePartitionsMap = new ConcurrentHashMap<>();
+        MetadataProperties metadataProperties = Mockito.mock(MetadataProperties.class);
+        Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
+        Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);
+        Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
+        return metadataProperties;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc27fbb..cc99421 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -24,6 +24,8 @@ public class ClusterPartition implements Cloneable {
     private final int ioDeviceNum;
     private String activeNodeId = null;
     private boolean active = false;
+    /* a flag indicating if the partition was dynamically added to the cluster and pending first time activation */
+    private boolean pendingActivation = false;
 
     public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
         this.partitionId = partitionId;
@@ -55,6 +57,18 @@ public class ClusterPartition implements Cloneable {
         this.active = active;
     }
 
+    public boolean isActive() {
+        return active;
+    }
+
+    public boolean isPendingActivation() {
+        return pendingActivation;
+    }
+
+    public void setPendingActivation(boolean pendingActivation) {
+        this.pendingActivation = pendingActivation;
+    }
+
     @Override
     public ClusterPartition clone() {
         return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
@@ -67,10 +81,7 @@ public class ClusterPartition implements Cloneable {
         sb.append(", Original Node: " + nodeId);
         sb.append(", IODevice: " + ioDeviceNum);
         sb.append(", Active Node: " + activeNodeId);
+        sb.append(", Pending Activation: " + pendingActivation);
         return sb.toString();
     }
-
-    public boolean isActive() {
-        return active;
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index b368c3b..3948ea6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -76,7 +76,7 @@ public interface IClusterStateManager {
     /**
      * @return a map of nodeId and NC Configuration for active nodes.
      */
-    Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+    Map<String, Map<IOption, Object>> getNcConfiguration();
 
     /**
      * @return The current metadata node Id.
@@ -187,13 +187,13 @@ public interface IClusterStateManager {
     int getNumberOfNodes();
 
     /**
-     * Add node configuration
+     * Notifies {@link IClusterStateManager} that a node has joined
      *
      * @param nodeId
      * @param ncConfiguration
      * @throws HyracksException
      */
-    void addNCConfiguration(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
+    void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
 
     /**
      * @return true if metadata node is active, false otherwise
@@ -201,12 +201,12 @@ public interface IClusterStateManager {
     boolean isMetadataNodeActive();
 
     /**
-     * Remove configuration of a dead node
+     * Notifies {@link IClusterStateManager} that a node has failed
      *
      * @param deadNode
      * @throws HyracksException
      */
-    void removeNCConfiguration(String deadNode) throws HyracksException;
+    void notifyNodeFailure(String deadNode) throws HyracksException;
 
     /**
      * @return a substitution node or null

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
index d6ea4b7..06e4d80 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
@@ -6,6 +6,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : false,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "1" : {
@@ -13,6 +14,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : false,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     },
     "2" : {
@@ -20,6 +22,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "3" : {
@@ -27,6 +30,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     }
   },

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
@@ -6,6 +6,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "1" : {
@@ -13,6 +14,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     },
     "2" : {
@@ -20,6 +22,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "3" : {
@@ -27,6 +30,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     }
   },

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
@@ -6,6 +6,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "1" : {
@@ -13,6 +14,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc1",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     },
     "2" : {
@@ -20,6 +22,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "3" : {
@@ -27,6 +30,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     }
   },

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
index 5f58ff7..2de5a55 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
@@ -6,6 +6,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "1" : {
@@ -13,6 +14,7 @@
       "nodeId" : "asterix_nc1",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     },
     "2" : {
@@ -20,6 +22,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 0
     },
     "3" : {
@@ -27,6 +30,7 @@
       "nodeId" : "asterix_nc2",
       "activeNodeId" : "asterix_nc2",
       "active" : true,
+      "pendingActivation" : false,
       "iodeviceNum" : 1
     }
   },

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c13bdcc/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 51c87b4..6e55fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -68,7 +68,7 @@ public class ClusterStateManager implements IClusterStateManager {
      */
 
     private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
-    private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>();
+    private final Map<String, Map<IOption, Object>> ncConfigMap = new HashMap<>();
     private Set<String> pendingRemoval = new HashSet<>();
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -78,6 +78,7 @@ public class ClusterStateManager implements IClusterStateManager {
     private String currentMetadataNode = null;
     private boolean metadataNodeActive = false;
     private Set<String> failedNodes = new HashSet<>();
+    private Set<String> participantNodes = new HashSet<>();
     private IFaultToleranceStrategy ftStrategy;
     private ICcApplicationContext appCtx;
 
@@ -96,25 +97,25 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     @Override
-    public synchronized void removeNCConfiguration(String nodeId) throws HyracksException {
+    public synchronized void notifyNodeFailure(String nodeId) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Removing configuration parameters for node id " + nodeId);
         }
         failedNodes.add(nodeId);
-        ftStrategy.notifyNodeFailure(nodeId);
+        ncConfigMap.remove(nodeId);
         pendingRemoval.remove(nodeId);
+        ftStrategy.notifyNodeFailure(nodeId);
     }
 
     @Override
-    public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration)
-            throws HyracksException {
+    public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> configuration) throws HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering configuration parameters for node id " + nodeId);
         }
-        activeNcConfiguration.put(nodeId, configuration);
         failedNodes.remove(nodeId);
-        ftStrategy.notifyNodeJoin(nodeId);
+        ncConfigMap.put(nodeId, configuration);
         updateNodeConfig(nodeId, configuration);
+        ftStrategy.notifyNodeJoin(nodeId);
     }
 
     @Override
@@ -142,6 +143,11 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
+        if (active) {
+            participantNodes.add(nodeId);
+        } else {
+            participantNodes.remove(nodeId);
+        }
         ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
         // if this isn't a storage node, it will not have cluster partitions
         if (nodePartitions != null) {
@@ -159,6 +165,7 @@ public class ClusterStateManager implements IClusterStateManager {
             clusterPartition.setActive(active);
             if (active) {
                 clusterPartition.setActiveNodeId(activeNode);
+                clusterPartition.setPendingActivation(false);
             }
         }
     }
@@ -170,19 +177,22 @@ public class ClusterStateManager implements IClusterStateManager {
             return;
         }
         resetClusterPartitionConstraint();
-        if (clusterPartitions.isEmpty()) {
+        // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
+        if (clusterPartitions.isEmpty() || clusterPartitions.values().stream()
+                .allMatch(ClusterPartition::isPendingActivation)) {
             LOGGER.info("Cluster does not have any registered partitions");
             setState(ClusterState.UNUSABLE);
             return;
         }
-        for (ClusterPartition p : clusterPartitions.values()) {
-            if (!p.isActive()) {
-                setState(ClusterState.UNUSABLE);
-                return;
-            }
+
+        // exclude partitions that are pending activation
+        if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+            setState(ClusterState.UNUSABLE);
+            return;
         }
+
         IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-        for (String node : activeNcConfiguration.keySet()) {
+        for (String node : participantNodes) {
             if (!resourceIdManager.reported(node)) {
                 LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered its max resource id...",
                         node);
@@ -234,7 +244,7 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized String[] getIODevices(String nodeId) {
-        Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
+        Map<IOption, Object> ncConfig = ncConfigMap.get(nodeId);
         if (ncConfig == null) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -258,20 +268,16 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<>();
-        for (String pNode : activeNcConfiguration.keySet()) {
-            participantNodes.add(pNode);
-        }
-        return participantNodes;
+        return new HashSet<>(participantNodes);
     }
 
     @Override
     public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) {
-        Set<String> participantNodes = getParticipantNodes();
+        final Set<String> participantNodesCopy = getParticipantNodes();
         if (excludePendingRemoval) {
-            participantNodes.removeAll(pendingRemoval);
+            participantNodesCopy.removeAll(pendingRemoval);
         }
-        return participantNodes;
+        return participantNodesCopy;
     }
 
     @Override
@@ -303,8 +309,8 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     @Override
-    public int getNumberOfNodes() {
-        return appCtx.getMetadataProperties().getNodeNames().size();
+    public synchronized int getNumberOfNodes() {
+        return participantNodes.size();
     }
 
     @Override
@@ -379,8 +385,8 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     @Override
-    public Map<String, Map<IOption, Object>> getActiveNcConfiguration() {
-        return Collections.unmodifiableMap(activeNcConfiguration);
+    public Map<String, Map<IOption, Object>> getNcConfiguration() {
+        return Collections.unmodifiableMap(ncConfigMap);
     }
 
     @Override
@@ -402,6 +408,7 @@ public class ClusterStateManager implements IClusterStateManager {
             }
         }
         for (ClusterPartition nodePartition : nodePartitions) {
+            nodePartition.setPendingActivation(true);
             clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
@@ -419,6 +426,7 @@ public class ClusterStateManager implements IClusterStateManager {
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
+            participantNodes.remove(nodeId);
         }
     }
 
@@ -427,7 +435,7 @@ public class ClusterStateManager implements IClusterStateManager {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (activeNcConfiguration.containsKey(nodeId)) {
+        if (participantNodes.contains(nodeId)) {
             pendingRemoval.add(nodeId);
         } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal");


Mime
View raw message