asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [2/6] asterixdb git commit: Refactor Messaging
Date Sat, 03 Sep 2016 12:44:47 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..b18a879
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Message instructing a node controller to take over the metadata node role. It is presumably sent
+ * by the cluster controller; the handler always answers the CC with a
+ * {@link TakeoverMetadataNodeResponseMessage} carrying this node's id.
+ */
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+
+    /**
+     * Initializes metadata ({@code initializeMetadata(false)}), exports the metadata node stub, and then
+     * sends a {@link TakeoverMetadataNodeResponseMessage} to the CC regardless of whether the takeover
+     * succeeded.
+     *
+     * @param cs the controller service; expected to be a {@link NodeControllerService}
+     * @throws HyracksDataException if the takeover failed and/or the response could not be sent; when
+     *         both fail, the two exceptions are combined via
+     *         {@code ExceptionUtils.suppressIntoHyracksDataException}
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            appContext.initializeMetadata(false);
+            appContext.exportMetadataNodeStub();
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+            hde = new HyracksDataException(e);
+        } finally {
+            // Reply even when the takeover failed (presumably so the CC is not left waiting).
+            TakeoverMetadataNodeResponseMessage response = new TakeoverMetadataNodeResponseMessage(
+                    appContext.getTransactionSubsystem().getId());
+            try {
+                broker.sendMessageToCC(response, null);
+            } catch (Exception e) {
+                // This is a failure to notify the CC, not (necessarily) a failed takeover.
+                LOGGER.log(Level.SEVERE, "Failed sending metadata node takeover response to CC", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_METADATA_NODE_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..f7016bc
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Reply sent to the cluster controller once a node has processed a metadata node takeover request.
+ * Handling it forwards the reply to {@link AsterixClusterProperties#INSTANCE}.
+ */
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Id of the node that handled the takeover request. */
+    private final String nodeId;
+
+    public TakeoverMetadataNodeResponseMessage(String respondingNodeId) {
+        nodeId = respondingNodeId;
+    }
+
+    /**
+     * @return the id of the node that produced this response
+     */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_METADATA_NODE_RESPONSE";
+    }
+
+    /**
+     * Passes this response on to the cluster properties singleton for processing.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..3b91084
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Message asking a node controller to take over a set of partitions. Unless the receiving node is
+ * shutting down, the handler invokes its remote recovery manager on the partitions and then always
+ * replies to the CC with a {@link TakeoverPartitionsResponseMessage} that echoes the request id and
+ * partitions together with the handling node's id.
+ */
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
+    private final Integer[] partitions;
+    private final long requestId;
+    // NOTE(review): role of this node id (requester vs. target) is not visible here -- confirm with sender
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return a description such as {@code "Request ID: 7 Node ID: nc1 Partitions: 1,2"}; the partition
+     *         list is comma-separated without a trailing comma, and empty when there are no partitions
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Partitions: ");
+        if (partitions != null) {
+            for (int i = 0; i < partitions.length; i++) {
+                if (i > 0) {
+                    sb.append(',');
+                }
+                sb.append(partitions[i]);
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Takes over {@link #partitions} via the node's remote recovery manager and reports back to the CC.
+     * Ignored entirely when the node is shutting down.
+     *
+     * @param cs the controller service; expected to be a {@link NodeControllerService}
+     * @throws HyracksDataException if the takeover failed and/or the response could not be sent
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        //if the NC is shutting down, it should ignore takeover partitions request
+        if (!appContext.isShuttingdown()) {
+            HyracksDataException hde = null;
+            try {
+                IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
+                remoteRecoveryManager.takeoverPartitons(partitions);
+            } catch (IOException | ACIDException e) {
+                LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            } finally {
+                //send response after takeover is completed (also sent when the takeover failed)
+                TakeoverPartitionsResponseMessage response = new TakeoverPartitionsResponseMessage(requestId,
+                        appContext.getTransactionSubsystem().getId(), partitions);
+                try {
+                    broker.sendMessageToCC(response, null);
+                } catch (Exception e) {
+                    // This is a failure to notify the CC, not (necessarily) a failed takeover.
+                    LOGGER.log(Level.SEVERE, "Failure sending partitions takeover response to CC", e);
+                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                }
+            }
+            if (hde != null) {
+                throw hde;
+            }
+        }
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_PARTITIONS_REQUEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..9ec71b7
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+/**
+ * Reply sent to the cluster controller after a node has handled a partitions takeover request. It
+ * echoes the request id and partitions and identifies the responding node; handling it forwards the
+ * reply to {@link AsterixClusterProperties#INSTANCE}.
+ */
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    public TakeoverPartitionsResponseMessage(long reqId, String respondingNodeId, Integer[] takenOver) {
+        partitions = takenOver;
+        nodeId = respondingNodeId;
+        requestId = reqId;
+    }
+
+    /** @return id of the request this message answers */
+    public long getRequestId() {
+        return requestId;
+    }
+
+    /** @return id of the node that produced this response */
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /** @return the partitions named in the original request (the same array instance) */
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_PARTITIONS_RESPONSE";
+    }
+
+    /**
+     * Passes this response on to the cluster properties singleton for processing.
+     */
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
new file mode 100644
index 0000000..4cb65c2
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.transaction;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
+import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
+import org.apache.hyracks.api.application.IApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+/**
+ * A resource id factory that generates unique resource ids across all NCs by requesting
+ * unique ids from the cluster controller.
+ * <p>
+ * Responses arrive asynchronously through {@link #deliverMessageResponse} into a shared queue and are
+ * not matched to a particular request here. Each {@link #createId()} call therefore sends exactly one
+ * request and consumes exactly one response, keeping sends and takes balanced. The earlier design
+ * also let a caller grab a queued response without sending a request, which could leave the thread
+ * that did send waiting forever.
+ */
+public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
+
+    private final IApplicationContext appCtx;
+    /** Responses delivered by the message broker; consumed one per {@link #createId()} call. */
+    private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+    private final String nodeId;
+
+    public GlobalResourceIdFactory(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
+        this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
+    }
+
+    /**
+     * Requests a resource id from the cluster controller and blocks until a response is delivered.
+     *
+     * @return the resource id carried by the response
+     * @throws HyracksDataException if sending the request fails, the wait is interrupted (the thread's
+     *         interrupt status is restored), or the response carries an exception
+     */
+    @Override
+    public long createId() throws HyracksDataException {
+        ResourceIdRequestResponseMessage response;
+        try {
+            ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
+            ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
+            // One take per request sent, so a waiting thread can never be starved by another caller.
+            response = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        // Checked outside the try so this exception is not re-wrapped by the catch above.
+        if (response.getException() != null) {
+            throw new HyracksDataException(response.getException().getMessage());
+        }
+        return response.getResourceId();
+    }
+
+    /**
+     * Callback invoked by the message broker with the CC's reply; queues it for {@link #createId()}.
+     */
+    @Override
+    public void deliverMessageResponse(IApplicationMessage message) {
+        resourceIdResponseQ.offer(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
new file mode 100644
index 0000000..9027f75
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.transaction;
+
+import org.apache.hyracks.api.application.IApplicationContext;
+
+/**
+ * Creates {@link GlobalResourceIdFactory} instances that all share one application context.
+ */
+public class GlobalResourceIdFactoryProvider {
+
+    /** Context handed to every factory created by this provider. */
+    private final IApplicationContext appCtx;
+
+    public GlobalResourceIdFactoryProvider(IApplicationContext ctx) {
+        appCtx = ctx;
+    }
+
+    /**
+     * @return a newly constructed factory; every call returns a distinct instance
+     */
+    public GlobalResourceIdFactory createResourceIdFactory() {
+        final GlobalResourceIdFactory factory = new GlobalResourceIdFactory(appCtx);
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
new file mode 100644
index 0000000..4f2cebd
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.AsterixBuildProperties;
+import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.common.config.AsterixExtensionProperties;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.common.config.AsterixFeedProperties;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
+import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.AsterixStorageProperties;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+
+/*
+ * Acts as a holder class, exposed through INSTANCE, for the CC-side ICCApplicationContext, the
+ * Hyracks client connection, the global recovery / library / resource-id managers and the
+ * configuration property groups loaded in initialize(). Index lifecycle and storage manager access
+ * is delegated to AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER.
+ */
+public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IAsterixPropertiesProvider {
+
+    public static final AsterixAppContextInfo INSTANCE = new AsterixAppContextInfo();
+    private ICCApplicationContext appCtx;
+    private IGlobalRecoveryMaanger globalRecoveryManager;
+    private ILibraryManager libraryManager;
+    private IAsterixResourceIdManager resourceIdManager;
+    private AsterixCompilerProperties compilerProperties;
+    private AsterixExternalProperties externalProperties;
+    private AsterixMetadataProperties metadataProperties;
+    private AsterixStorageProperties storageProperties;
+    private AsterixTransactionProperties txnProperties;
+    private AsterixFeedProperties feedProperties;
+    private AsterixBuildProperties buildProperties;
+    private AsterixReplicationProperties replicationProperties;
+    private AsterixExtensionProperties extensionProperties;
+    private MessagingProperties messagingProperties;
+    private IHyracksClientConnection hcc;
+    private Object extensionManager;
+    // Written last in initialize(): a reader that sees true also sees every field assigned before it.
+    private volatile boolean initialized = false;
+
+    private AsterixAppContextInfo() {
+    }
+
+    /**
+     * Populates {@link #INSTANCE}. May succeed only once; if it throws part-way, the instance stays
+     * uninitialized and the call may be retried.
+     *
+     * @throws AsterixException if the instance has already been initialized, or as propagated from
+     *         property loading
+     * @throws IOException as propagated from property loading
+     */
+    public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryManager, ILibraryManager libraryManager,
+            IAsterixResourceIdManager resourceIdManager)
+            throws AsterixException, IOException {
+        if (INSTANCE.initialized) {
+            throw new AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been initialized already");
+        }
+        INSTANCE.appCtx = ccAppCtx;
+        INSTANCE.hcc = hcc;
+        INSTANCE.globalRecoveryManager = globalRecoveryManager;
+        INSTANCE.libraryManager = libraryManager;
+        INSTANCE.resourceIdManager = resourceIdManager;
+        // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
+        // QQQ strip this out eventually
+        AsterixPropertiesAccessor propertiesAccessor;
+        IApplicationConfig cfg = ccAppCtx.getAppConfig();
+        // QQQ this is NOT a good way to determine whether the config is valid
+        if (cfg.getString("cc", "cluster.address") != null) {
+            propertiesAccessor = new AsterixPropertiesAccessor(cfg);
+        } else {
+            propertiesAccessor = new AsterixPropertiesAccessor();
+        }
+        INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
+        INSTANCE.externalProperties = new AsterixExternalProperties(propertiesAccessor);
+        INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
+        INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor);
+        INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
+        INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
+        INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor);
+        INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
+                AsterixClusterProperties.INSTANCE.getCluster());
+        INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor);
+        INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
+        Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
+        // Publish only once every field above is populated (volatile write => happens-before for readers).
+        INSTANCE.initialized = true;
+    }
+
+    public boolean initialized() {
+        return initialized;
+    }
+
+    @Override
+    public ICCApplicationContext getCCApplicationContext() {
+        return appCtx;
+    }
+
+    @Override
+    public AsterixStorageProperties getStorageProperties() {
+        return storageProperties;
+    }
+
+    @Override
+    public AsterixTransactionProperties getTransactionProperties() {
+        return txnProperties;
+    }
+
+    @Override
+    public AsterixCompilerProperties getCompilerProperties() {
+        return compilerProperties;
+    }
+
+    @Override
+    public AsterixMetadataProperties getMetadataProperties() {
+        return metadataProperties;
+    }
+
+    @Override
+    public AsterixExternalProperties getExternalProperties() {
+        return externalProperties;
+    }
+
+    @Override
+    public AsterixFeedProperties getFeedProperties() {
+        return feedProperties;
+    }
+
+    @Override
+    public AsterixBuildProperties getBuildProperties() {
+        return buildProperties;
+    }
+
+    public IHyracksClientConnection getHcc() {
+        return hcc;
+    }
+
+    @Override
+    public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
+        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public IStorageManagerInterface getStorageManagerInterface() {
+        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
+    }
+
+    @Override
+    public AsterixReplicationProperties getReplicationProperties() {
+        return replicationProperties;
+    }
+
+    @Override
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager() {
+        return globalRecoveryManager;
+    }
+
+    @Override
+    public ILibraryManager getLibraryManager() {
+        return libraryManager;
+    }
+
+    public Object getExtensionManager() {
+        return extensionManager;
+    }
+
+    public void setExtensionManager(Object extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+
+    public AsterixExtensionProperties getExtensionProperties() {
+        return extensionProperties;
+    }
+
+    @Override
+    public MessagingProperties getMessagingProperties() {
+        return messagingProperties;
+    }
+
+    public IAsterixResourceIdManager getResourceIdManager() {
+        return resourceIdManager;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
new file mode 100644
index 0000000..2457ddc
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * A holder class for properties related to the Asterix cluster.
+ * <p>
+ * Besides the static cluster configuration read from {@value #CLUSTER_CONFIGURATION_FILE}, the singleton
+ * {@link #INSTANCE} tracks runtime cluster state in the CC process: the configuration of the currently registered
+ * NCs, the node currently serving each cluster partition, the current metadata node and, when auto failover is
+ * enabled, pending partition takeover requests and node failback plans. Most state-changing methods are
+ * {@code synchronized}.
+ */
+public class AsterixClusterProperties {
+    /*
+     * TODO: currently after instance restarts we require all nodes to join again,
+     * otherwise the cluster won't be ACTIVE. We may overcome this by storing the cluster state before the instance
+     * shutdown and using it on startup to identify the nodes that are expected to join.
+     */
+
+    private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
+    public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
+    public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
+
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+    private static final String IO_DEVICES = "iodevices";
+    private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
+
+    // node id -> configuration parameters of every currently registered NC
+    private final Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
+
+    // parsed cluster configuration; null for a virtual cluster without a configuration file
+    private final Cluster cluster;
+    // volatile: read without holding the lock by getState() and isClusterActive()
+    private volatile ClusterState state = ClusterState.UNUSABLE;
+
+    private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
+
+    // volatile: read and written through unsynchronized accessors
+    private volatile boolean globalRecoveryCompleted = false;
+
+    // the partition maps are only initialized in the CC process (see constructor)
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+    // only initialized when auto failover is enabled
+    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
+
+    private long clusterRequestId = 0;
+    private String currentMetadataNode = null;
+    private boolean metadataNodeActive = false;
+    private boolean autoFailover = false;
+    private boolean replicationEnabled = false;
+    // nodes that left the cluster and have not completed rejoining yet
+    private final Set<String> failedNodes = new HashSet<>();
+    // the failback structures are only initialized when auto failover is enabled
+    private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
+    private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+
+    private AsterixClusterProperties() {
+        Cluster parsedCluster = null;
+        // getResourceAsStream returns null when the file is absent; try-with-resources skips closing a null resource
+        try (InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE)) {
+            if (is != null) {
+                JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+                Unmarshaller unmarshaller = ctx.createUnmarshaller();
+                parsedCluster = (Cluster) unmarshaller.unmarshal(is);
+            }
+        } catch (JAXBException | IOException e) {
+            throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
+        }
+        cluster = parsedCluster;
+        // if this is the CC process
+        if (AsterixAppContextInfo.INSTANCE.initialized()
+                && AsterixAppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+            node2PartitionsMap = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
+            clusterPartitions = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
+            currentMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+            replicationEnabled = isReplicationEnabled();
+            autoFailover = isAutoFailoverEnabled();
+            if (autoFailover) {
+                pendingTakeoverRequests = new HashMap<>();
+                pendingProcessingFailbackPlans = new LinkedList<>();
+                planId2FailbackPlanMap = new HashMap<>();
+            }
+        }
+    }
+
+    /**
+     * Unregisters a node that left the cluster.
+     * <p>
+     * If the node was already marked as failed (it left again before its failback completed), the failback plans are
+     * notified and failed plans are reverted when auto failover is enabled. Otherwise the node is marked as failed,
+     * its partitions are deactivated and, when replication is enabled, its replicas are notified and (with auto
+     * failover) its partitions are requested to be taken over by active replicas.
+     *
+     * @param nodeId
+     *            the id of the node that left
+     */
+    public synchronized void removeNCConfiguration(String nodeId) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removing configuration parameters for node id " + nodeId);
+        }
+        activeNcConfiguration.remove(nodeId);
+
+        //if this node was waiting for failback and failed before it completed
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                notifyFailbackPlansNodeFailure(nodeId);
+                revertFailedFailbackPlanEffects();
+            }
+        } else {
+            //an active node failed
+            failedNodes.add(nodeId);
+            if (nodeId.equals(currentMetadataNode)) {
+                metadataNodeActive = false;
+                LOGGER.info("Metadata node is now inactive");
+            }
+            updateNodePartitions(nodeId, false);
+            if (replicationEnabled) {
+                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+                if (autoFailover) {
+                    notifyFailbackPlansNodeFailure(nodeId);
+                    requestPartitionsTakeover(nodeId);
+                }
+            }
+        }
+    }
+
+    /**
+     * Registers the configuration of a node that joined the cluster.
+     * <p>
+     * A previously failed node either gets a failback plan prepared (auto failover), in which case it stays failed
+     * until the plan completes, or is immediately considered recovered and its replicas are notified (when
+     * replication is enabled).
+     *
+     * @param nodeId
+     *            the id of the joining node
+     * @param configuration
+     *            the node's configuration parameters
+     */
+    public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registering configuration parameters for node id " + nodeId);
+        }
+        activeNcConfiguration.put(nodeId, configuration);
+
+        //a node trying to come back after failure
+        if (failedNodes.contains(nodeId)) {
+            if (autoFailover) {
+                prepareFailbackPlan(nodeId);
+                return;
+            }
+            //a node completed local or remote recovery and rejoined
+            failedNodes.remove(nodeId);
+            if (replicationEnabled) {
+                //notify other replica to reconnect to this node
+                notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+            }
+        }
+
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = true;
+            LOGGER.info("Metadata node is now active");
+        }
+        updateNodePartitions(nodeId, true);
+    }
+
+    /**
+     * Marks the partitions originally owned by {@code nodeId} as active (served by {@code nodeId}) or inactive,
+     * then recomputes the partition constraint and the cluster state.
+     */
+    private synchronized void updateNodePartitions(String nodeId, boolean added) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        // if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                // set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    /**
+     * Sets the cluster to UNUSABLE if any partition is inactive. Otherwise, if the metadata node is active, sets it
+     * to ACTIVE, starts global recovery and processes pending failback plans; if the metadata node is not active,
+     * requests a metadata node takeover.
+     */
+    private synchronized void updateClusterState() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNUSABLE state");
+                return;
+            }
+        }
+        //if all storage partitions are active as well as the metadata node, then the cluster is active
+        if (metadataNodeActive) {
+            state = ClusterState.ACTIVE;
+            LOGGER.info("Cluster is now ACTIVE");
+            //start global recovery
+            AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
+            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
+                processPendingFailbackPlans();
+            }
+        } else {
+            requestMetadataNodeTakeover();
+        }
+    }
+
+    /**
+     * Returns the IO devices configured for a Node Controller
+     *
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return a list of IO devices, or an empty array if the node is not currently registered.
+     */
+    public synchronized String[] getIODevices(String nodeId) {
+        Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
+        if (ncConfig == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Configuration parameters for nodeId " + nodeId
+                        + " not found. The node has not joined yet or has left.");
+            }
+            return new String[0];
+        }
+        return ncConfig.get(IO_DEVICES).split(",");
+    }
+
+    public ClusterState getState() {
+        return state;
+    }
+
+    /**
+     * @return the parsed cluster configuration, or null for a virtual cluster without a configuration file
+     */
+    public Cluster getCluster() {
+        return cluster;
+    }
+
+    /**
+     * @return the first configured substitute node, or null if there is none (or no cluster configuration at all)
+     */
+    public synchronized Node getAvailableSubstitutionNode() {
+        if (cluster == null || cluster.getSubstituteNodes() == null) {
+            return null;
+        }
+        List<Node> subNodes = cluster.getSubstituteNodes().getNode();
+        return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
+    }
+
+    /**
+     * @return a snapshot copy of the ids of the currently registered nodes
+     */
+    public synchronized Set<String> getParticipantNodes() {
+        return new HashSet<>(activeNcConfiguration.keySet());
+    }
+
+    public synchronized AlgebricksAbsolutePartitionConstraint getClusterLocations() {
+        if (clusterPartitionConstraint == null) {
+            resetClusterPartitionConstraint();
+        }
+        return clusterPartitionConstraint;
+    }
+
+    /**
+     * Rebuilds the location constraint from the nodes currently serving the active partitions.
+     */
+    private synchronized void resetClusterPartitionConstraint() {
+        List<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
+            }
+        }
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[clusterActiveLocations.size()]));
+    }
+
+    public boolean isGlobalRecoveryCompleted() {
+        return globalRecoveryCompleted;
+    }
+
+    public void setGlobalRecoveryCompleted(boolean globalRecoveryCompleted) {
+        this.globalRecoveryCompleted = globalRecoveryCompleted;
+    }
+
+    /**
+     * @return true for a virtual cluster (no configuration file), otherwise whether the state is ACTIVE
+     */
+    public boolean isClusterActive() {
+        if (cluster == null) {
+            // this is a virtual cluster
+            return true;
+        }
+        return state == ClusterState.ACTIVE;
+    }
+
+    public static int getNumberOfNodes() {
+        return AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
+    }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    /**
+     * @return the number of partitions originally assigned to {@code node}, or 0 if it has none
+     */
+    public synchronized int getNodePartitionsCount(String node) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(node);
+        return nodePartitions == null ? 0 : nodePartitions.length;
+    }
+
+    /**
+     * @return a new array with all cluster partitions in partition id order
+     */
+    public synchronized ClusterPartition[] getClusterPartitons() {
+        return clusterPartitions.values().toArray(new ClusterPartition[clusterPartitions.size()]);
+    }
+
+    public String getStorageDirectoryName() {
+        if (cluster != null) {
+            return cluster.getStore();
+        }
+        // virtual cluster without cluster config file
+        return DEFAULT_STORAGE_DIR_NAME;
+    }
+
+    /**
+     * Assigns every partition the failed node was responsible for to an active replica and sends each chosen
+     * replica a takeover request. Requests are tracked in {@code pendingTakeoverRequests} until answered.
+     */
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
+                .getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        if (lostPartitions.isEmpty()) {
+            return;
+        }
+        List<ClusterPartition> unrecoverablePartitions = new ArrayList<>();
+        for (ClusterPartition partition : lostPartitions) {
+            //find replicas for this partition
+            Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+            //assign the partition to the first replica that is still active
+            //TODO (mhubail) It needs to be modified to consider load balancing.
+            boolean assigned = false;
+            for (String replica : partitionReplicas) {
+                if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
+                    assigned = true;
+                    break;
+                }
+            }
+            if (!assigned) {
+                unrecoverablePartitions.add(partition);
+            }
+        }
+
+        if (partitionRecoveryPlan.isEmpty()) {
+            //no active replicas were found for the failed node
+            LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+            return;
+        }
+        if (!unrecoverablePartitions.isEmpty()) {
+            //some (but not all) partitions have no active replica
+            LOGGER.severe("Could not find active replicas for the partitions " + unrecoverablePartitions);
+        }
+        LOGGER.info("Partitions to recover: " + lostPartitions);
+
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                .getCCApplicationContext().getMessageBroker();
+        //For each replica, send a request to takeover the assigned partitions
+        for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+            String replica = entry.getKey();
+            Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
+            long requestId = clusterRequestId++;
+            TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+                    replica, partitionsToTakeover);
+            pendingTakeoverRequests.put(requestId, takeoverRequest);
+            try {
+                messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+            } catch (Exception e) {
+                /*
+                 * if we fail to send the request, it means the NC we tried to send the request to
+                 * has failed. When the failure notification arrives, we will send any pending request
+                 * that belongs to the failed NC to a different active replica.
+                 */
+                LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
+            }
+        }
+    }
+
+    /**
+     * Adds {@code partition} to {@code replica}'s entry in the recovery plan if the replica is registered and not
+     * marked as failed.
+     *
+     * @return true if the partition was assigned to the replica, false if the replica is not active
+     */
+    private boolean addActiveReplica(String replica, ClusterPartition partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        if (!activeNcConfiguration.containsKey(replica) || failedNodes.contains(replica)) {
+            return false;
+        }
+        List<Integer> replicaPartitions = partitionRecoveryPlan.get(replica);
+        if (replicaPartitions == null) {
+            replicaPartitions = new ArrayList<>();
+            partitionRecoveryPlan.put(replica, replicaPartitions);
+        }
+        replicaPartitions.add(partition.getPartitionId());
+        return true;
+    }
+
+    /**
+     * Collects the partitions {@code nodeId} is responsible for: those it currently serves plus those of any pending
+     * takeover request addressed to it. Such pending requests are removed so that the caller can reassign them.
+     */
+    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            if (nodeId.equals(partition.getActiveNodeId())) {
+                nodePartitions.add(partition);
+            }
+        }
+        /*
+         * if there is any pending takeover request that this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        Iterator<TakeoverPartitionsRequestMessage> pendingRequests = pendingTakeoverRequests.values().iterator();
+        while (pendingRequests.hasNext()) {
+            TakeoverPartitionsRequestMessage request = pendingRequests.next();
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitions.get(partitionId));
+                }
+                //remove the failed request
+                pendingRequests.remove();
+            }
+        }
+        return nodePartitions;
+    }
+
+    /**
+     * Asks the node currently serving the metadata partition to register itself as the metadata node.
+     */
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
+                .getMetadataPartition();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                .getCCApplicationContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /*
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
+        }
+    }
+
+    /**
+     * Marks the partitions taken over by the responding node as active on that node and recomputes cluster state.
+     */
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response) {
+        for (Integer partitonId : response.getPartitions()) {
+            ClusterPartition partition = clusterPartitions.get(partitonId);
+            partition.setActive(true);
+            partition.setActiveNodeId(response.getNodeId());
+        }
+        pendingTakeoverRequests.remove(response.getRequestId());
+        resetClusterPartitionConstraint();
+        updateClusterState();
+    }
+
+    /**
+     * Records the responding node as the new, active metadata node and recomputes cluster state.
+     */
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response) {
+        currentMetadataNode = response.getNodeId();
+        metadataNodeActive = true;
+        LOGGER.info("Current metadata node: " + currentMetadataNode);
+        updateClusterState();
+    }
+
+    /**
+     * Builds a failback plan for a returning node, queues it and tries to process pending plans.
+     */
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
+                .getReplicationProperties();
+        Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
+            // a non-storage node has no cluster partitions
+            if (nodePartitions == null) {
+                continue;
+            }
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /*
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    /**
+     * Starts the next pending failback plan, if the cluster is ACTIVE. Plans that failed before any request was
+     * sent are discarded; at most one plan is started per call.
+     */
+    private synchronized void processPendingFailbackPlans() {
+        /*
+         * if the cluster state is not ACTIVE, then failbacks should not be processed
+         * since some partitions are not active
+         */
+        if (state == ClusterState.ACTIVE) {
+            while (!pendingProcessingFailbackPlans.isEmpty()) {
+                //take the first pending failback plan
+                NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+                /*
+                 * A plan at this stage will be in one of two states:
+                 * 1. PREPARING -> the participants were selected but we haven't sent any request.
+                 * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
+                 */
+                if (plan.getState() == FailbackPlanState.PREPARING) {
+                    //set the partitions that will be failed back as inactive
+                    String failbackNode = plan.getNodeId();
+                    for (Integer partitionId : plan.getPartitionsToFailback()) {
+                        ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
+                        clusterPartition.setActive(false);
+                        //partition expected to be returned to the failing back node
+                        clusterPartition.setActiveNodeId(failbackNode);
+                    }
+
+                    /*
+                     * if the returning node is the original metadata node,
+                     * then metadata node will change after the failback completes
+                     */
+                    String originalMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
+                            .getMetadataNodeName();
+                    if (originalMetadataNode.equals(failbackNode)) {
+                        plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                        currentMetadataNode = "";
+                        metadataNodeActive = false;
+                    }
+
+                    //force new jobs to wait
+                    state = ClusterState.REBALANCING;
+                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                            .getCCApplicationContext().getMessageBroker();
+                    handleFailbackRequests(plan, messageBroker);
+                    /*
+                     * wait until the current plan is completed before processing the next plan.
+                     * when the current one completes or is reverted, the cluster state will be
+                     * ACTIVE again, and the next failback plan (if any) will be processed.
+                     */
+                    break;
+                } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing to rollback
+                    planId2FailbackPlanMap.remove(plan.getPlanId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends the plan's prepare-failback requests; on the first send failure the plan is notified of the node failure
+     * and failed plans are reverted.
+     */
+    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
+    /**
+     * Handles a participant's prepare-failback response: once all responses are in, asks the failing back node to
+     * complete the failback; if the plan failed meanwhile, reverts it. Responses for unknown plans are ignored.
+     */
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        if (plan == null) {
+            //the plan may already have been reverted and removed
+            LOGGER.warning("Received prepare failback response for unknown plan: " + msg.getPlanId());
+            return;
+        }
+        plan.markRequestCompleted(msg.getRequestId());
+        /*
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back node
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                    .getCCApplicationContext().getMessageBroker();
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    /**
+     * Finalizes a successfully completed failback plan. Responses for unknown plans are ignored.
+     */
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) {
+        /*
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        if (plan == null) {
+            //the plan may already have been reverted and removed
+            LOGGER.warning("Received complete failback response for unknown plan: " + response.getPlanId());
+            return;
+        }
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        updateNodePartitions(nodeId, true);
+    }
+
+    /**
+     * Sends a replica event about {@code nodeId} to each of its remote replicas that is currently registered.
+     * For NODE_JOIN the node's (possibly new) cluster network address is included.
+     */
+    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
+                .getReplicationProperties();
+        Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
+        String nodeIdAddress = "";
+        //in case the node joined with a new IP address, we need to send it to the other replicas
+        if (event == ClusterEventType.NODE_JOIN) {
+            nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+        }
+
+        ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
+                .getCCApplicationContext().getMessageBroker();
+        for (String replica : remoteReplicas) {
+            //if the remote replica is alive, send the event
+            if (activeNcConfiguration.containsKey(replica)) {
+                try {
+                    messageBroker.sendApplicationMessageToNC(msg, replica);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes every plan in PENDING_ROLLBACK state and requests its partitions to be taken over by active replicas.
+     */
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+        while (iterator.hasNext()) {
+            NodeFailbackPlan plan = iterator.next();
+            if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                //TODO if the failing back node is still active, notify it to construct a new plan for it
+                iterator.remove();
+
+                //reassign the partitions that were supposed to be failed back to an active replica
+                requestPartitionsTakeover(plan.getNodeId());
+            }
+        }
+    }
+
+    /**
+     * Notifies every known failback plan that {@code nodeId} failed.
+     */
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        for (NodeFailbackPlan plan : planId2FailbackPlanMap.values()) {
+            plan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    public synchronized boolean isMetadataNodeActive() {
+        return metadataNodeActive;
+    }
+
+    public boolean isReplicationEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null) {
+            return cluster.getDataReplication().isEnabled();
+        }
+        return false;
+    }
+
+    public boolean isAutoFailoverEnabled() {
+        return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+    }
+
+    /**
+     * @return a JSON description with the cluster state, the current metadata node and the active node of each
+     *         partition
+     */
+    public synchronized JSONObject getClusterStateDescription() throws JSONException {
+        JSONObject stateDescription = new JSONObject();
+        stateDescription.put("State", state.name());
+        stateDescription.put("Metadata_Node", currentMetadataNode);
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            stateDescription.put("partition_" + partition.getPartitionId(), partition.getActiveNodeId());
+        }
+        return stateDescription;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
new file mode 100644
index 0000000..abe94bf
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+/**
+ * Stateless singleton that hands Asterix runtime components to Hyracks operators. Every accessor resolves the
+ * component from the application object of the task's joblet context, which must be an
+ * {@link IAsterixAppRuntimeContext} (it is cast unconditionally).
+ */
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+        ILSMIOOperationSchedulerProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
+
+    private AsterixRuntimeComponentsProvider() {
+    }
+
+    /**
+     * Returns the Asterix application runtime context of the node executing the given task.
+     *
+     * @param ctx the task context
+     * @return the application object of the task's joblet, cast to {@link IAsterixAppRuntimeContext}
+     */
+    private static IAsterixAppRuntimeContext getAppContext(IHyracksTaskContext ctx) {
+        return (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getLSMIOScheduler();
+    }
+
+    @Override
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getBufferCache();
+    }
+
+    @Override
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getFileMapManager();
+    }
+
+    @Override
+    public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getLocalResourceRepository();
+    }
+
+    @Override
+    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getDatasetLifecycleManager();
+    }
+
+    @Override
+    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+        return getAppContext(ctx).getResourceIdFactory();
+    }
+
+    /**
+     * Resolves a deserialized copy back to {@link #RUNTIME_PROVIDER} so the singleton property also holds after
+     * Java serialization (the class declares a serialVersionUID, so instances are expected to be serialized).
+     *
+     * @return the canonical singleton instance
+     */
+    private Object readResolve() {
+        return RUNTIME_PROVIDER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
new file mode 100644
index 0000000..2517df5
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+/**
+ * Static helpers for finding out which Hyracks NodeController processes the cluster controller knows about,
+ * grouped by the IP address of the host they run on.
+ */
+public class RuntimeUtils {
+
+    private RuntimeUtils() {
+        // static helpers only; never instantiated
+    }
+
+    /**
+     * Looks up the node controllers registered for one host address.
+     *
+     * @param ipAddress the host address to look up
+     * @return the names of the node controllers on that address, or {@code null} if the address is unknown
+     * @throws HyracksDataException if the node map cannot be obtained
+     */
+    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws HyracksDataException {
+        return getNodeControllerMap().get(ipAddress);
+    }
+
+    /**
+     * Flattens the per-host node controller sets into a single list.
+     *
+     * @return a new mutable list containing every known node controller name
+     * @throws HyracksDataException if the node map cannot be obtained
+     */
+    public static List<String> getAllNodeControllers() throws HyracksDataException {
+        Collection<Set<String>> hostGroups = getNodeControllerMap().values();
+        List<String> allNodeControllers = new ArrayList<>();
+        for (Set<String> ncsOnHost : hostGroups) {
+            allNodeControllers.addAll(ncsOnHost);
+        }
+        return allNodeControllers;
+    }
+
+    /**
+     * Fetches the IP-address-to-node-names map from the cluster controller context.
+     *
+     * @return a new map filled by the CC context
+     * @throws HyracksDataException if the node map cannot be obtained
+     */
+    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws HyracksDataException {
+        Map<InetAddress, Set<String>> ipToNodes = new HashMap<>();
+        AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(ipToNodes);
+        return ipToNodes;
+    }
+
+    /**
+     * Copies the cluster controller service's IP-address-to-node-names map into the given map.
+     *
+     * @param map destination map; entries are added with {@code putAll}
+     */
+    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
+        ClusterControllerService controllerService = (ClusterControllerService) AsterixAppContextInfo.INSTANCE
+                .getCCApplicationContext().getControllerService();
+        map.putAll(controllerService.getIpAddressNodeNameMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
deleted file mode 100644
index ab1ebe1..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.resource;
-
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.hyracks.api.application.IApplicationContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
-
-/**
- * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller.
- */
-public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
-
-    private final IApplicationContext appCtx;
-    private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
-
-    public GlobalResourceIdFactory(IApplicationContext appCtx) {
-        this.appCtx = appCtx;
-        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
-    }
-
-    @Override
-    public long createId() throws HyracksDataException {
-        try {
-            ResourceIdRequestResponseMessage reponse = null;
-            //if there already exists a response, use it
-            if (resourceIdResponseQ.size() > 0) {
-                synchronized (resourceIdResponseQ) {
-                    if (resourceIdResponseQ.size() > 0) {
-                        reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
-                    }
-                }
-            }
-            //if no response available or it has an exception, request a new one
-            if (reponse == null || reponse.getException() != null) {
-                ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
-                reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
-                if (reponse.getException() != null) {
-                    throw new HyracksDataException(reponse.getException().getMessage());
-                }
-            }
-            return reponse.getResourceId();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public void deliverMessageResponse(IApplicationMessage message) {
-        resourceIdResponseQ.offer(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2b95d9ac/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
deleted file mode 100644
index 7bcf379..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.resource;
-
-import org.apache.hyracks.api.application.IApplicationContext;
-
-/**
- * Creates {@link GlobalResourceIdFactory} instances bound to a fixed application context.
- */
-public class GlobalResourceIdFactoryProvider {
-
-    private final IApplicationContext appCtx;
-
-    public GlobalResourceIdFactoryProvider(IApplicationContext appCtx) {
-        this.appCtx = appCtx;
-    }
-
-    /**
-     * @return a newly constructed factory using this provider's application context; each call returns a new
-     *         instance
-     */
-    public GlobalResourceIdFactory createResourceIdFactory() {
-        final GlobalResourceIdFactory factory = new GlobalResourceIdFactory(this.appCtx);
-        return factory;
-    }
-}


Mime
View raw message