asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mb...@apache.org
Subject asterixdb git commit: [NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
Date Thu, 14 Sep 2017 18:54:08 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 4e91ceffb -> f52bc5882


[NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes

- Ensure thread factory is configured before using it
- Don't mark cluster state ACTIVE until after global recovery has
  completed
- Failure of global recovery causes CC to shut down
- Don't mark cluster state ACTIVE until max resource id has been
  reported by all nodes

Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2004
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f52bc588
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f52bc588
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f52bc588

Branch: refs/heads/master
Commit: f52bc5882203de6a16f5eaf385f07e8b93bce25b
Parents: 4e91cef
Author: Michael Blow <michael.blow@couchbase.com>
Authored: Thu Sep 14 01:35:41 2017 -0400
Committer: Michael Blow <mblow@apache.org>
Committed: Thu Sep 14 11:53:49 2017 -0700

----------------------------------------------------------------------
 .../hyracks/bootstrap/CCApplication.java        | 19 +++++----
 .../bootstrap/GlobalRecoveryManager.java        | 42 ++++++++++++--------
 .../hyracks/bootstrap/NCApplication.java        | 20 ++++++----
 .../common/api/IClusterManagementWork.java      | 23 ++++++-----
 .../common/transactions/IResourceIdManager.java |  4 +-
 .../asterix/metadata/GarbageCollector.java      |  9 +++--
 .../message/ResourceIdRequestMessage.java       |  2 +-
 .../runtime/transaction/ResourceIdManager.java  |  4 +-
 .../runtime/utils/ClusterStateManager.java      | 40 +++++++++++++------
 .../hyracks/api/application/IApplication.java   |  9 +++--
 .../hyracks/control/cc/BaseCCApplication.java   |  7 +++-
 .../control/cc/ClusterControllerService.java    | 19 ++++-----
 .../hyracks/control/nc/BaseNCApplication.java   |  7 +++-
 .../control/nc/NodeControllerService.java       | 11 ++---
 .../nc/application/NCServiceContext.java        |  9 +----
 .../btree/helper/TestNCApplication.java         |  7 +++-
 16 files changed, 143 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 37e0c58..ef3800c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -108,12 +108,19 @@ public class CCApplication extends BaseCCApplication {
     private IHyracksClientConnection hcc;
 
     @Override
-    public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+    public void init(IServiceContext serviceCtx) throws Exception {
+        ccServiceCtx = (ICCServiceContext) serviceCtx;
+        ccServiceCtx.setThreadFactory(
+                new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
+    }
+
+    @Override
+    public void start(String[] args) throws Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
         }
-        final ClusterControllerService controllerService = (ClusterControllerService) serviceCtx.getControllerService();
-        this.ccServiceCtx = (ICCServiceContext) serviceCtx;
+        final ClusterControllerService controllerService = (ClusterControllerService) ccServiceCtx
+                .getControllerService();
         ccServiceCtx.setMessageBroker(new CCMessageBroker(controllerService));
 
         configureLoggingLevel(ccServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL));
@@ -122,8 +129,6 @@ public class CCApplication extends BaseCCApplication {
             LOGGER.info("Starting Asterix cluster controller");
         }
 
-        ccServiceCtx.setThreadFactory(
-                new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager()));
         String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
         int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
@@ -207,8 +212,8 @@ public class CCApplication extends BaseCCApplication {
     }
 
     protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws
Exception {
-        HttpServer jsonAPIServer =
-                new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort());
+        HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
+                externalProperties.getAPIServerPort());
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..13f3afa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -92,11 +92,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
         MetadataTransactionContext mdTxnCtx = null;
         try {
             MetadataManager.INSTANCE.init();
-            // Loop over datasets
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx))
{
-                mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
-            }
+            mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             // This needs to be fixed <-- Needs to shutdown the system -->
@@ -109,8 +106,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
                 try {
                     MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
                 } catch (Exception e1) {
-                    LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
-                    e1.addSuppressed(e);
+                    LOGGER.log(Level.SEVERE, "Exception aborting metadata transaction", e1);
+                    e.addSuppressed(e1);
                     throw new IllegalStateException(e);
                 }
             }
@@ -118,11 +115,21 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager
{
         }
         recoveryCompleted = true;
         LOGGER.info("Global Recovery Completed");
+        appCtx.getClusterStateManager().refreshState();
+    }
+
+    protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext
mdTxnCtx)
+            throws Exception {
+        // Loop over datasets
+        for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
+            mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+        }
+        return mdTxnCtx;
     }
 
     @Override
     public void notifyStateChange(ClusterState newState) {
-        if (newState != ClusterState.ACTIVE) {
+        if (newState != ClusterState.ACTIVE && newState != ClusterState.RECOVERING)
{
             recoveryCompleted = false;
         }
     }
@@ -132,8 +139,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
         if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME))
{
             MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse);
             try {
-                List<Dataset> datasets =
-                        MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
+                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                        dataverse.getDataverseName());
                 for (Dataset dataset : datasets) {
                     if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
                         // External dataset
@@ -145,8 +152,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
                         TransactionState datasetState = dsd.getState();
                         if (!indexes.isEmpty()) {
                             if (datasetState == TransactionState.BEGIN) {
-                                List<ExternalFile> files =
-                                        MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
dataset);
+                                List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+                                        dataset);
                                 // if persumed abort, roll backward
                                 // 1. delete all pending files
                                 for (ExternalFile file : files) {
@@ -157,8 +164,8 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
                             }
                             // 2. clean artifacts in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec =
-                                    ExternalIndexingOperations.buildAbortOp(dataset, indexes,
metadataProvider);
+                            JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset,
indexes,
+                                    metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 3. correct the dataset state
                             ((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(TransactionState.COMMIT);
@@ -166,13 +173,13 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager
{
                             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                         } else if (datasetState == TransactionState.READY_TO_COMMIT) {
-                            List<ExternalFile> files =
-                                    MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
dataset);
+                            List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+                                    dataset);
                             // if ready to commit, roll forward
                             // 1. commit indexes in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec =
-                                    ExternalIndexingOperations.buildRecoverOp(dataset, indexes,
metadataProvider);
+                            JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset,
indexes,
+                                    metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 2. add pending files in metadata
                             for (ExternalFile file : files) {
@@ -221,4 +228,5 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
     public boolean isRecoveryCompleted() {
         return recoveryCompleted;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0f6b396..e8f63b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -80,13 +80,17 @@ public class NCApplication extends BaseNCApplication {
     }
 
     @Override
-    public void start(IServiceContext serviceCtx, String[] args) throws Exception {
-        if (args.length > 0) {
-            throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
-        }
+    public void init(IServiceContext serviceCtx) throws Exception {
         this.ncServiceCtx = (INCServiceContext) serviceCtx;
         ncServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
+    }
+
+    @Override
+    public void start(String[] args) throws Exception {
+        if (args.length > 0) {
+            throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+        }
         nodeId = this.ncServiceCtx.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -111,8 +115,8 @@ public class NCApplication extends BaseNCApplication {
         MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
         IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
-        MessagingChannelInterfaceFactory interfaceFactory =
-                new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
+        MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
+                (NCMessageBroker) messageBroker, messagingProperties);
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -224,8 +228,8 @@ public class NCApplication extends BaseNCApplication {
         String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
         for (String ioDevice : ioDevices) {
-            String tempDatasetsDir =
-                    ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            String tempDatasetsDir = ioDevice + storageDirName + File.separator
+                    + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
             File tmpDsDir = new File(tempDatasetsDir);
             if (tmpDsDir.exists()) {
                 IoUtil.delete(tmpDsDir);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index bdbf4a5..e3424ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -20,23 +20,24 @@ package org.apache.asterix.common.api;
 
 public interface IClusterManagementWork {
 
-    public enum WorkType {
+    enum WorkType {
         ADD_NODE,
         REMOVE_NODE
     }
 
-    public enum ClusterState {
-        STARTING,
-        PENDING,
-        ACTIVE,
-        UNUSABLE,
-        REBALANCING,
-        SHUTTING_DOWN
+    enum ClusterState {
+        STARTING,       // the initial state
+        UNUSABLE,       // one or more cluster partitions are inactive or max id resources
have not been reported
+        PENDING,        // the metadata node has not yet joined & initialized
+        RECOVERING,     // global recovery has not yet completed
+        ACTIVE,         // cluster is ACTIVE and ready for requests
+        REBALANCING,    // replication is processing failbacks
+        SHUTTING_DOWN   // a shutdown request has been received, and is underway
     }
 
-    public WorkType getClusterManagementWorkType();
+    WorkType getClusterManagementWorkType();
 
-    public int getWorkId();
+    int getWorkId();
 
-    public IClusterEventsSubscriber getSourceSubscriber();
+    IClusterEventsSubscriber getSourceSubscriber();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..ce49ccf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,12 +18,14 @@
  */
 package org.apache.asterix.common.transactions;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public interface IResourceIdManager {
 
     long createResourceId();
 
     boolean reported(String nodeId);
 
-    void report(String nodeId, long maxResourceId);
+    void report(String nodeId, long maxResourceId) throws HyracksDataException;
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
index 8a3392a..a97e22a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata;
 
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,7 +30,9 @@ import java.util.logging.Logger;
 public class GarbageCollector implements Runnable {
     private static final Logger LOGGER = Logger.getLogger(GarbageCollector.class.getName());
 
-    private static final long CLEANUP_PERIOD = 3600L * 24;
+    // TODO(mblow): make this configurable
+    private static final long CLEANUP_PERIOD = 1;
+    private static final TimeUnit CLEANUP_PERIOD_UNIT = TimeUnit.DAYS;
 
     static {
         // Starts the garbage collector thread which
@@ -40,13 +43,13 @@ public class GarbageCollector implements Runnable {
     }
 
     @Override
-    @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on InterruptedException
+    @SuppressWarnings({"squid:S2142", "squid:S2189"}) // rethrow/interrupt thread on InterruptedException,
endless loop
     public void run() {
         LOGGER.info("Starting Metadata GC");
         while (true) {
             try {
                 synchronized (this) {
-                    this.wait(CLEANUP_PERIOD);
+                    CLEANUP_PERIOD_UNIT.timedWait(this, CLEANUP_PERIOD);
                 }
                 MetadataManager.INSTANCE.cleanupTempDatasets();
             } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 82a1177..decc1a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -71,6 +71,6 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
 
     @Override
     public String toString() {
-        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+        return ResourceIdRequestMessage.class.getSimpleName();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6a5ed08..afa626d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdManager implements IResourceIdManager {
 
@@ -59,13 +60,14 @@ public class ResourceIdManager implements IResourceIdManager {
     }
 
     @Override
-    public synchronized void report(String nodeId, long maxResourceId) {
+    public synchronized void report(String nodeId, long maxResourceId) throws HyracksDataException
{
         if (!allReported) {
             globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
             reportedNodes.add(nodeId);
             if (reportedNodes.size() == csm.getNumberOfNodes()) {
                 reportedNodes = null;
                 allReported = true;
+                csm.refreshState();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 36cb10d..334b683 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -40,6 +40,7 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -118,6 +119,10 @@ public class ClusterStateManager implements IClusterStateManager {
 
     @Override
     public synchronized void setState(ClusterState state) {
+        if (this.state == state) {
+            LOGGER.info("ignoring update to same cluster state of " + this.state);
+            return;
+        }
         LOGGER.info("updating cluster state from " + this.state + " to " + state.name());
         this.state = state;
         appCtx.getGlobalRecoveryManager().notifyStateChange(state);
@@ -166,24 +171,35 @@ public class ClusterStateManager implements IClusterStateManager {
             setState(ClusterState.UNUSABLE);
             return;
         }
-
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 setState(ClusterState.UNUSABLE);
                 return;
             }
         }
-
-        // if all storage partitions are active as well as the metadata node, then the cluster
is active
+        IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+        for (String node : activeNcConfiguration.keySet()) {
+            if (!resourceIdManager.reported(node)) {
+                LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered
its max resource id...",
+                        node);
+                setState(ClusterState.UNUSABLE);
+                return;
+            }
+        }
+        // the metadata bootstrap & global recovery must be complete before the cluster
can be active
         if (metadataNodeActive) {
-            if (state != ClusterState.ACTIVE) {
+            if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING)
{
                 setState(ClusterState.PENDING);
             }
             appCtx.getMetadataBootstrap().init();
-            setState(ClusterState.ACTIVE);
-            notifyAll();
-            // start global recovery
-            appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+
+            if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+                setState(ClusterState.ACTIVE);
+            } else {
+                // start global recovery
+                setState(ClusterState.RECOVERING);
+                appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+            }
         } else {
             setState(ClusterState.PENDING);
         }
@@ -269,8 +285,8 @@ public class ClusterStateManager implements IClusterStateManager {
                 clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        clusterPartitionConstraint =
-                new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new
String[] {}));
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
     }
 
     @Override
@@ -432,8 +448,8 @@ public class ClusterStateManager implements IClusterStateManager {
     }
 
     private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration)
{
-        ConfigManager configManager =
-                ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager();
+        ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig())
+                .getConfigManager();
         for (Map.Entry<IOption, Object> entry : configuration.entrySet()) {
             if (entry.getKey().section() == Section.NC) {
                 configManager.set(nodeId, entry.getKey(), entry.getValue());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..1d22f85 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -21,12 +21,15 @@ package org.apache.hyracks.api.application;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.kohsuke.args4j.OptionHandlerFilter;
 
+@SuppressWarnings("squid:S00112") // define and throw specific class of Exception
 public interface IApplication {
-    void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR
+    void init(IServiceContext serviceCtx) throws Exception;
 
-    void startupCompleted() throws Exception; //NOSONAR
+    void start(String[] args) throws Exception;
 
-    void stop() throws Exception; //NOSONAR
+    void startupCompleted() throws Exception;
+
+    void stop() throws Exception;
 
     void registerConfig(IConfigManager configManager);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index b94cf01..5ea51d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -40,7 +40,12 @@ public class BaseCCApplication implements ICCApplication {
     }
 
     @Override
-    public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+    public void init(IServiceContext serviceCtx) throws Exception {
+        // no-op
+    }
+
+    @Override
+    public void start(String[] args) throws Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a243bf8..dfc79ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -198,9 +198,9 @@ public class ClusterControllerService implements IControllerService {
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
-        clientIPC =
-                new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
-                        ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+        clientIPC = new IPCSystem(
+                new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), ciIPCI,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
@@ -221,15 +221,16 @@ public class ClusterControllerService implements IControllerService {
     private void startApplication() throws Exception {
         serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
         serviceCtx.addJobLifecycleListener(datasetDirectoryService);
+        application.init(serviceCtx);
         executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
-        application.start(serviceCtx, ccConfig.getAppArgsArray());
+        application.start(ccConfig.getAppArgsArray());
         IJobCapacityController jobCapacityController = application.getJobCapacityController();
 
         // Job manager is in charge of job lifecycle management.
         try {
-            Constructor<?> jobManagerConstructor =
-                    this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor(
-                            CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+            Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
+                    .loadClass(ccConfig.getJobManagerClass())
+                    .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
@@ -406,8 +407,8 @@ public class ClusterControllerService implements IControllerService {
 
         @Override
         public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
-            GetIpAddressNodeNameMapWork ginmw =
-                    new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map);
+            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
+                    ClusterControllerService.this.getNodeManager(), map);
             try {
                 workQueue.scheduleAndSync(ginmw);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 5afc98d..4d8cbbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -41,7 +41,12 @@ public class BaseNCApplication implements INCApplication {
     }
 
     @Override
-    public void start(IServiceContext ncAppCtx, String[] args) throws Exception {
+    public void init(IServiceContext serviceCtx) throws Exception {
+        // no-op
+    }
+
+    @Override
+    public void start(String[] args) throws Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b52675c..ed5598b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -194,8 +194,8 @@ public class NodeControllerService implements IControllerService {
         // Set shutdown hook before so it doesn't have the same uncaught exception handler
         Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
         Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
-        ioManager =
-                new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
+        ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
+                application.getFileDeviceResolver());
 
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
@@ -336,8 +336,8 @@ public class NodeControllerService implements IControllerService {
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress meesagingPort =
-                messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
+        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
+                : null;
         int allCores = osMXBean.getAvailableProcessors();
         nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
@@ -365,8 +365,9 @@ public class NodeControllerService implements IControllerService {
 
     private void startApplication() throws Exception {
         serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
-        application.start(serviceCtx, ncConfig.getAppArgsArray());
+        application.init(serviceCtx);
         executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+        application.start(ncConfig.getAppArgsArray());
     }
 
     public void updateMaxJobId(JobId jobId) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index dc0bf0c..d659fe6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -19,7 +19,6 @@
 package org.apache.hyracks.control.nc.application;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.Serializable;
 
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -54,13 +53,7 @@ public class NCServiceContext extends ServiceContext implements INCServiceContex
         this.ioManager = ioManager;
         this.memoryManager = memoryManager;
         this.ncs = ncs;
-        sdh = new IStateDumpHandler() {
-
-            @Override
-            public void dumpState(OutputStream os) throws IOException {
-                lccm.dumpState(os);
-            }
-        };
+        sdh = lccm::dumpState;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f52bc588/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 2967039..780a65c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -30,11 +30,16 @@ public class TestNCApplication implements INCApplication {
     private RuntimeContext rCtx;
 
     @Override
-    public void start(IServiceContext serviceCtx, String[] args) throws Exception {
+    public void init(IServiceContext serviceCtx) throws Exception {
         rCtx = new RuntimeContext((INCServiceContext) serviceCtx);
     }
 
     @Override
+    public void start(String[] args) throws Exception {
+        // No-op
+    }
+
+    @Override
     public void startupCompleted() throws Exception {
         // No-op
     }


Mime
View raw message