asterixdb-notifications mailing list archives

From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: Divide Cluster into Unique Partitions
Date Tue, 29 Dec 2015 16:55:02 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/564

Change subject: Divide Cluster into Unique Partitions
......................................................................

Divide Cluster into Unique Partitions

The change includes the following:
- Fix passing NC stores to AsterixConfiguration.
- Unify the storage directory name at the instance level rather than the node level.
- Divide the cluster into unique storage partitions based on the number of stores (see the sketch after this message).
- Refactor FileSplits and move them out of AqlMetadataProvider.
- Make AsterixHyracksIntegrationUtil use the passed configuration file.
- Make FileSplits pass relative index paths of partitions rather than absolute paths.
- Remove the unused AqlCompiledMetadataDeclarations class.

Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
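
A minimal sketch of the ClusterPartition idea referenced above: each store directory on each NC becomes one cluster-wide unique partition. The field names below are illustrative assumptions, not necessarily the exact members of the new ClusterPartition class added under asterix-common:

    // Sketch only: field names are assumed for illustration.
    public class ClusterPartition {
        private final int partitionId; // unique across the whole cluster
        private final String nodeId;   // NC that owns this partition
        private final int ioDeviceNum; // node-local store/IO device index

        public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
            this.partitionId = partitionId;
            this.nodeId = nodeId;
            this.ioDeviceNum = ioDeviceNum;
        }

        public int getPartitionId() { return partitionId; }
        public String getNodeId() { return nodeId; }
        public int getIODeviceNum() { return ioDeviceNum; }
    }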
---
M asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
M asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
M asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
M asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
M asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
M asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
M asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
A asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
D asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
M asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
A asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsProvider.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
52 files changed, 1,047 insertions(+), 1,123 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/64/564/1

diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 661fcb2..1309f1f 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -106,7 +107,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
         ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -160,17 +161,15 @@
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                     dataset.getDataverseName(), dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
-                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
-                        + datasetName);
+                throw new AlgebricksException(
+                        "Code generation error: no index " + indexName + " for dataset " + datasetName);
             }
             List<List<String>> secondaryKeyFieldEntries = secondaryIndex.getKeyFieldNames();
             List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes();
             int numSecondaryKeys = secondaryKeyFieldEntries.size();
             if (numSecondaryKeys != 1) {
-                throw new AlgebricksException(
-                        "Cannot use "
-                                + numSecondaryKeys
-                                + " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
+                throw new AlgebricksException("Cannot use " + numSecondaryKeys
+                        + " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
             }
             if (itemType.getTypeTag() != ATypeTag.RECORD) {
                 throw new AlgebricksException("Only record types can be indexed.");
@@ -180,8 +179,8 @@
                     secondaryKeyFieldEntries.get(0), recordType);
             IAType secondaryKeyType = keyPairType.first;
             if (secondaryKeyType == null) {
-                throw new AlgebricksException("Could not find field " + secondaryKeyFieldEntries.get(0)
-                        + " in the schema.");
+                throw new AlgebricksException(
+                        "Could not find field " + secondaryKeyFieldEntries.get(0) + " in the schema.");
             }
 
             // TODO: For now we assume the type of the generated tokens is the
@@ -217,8 +216,8 @@
                     typeEnv, context);
 
             ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
-            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
-                    dataset, recordType, context.getBinaryComparatorFactoryProvider());
+            IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+                    recordType, context.getBinaryComparatorFactoryProvider());
 
             int[] filterFields = null;
             int[] invertedIndexFields = null;
@@ -241,21 +240,21 @@
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = SplitsAndConstraintsProvider
                     .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, indexName,
-                            dataset.getDatasetDetails().isTemp());
+                            dataset.getDatasetDetails().isTemp(), metadataProvider.getMetadataTxnContext());
             // TODO: Here we assume there is only one search key field.
             int queryField = keyFields[0];
             // Get tokenizer and search modifier factories.
             IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod
                     .getSearchModifierFactory(searchModifierType, simThresh, secondaryIndex);
-            IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
-                    searchModifierType, searchKeyType, secondaryIndex);
+            IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod
+                    .getBinaryTokenizerFactory(searchModifierType, searchKeyType, secondaryIndex);
             IIndexDataflowHelperFactory dataflowHelperFactory;
 
             AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
-                    dataset, metadataProvider.getMetadataTxnContext());
+            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+                    .getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
             boolean temp = dataset.getDatasetDetails().isTemp();
             if (!isPartitioned) {
                 dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index c0613ed..07717c0 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -47,6 +47,7 @@
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.optimizer.base.RuleCollections;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
@@ -321,7 +322,7 @@
             return null;
         }
 
-        AlgebricksPartitionConstraint clusterLocs = queryMetadataProvider.getClusterLocations();
+        AlgebricksPartitionConstraint clusterLocs = AsterixClusterProperties.INSTANCE.getClusterLocations();
         builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
         builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
         builder.setClusterLocations(clusterLocs);
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 45c0598..1f30a72 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.feeds.FeedManager;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.asterix.replication.management.ReplicationManager;
@@ -187,7 +188,8 @@
             String nodeId = ncApplicationContext.getNodeId();
 
             replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+                    metadataProperties.getStorageDirectoryName(), nodeId,
+                    replicationProperties.getReplicationStore());
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -377,7 +379,6 @@
 
     @Override
     public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
-                .createResourceIdFactory();
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 0145651..b1397ee 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Set;
 
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -41,22 +40,18 @@
 public class AsterixHyracksIntegrationUtil {
 
     private static final String IO_DIR_KEY = "java.io.tmpdir";
-    public static final int NODES = 2;
-    public static final int PARTITONS = 2;
-
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
-
     public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
 
     public static ClusterControllerService cc;
-    public static NodeControllerService[] ncs = new NodeControllerService[NODES];
+    public static NodeControllerService[] ncs;
     public static IHyracksClientConnection hcc;
 
-    protected static AsterixTransactionProperties txnProperties;
+    private static AsterixPropertiesAccessor propertiesAccessor;
 
     public static void init(boolean deleteOldInstanceData) throws Exception {
-        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
-        txnProperties = new AsterixTransactionProperties(apa);
+        propertiesAccessor = new AsterixPropertiesAccessor();
+        ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
         if (deleteOldInstanceData) {
             deleteTransactionLogs();
             removeTestStorageFiles();
@@ -77,7 +72,8 @@
 
         // Starts ncs.
         int n = 0;
-        for (String ncName : getNcNames()) {
+        Set<String> nodes = propertiesAccessor.getNodeNames();
+        for (String ncName : nodes) {
             NCConfig ncConfig1 = new NCConfig();
             ncConfig1.ccHost = "localhost";
             ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
@@ -87,37 +83,41 @@
             ncConfig1.nodeId = ncName;
             ncConfig1.resultTTL = 30000;
             ncConfig1.resultSweepThreshold = 1000;
-            for (int p = 0; p < PARTITONS; ++p) {
+            String tempPath = System.getProperty(IO_DIR_KEY);
+            if (tempPath.endsWith(File.separator)) {
+                tempPath = tempPath.substring(0, tempPath.length() - 1);
+            }
+            //get initial partitions from properties
+            String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+            if (nodeStores == null) {
+                throw new Exception("Couldn't find stores for NC: " + ncName);
+            }
+            String[] updatedStores = new String[nodeStores.length];
+            for (int p = 0; p < nodeStores.length; p++) {
+                //create IO devices based on stores
+                String iodevicePath = System.getProperty("java.io.tmpdir") + ncConfig1.nodeId + File.separator
+                        + nodeStores[p];
+                String storeFullPath = iodevicePath + File.separator + propertiesAccessor.getStorageDirName();
+                updatedStores[p] = storeFullPath;
                 if (p == 0) {
-                    ncConfig1.ioDevices = System.getProperty("java.io.tmpdir") + File.separator + ncConfig1.nodeId
-                            + "/iodevice" + p;
+                    ncConfig1.ioDevices = iodevicePath;
                 } else {
-                    ncConfig1.ioDevices += "," + System.getProperty("java.io.tmpdir") + File.separator
-                            + ncConfig1.nodeId + "/iodevice" + p;
+                    ncConfig1.ioDevices += "," + iodevicePath;
                 }
             }
             ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
             ncs[n] = new NodeControllerService(ncConfig1);
             ncs[n].start();
             ++n;
+
+            //update node store to include full paths
+            propertiesAccessor.updateNodeStores(ncName, updatedStores);
         }
         hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
     }
 
     public static String[] getNcNames() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1);
-        }
-        return names;
-    }
-
-    public static String[] getDataDirs() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1) + "data";
-        }
-        return names;
+        return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
     }
 
     public static IHyracksClientConnection getHyracksClientConnection() {
@@ -147,17 +147,17 @@
         hcc.waitForCompletion(jobId);
     }
 
-    private static void removeTestStorageFiles() throws IOException {
+    public static void removeTestStorageFiles() {
         File dir = new File(System.getProperty(IO_DIR_KEY));
-        for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+        for (String ncName : propertiesAccessor.getNodeNames()) {
             File ncDir = new File(dir, ncName);
             FileUtils.deleteQuietly(ncDir);
         }
     }
 
     private static void deleteTransactionLogs() throws Exception {
-        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
-            File log = new File(txnProperties.getLogDirectory(ncId));
+        for (String ncId : propertiesAccessor.getNodeNames()) {
+            File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
             if (log.exists()) {
                 FileUtils.deleteDirectory(log);
             }
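
The AsterixHyracksIntegrationUtil hunk above wires each configured store to its own IO device; dividing the cluster into unique partitions then amounts to numbering every (node, store) pair once across the cluster. A hedged sketch of that assignment, reusing the ClusterPartition sketch above and assuming a node-to-stores map shaped like the one AsterixPropertiesAccessor.getStores() returns:

    // Sketch: one cluster-unique partition per (node, store) pair.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    final class PartitionAssigner {
        // stores: node name -> store directories, e.g. from
        // AsterixPropertiesAccessor.getStores() (shape assumed).
        static List<ClusterPartition> assign(Map<String, String[]> stores) {
            List<ClusterPartition> partitions = new ArrayList<>();
            int nextId = 0;
            for (Map.Entry<String, String[]> node : stores.entrySet()) {
                String[] nodeStores = node.getValue();
                for (int dev = 0; dev < nodeStores.length; dev++) {
                    // dev doubles as the node-local IO device index
                    partitions.add(new ClusterPartition(nextId++, node.getKey(), dev));
                }
            }
            return partitions;
        }
    }
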
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index c83ce6a..db41b46 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -29,21 +29,21 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.asterix.feeds.CentralFeedManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.util.FlushDatasetUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /***
  * The REST API that takes a dataverse name and a dataset name as the input
@@ -89,14 +89,14 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
             if (dataset == null) {
-                jsonResponse.put("error", "Dataset " + datasetName + " does not exist in " + "dataverse "
-                        + dataverseName);
+                jsonResponse.put("error",
+                        "Dataset " + datasetName + " does not exist in " + "dataverse " + dataverseName);
                 out.write(jsonResponse.toString());
                 out.flush();
                 return;
             }
             boolean temp = dataset.getDatasetDetails().isTemp();
-            FileSplit[] fileSplits = metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
+            FileSplit[] fileSplits = SplitsAndConstraintsProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName,
                     datasetName, temp);
             ARecordType recordType = (ARecordType) metadataProvider.findType(dataverseName, dataset.getItemTypeName());
             List<List<String>> primaryKeys = DatasetUtils.getPartitioningKeys(dataset);
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 714e05c..08b92e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -1246,7 +1246,6 @@
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
             //#. mark PendingDropOp on the dataverse record by
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 013e021..7077eff 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -68,8 +69,8 @@
     private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
 
     public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
-            AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
-            ACIDException, AsterixException {
+            AqlMetadataProvider metadataProvider)
+                    throws AlgebricksException, HyracksDataException, RemoteException, ACIDException, AsterixException {
 
         String dataverseName = null;
         if (datasetDropStmt.getDataverseName() != null) {
@@ -110,21 +111,22 @@
         int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
         JobSpecification specPrimary = JobSpecificationUtils.createJobSpecification();
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
-                        temp);
+                        temp, metadataProvider.getMetadataTxnContext());
         AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
 
         IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                splitsAndConstraint.first,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second,
                         new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                        filterCmpFactories, btreeFields, filterFields, !temp));
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        btreeFields, filterFields, !temp));
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
 
@@ -160,8 +162,9 @@
         int[] filterFields = DatasetUtils.createFilterFields(dataset);
         int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp,
+                        metadata.getMetadataTxnContext());
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
@@ -183,12 +186,12 @@
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(dataset
-                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
-                                .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                        btreeFields, filterFields, !temp), localResourceFactoryProvider,
-                NoOpOperationCallbackFactory.INSTANCE);
+                        compactionInfo.first, compactionInfo.second,
+                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        btreeFields, filterFields, !temp),
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
         spec.addRoot(indexCreateOp);
@@ -227,8 +230,9 @@
         int[] filterFields = DatasetUtils.createFilterFields(dataset);
         int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp,
+                        metadata.getMetadataTxnContext());
 
         AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
 
@@ -238,16 +242,17 @@
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
-                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                        filterCmpFactories, btreeFields, filterFields, !temp), NoOpOperationCallbackFactory.INSTANCE);
-        AlgebricksPartitionConstraintHelper
-                .setPartitionConstraintInJobSpec(spec, compactOp, splitsAndConstraint.second);
+                        compactionInfo.first, compactionInfo.second,
+                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        btreeFields, filterFields, !temp),
+                NoOpOperationCallbackFactory.INSTANCE);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                splitsAndConstraint.second);
 
-        AlgebricksPartitionConstraintHelper
-                .setPartitionConstraintInJobSpec(spec, compactOp, splitsAndConstraint.second);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
+                splitsAndConstraint.second);
         spec.addRoot(compactOp);
         return spec;
     }
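
Per the commit message, FileSplits now pass index paths relative to a partition rather than absolute paths, which the getFileSplits() call in the hunk above consumes. A sketch of what a partition-relative layout could look like; the partition_<id>/dataverse/dataset_idx_index scheme is an assumption for illustration, not necessarily the exact naming this change adopts:

    // Sketch: build a partition-relative index path (layout assumed).
    import java.io.File;

    final class RelativePaths {
        static String relativeIndexPath(String storageDirName, int partitionId,
                String dataverseName, String datasetName, String indexName) {
            return storageDirName + File.separator + "partition_" + partitionId
                    + File.separator + dataverseName + File.separator
                    + datasetName + "_idx_" + indexName;
        }
    }

An NC can then resolve such a path against whichever IO device currently hosts the partition, so storage can move between nodes without rewriting absolute paths in metadata.
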
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
index c77ca10..ba92057 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -30,7 +31,7 @@
 public class DataverseOperations {
     public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
         JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
         FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
index d3fade5..627ce12 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
@@ -27,11 +27,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.asterix.common.api.ILocalResourceMetadata;
 import org.apache.asterix.common.config.AsterixStorageProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -67,6 +62,7 @@
 import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
@@ -80,6 +76,10 @@
 import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -107,6 +107,7 @@
 
     public static final List<List<String>> FILE_INDEX_FIELD_NAMES = new ArrayList<List<String>>();
     public static final ArrayList<IAType> FILE_INDEX_FIELD_TYPES = new ArrayList<IAType>();
+
     static {
         FILE_INDEX_FIELD_NAMES.add(new ArrayList<String>(Arrays.asList("")));
         FILE_INDEX_FIELD_TYPES.add(BuiltinType.ASTRING);
@@ -128,8 +129,8 @@
 
     public static boolean datasetUsesHiveAdapter(ExternalDatasetDetails ds) {
         String adapter = ds.getAdapter();
-        return (adapter.equalsIgnoreCase("hive") || adapter
-                .equalsIgnoreCase("org.apache.asterix.external.dataset.adapter.HIVEAdapter"));
+        return (adapter.equalsIgnoreCase("hive")
+                || adapter.equalsIgnoreCase("org.apache.asterix.external.dataset.adapter.HIVEAdapter"));
     }
 
     public static boolean isValidIndexName(String datasetName, String indexName) {
@@ -154,7 +155,8 @@
         return IndexingConstants.getBuddyBtreeComparatorFactories();
     }
 
-    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset) throws AlgebricksException {
+    public static ArrayList<ExternalFile> getSnapshotFromExternalFileSystem(Dataset dataset)
+            throws AlgebricksException {
         ArrayList<ExternalFile> files = new ArrayList<ExternalFile>();
         ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
         try {
@@ -176,9 +178,9 @@
                     if (fileStatuses[i].isDirectory()) {
                         listSubFiles(dataset, fs, fileStatuses[i], files);
                     } else {
-                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(),
-                                nextFileNumber, fileStatuses[i].getPath().toUri().getPath(), new Date(fileStatuses[i]
-                                        .getModificationTime()), fileStatuses[i].getLen(),
+                        files.add(new ExternalFile(dataset.getDataverseName(), dataset.getDatasetName(), nextFileNumber,
+                                fileStatuses[i].getPath().toUri().getPath(),
+                                new Date(fileStatuses[i].getModificationTime()), fileStatuses[i].getLen(),
                                 ExternalFilePendingOp.PENDING_NO_OP));
                     }
                 }
@@ -223,7 +225,7 @@
 
     public static JobSpecification buildFilesIndexReplicationJobSpec(Dataset dataset,
             ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
-            throws MetadataException, AlgebricksException {
+                    throws MetadataException, AlgebricksException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
@@ -231,21 +233,21 @@
                 metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                        dataset.getDatasetName(), getFilesIndexName(dataset.getDatasetName()), true);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                        getFilesIndexName(dataset.getDatasetName()), true, metadataProvider.getMetadataTxnContext());
         IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
         ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
-                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
-                filesIndexDescription.FILES_INDEX_COMP_FACTORIES, new int[] { 0 }, false, dataset.getDatasetId(),
-                mergePolicyFactory, mergePolicyFactoryProperties);
+                filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
+                new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
         PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.ExternalBTreeResource);
         ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
-                        dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+                mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
                 ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
         ExternalFilesIndexOperatorDescriptor externalFilesOp = new ExternalFilesIndexOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
@@ -309,7 +311,7 @@
      */
     public static boolean isDatasetUptodate(Dataset dataset, List<ExternalFile> metadataFiles,
             List<ExternalFile> addedFiles, List<ExternalFile> deletedFiles, List<ExternalFile> appendedFiles)
-            throws MetadataException, AlgebricksException {
+                    throws MetadataException, AlgebricksException {
         boolean uptodate = true;
         int newFileNumber = metadataFiles.get(metadataFiles.size() - 1).getFileNumber() + 1;
 
@@ -340,9 +342,10 @@
                     } else {
                         // Same file name, Different file mod date -> delete and add
                         metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
-                        deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile
-                                .getDatasetName(), 0, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
-                                metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
+                        deletedFiles
+                                .add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(), 0,
+                                        metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                                        metadataFile.getSize(), ExternalFilePendingOp.PENDING_DROP_OP));
                         fileSystemFile.setPendingOp(ExternalFilePendingOp.PENDING_ADD_OP);
                         fileSystemFile.setFileNumber(newFileNumber);
                         addedFiles.add(fileSystemFile);
@@ -382,8 +385,8 @@
             if (metadataFile.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP) {
                 metadataFile.setPendingOp(ExternalFilePendingOp.PENDING_DROP_OP);
                 deletedFiles.add(new ExternalFile(metadataFile.getDataverseName(), metadataFile.getDatasetName(),
-                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(), metadataFile
-                                .getSize(), metadataFile.getPendingOp()));
+                        newFileNumber, metadataFile.getFileName(), metadataFile.getLastModefiedTime(),
+                        metadataFile.getSize(), metadataFile.getPendingOp()));
                 newFileNumber++;
                 uptodate = false;
             }
@@ -414,20 +417,22 @@
         String indexName = indexDropStmt.getIndexName();
         boolean temp = dataset.getDatasetDetails().isTemp();
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true,
+                        metadataProvider.getMetadataTxnContext());
         AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
         IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                splitsAndConstraint.first,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second,
                         new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
-        AlgebricksPartitionConstraintHelper
-                .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
 
         return spec;
@@ -443,9 +448,9 @@
             else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
                 for (ExternalFile appendedFile : appendedFiles) {
                     if (appendedFile.getFileName().equals(file.getFileName())) {
-                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(),
-                                file.getFileNumber(), file.getFileName(), file.getLastModefiedTime(), appendedFile
-                                        .getSize(), ExternalFilePendingOp.PENDING_NO_OP));
+                        files.add(new ExternalFile(file.getDataverseName(), file.getDatasetName(), file.getFileNumber(),
+                                file.getFileName(), file.getLastModefiedTime(), appendedFile.getSize(),
+                                ExternalFilePendingOp.PENDING_NO_OP));
                     }
                 }
             }
@@ -496,9 +501,9 @@
         boolean temp = ds.getDatasetDetails().isTemp();
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
+                        getFilesIndexName(ds.getDatasetName()), temp, metadataProvider.getMetadataTxnContext());
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
                 mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
@@ -512,9 +517,9 @@
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = SplitsAndConstraintsProvider
                         .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                                index.getIndexName(), temp, metadataProvider.getMetadataTxnContext());
                 if (index.getIndexType() == IndexType.BTREE) {
                     btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
                             mergePolicyFactoryProperties, storageProperties, spec));
@@ -557,8 +562,7 @@
             AsterixStorageProperties storageProperties, JobSpecification spec) {
         return new ExternalBTreeWithBuddyDataflowHelperFactory(mergePolicyFactory, mergePolicyFactoryProperties,
                 new SecondaryIndexOperationTrackerProvider(ds.getDatasetId()),
-                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
                 storageProperties.getBloomFilterFalsePositiveRate(), new int[] { index.getKeyFieldNames().size() },
                 ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(ds), true);
     }
@@ -567,7 +571,7 @@
     private static ExternalRTreeDataflowHelperFactory getRTreeDataflowHelperFactory(Dataset ds, Index index,
             ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyFactoryProperties,
             AsterixStorageProperties storageProperties, AqlMetadataProvider metadataProvider, JobSpecification spec)
-            throws AlgebricksException, AsterixException {
+                    throws AlgebricksException, AsterixException {
         int numPrimaryKeys = getRIDSize(ds);
         List<List<String>> secondaryKeyFields = index.getKeyFieldNames();
         secondaryKeyFields.size();
@@ -594,8 +598,8 @@
                     .getSerializerDeserializer(nestedKeyType);
             secondaryRecFields[i] = keySerde;
 
-            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, true);
+            secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(nestedKeyType, true);
             secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
             valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
         }
@@ -629,9 +633,9 @@
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
 
         boolean temp = ds.getDatasetDetails().isTemp();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
+                        getFilesIndexName(ds.getDatasetName()), temp, metadataProvider.getMetadataTxnContext());
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
                 mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
@@ -645,9 +649,9 @@
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = SplitsAndConstraintsProvider
                         .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                                index.getIndexName(), temp, metadataProvider.getMetadataTxnContext());
                 if (index.getIndexType() == IndexType.BTREE) {
                     btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
                             mergePolicyFactoryProperties, storageProperties, spec));
@@ -687,9 +691,9 @@
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
         boolean temp = ds.getDatasetDetails().isTemp();
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = metadataProvider
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                        getFilesIndexName(ds.getDatasetName()), temp);
+                        getFilesIndexName(ds.getDatasetName()), temp, metadataProvider.getMetadataTxnContext());
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         ExternalBTreeDataflowHelperFactory filesIndexDataflowHelperFactory = getFilesIndexDataflowHelperFactory(ds,
                 mergePolicyFactory, mergePolicyFactoryProperties, storageProperties, spec);
@@ -703,9 +707,9 @@
 
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
-                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = metadataProvider
+                Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint = SplitsAndConstraintsProvider
                         .splitProviderAndPartitionConstraintsForDataset(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                                index.getIndexName(), temp, metadataProvider.getMetadataTxnContext());
                 if (index.getIndexType() == IndexType.BTREE) {
                     btreeDataflowHelperFactories.add(getBTreeDataflowHelperFactory(ds, index, mergePolicyFactory,
                             mergePolicyFactoryProperties, storageProperties, spec));
@@ -742,14 +746,15 @@
                 metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                        dataset.getDatasetName(), getFilesIndexName(dataset.getDatasetName()), true);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                        getFilesIndexName(dataset.getDatasetName()), true, metadataProvider.getMetadataTxnContext());
         IFileSplitProvider secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
-                mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
-                        dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
+                mergePolicyFactory, mergePolicyFactoryProperties,
+                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                storageProperties.getBloomFilterFalsePositiveRate(),
                 ExternalDatasetsRegistry.INSTANCE.getDatasetVersion(dataset), true);
         FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
         LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
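The recurring substitution in the hunks above moves split and constraint
computation off AqlMetadataProvider and onto static calls on the new
SplitsAndConstraintsProvider, with the metadata transaction context passed
explicitly. Combined with the commit's switch to relative index paths, a
minimal self-contained sketch of the per-partition path layout this implies
(the partition_<id> directory convention and all names below are hypothetical
illustrations, not the patch's actual API):

    import java.util.ArrayList;
    import java.util.List;

    public class RelativeSplitsSketch {
        // Build one relative index path per cluster partition, rooted at the
        // instance-level storage directory instead of an absolute node path.
        static List<String> relativeIndexPaths(String storageDirName, int[] partitionIds,
                String dataverse, String dataset, String index) {
            List<String> paths = new ArrayList<>();
            for (int pid : partitionIds) {
                paths.add(storageDirName + "/partition_" + pid + "/" + dataverse + "/"
                        + dataset + "_idx_" + index);
            }
            return paths;
        }

        public static void main(String[] args) {
            relativeIndexPaths("storage", new int[] { 0, 1 }, "Default", "Customers", "Customers")
                    .forEach(System.out::println);
        }
    }

Keeping the paths relative presumably lets whichever node ends up hosting a
partition resolve them against its own iodevice mount point.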
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index 4cb8a0f..35495bb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
@@ -56,7 +57,7 @@
 
     public static JobSpecification buildSecondaryIndexCreationJobSpec(CompiledCreateIndexStatement createIndexStmt,
             ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
-            throws AsterixException, AlgebricksException {
+                    throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
                         createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
@@ -68,7 +69,7 @@
 
     public static JobSpecification buildSecondaryIndexLoadingJobSpec(CompiledCreateIndexStatement createIndexStmt,
             ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider)
-            throws AsterixException, AlgebricksException {
+                    throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(createIndexStmt.getIndexType(), createIndexStmt.getDataverseName(),
                         createIndexStmt.getDatasetName(), createIndexStmt.getIndexName(),
@@ -100,8 +101,9 @@
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         boolean temp = dataset.getDatasetDetails().isTemp();
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp,
+                        metadataProvider.getMetadataTxnContext());
         AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
@@ -109,13 +111,14 @@
         // The index drop operation should be persistent regardless of temp datasets or permanent dataset.
         IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
-                        dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
+                splitsAndConstraint.first,
+                new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+                        compactionInfo.first, compactionInfo.second,
                         new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
-        AlgebricksPartitionConstraintHelper
-                .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop,
+                splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
 
         return spec;
@@ -123,7 +126,7 @@
 
     public static JobSpecification buildSecondaryIndexCompactJobSpec(CompiledIndexCompactStatement indexCompactStmt,
             ARecordType recType, ARecordType enforcedType, AqlMetadataProvider metadataProvider, Dataset dataset)
-            throws AsterixException, AlgebricksException {
+                    throws AsterixException, AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(indexCompactStmt.getIndexType(), indexCompactStmt.getDataverseName(),
                         indexCompactStmt.getDatasetName(), indexCompactStmt.getIndexName(),
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 4eb6944..6e61f27 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -48,6 +48,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
@@ -152,10 +153,10 @@
     }
 
     public static SecondaryIndexOperationsHelper createIndexOperationsHelper(IndexType indexType, String dataverseName,
-            String datasetName, String indexName, List<List<String>> secondaryKeyFields,
-            List<IAType> secondaryKeyTypes, boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
+            String datasetName, String indexName, List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes,
+            boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
             PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType enforcedType)
-            throws AsterixException, AlgebricksException {
+                    throws AsterixException, AlgebricksException {
         IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
         SecondaryIndexOperationsHelper indexOperationsHelper = null;
         switch (indexType) {
@@ -207,8 +208,9 @@
         enforcedItemType = enforcedType;
         payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
         numSecondaryKeys = secondaryKeyFields.size();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = SplitsAndConstraintsProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, secondaryIndexName, temp,
+                        metadataProvider.getMetadataTxnContext());
         secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
 
@@ -223,8 +225,9 @@
             }
 
             numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp,
+                            metadataProvider.getMetadataTxnContext());
             primaryFileSplitProvider = primarySplitsAndConstraint.first;
             primaryPartitionConstraint = primarySplitsAndConstraint.second;
             setPrimaryRecDescAndComparators();
@@ -286,8 +289,8 @@
                 throw new AlgebricksException(e);
             }
             primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
-            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    keyType, true);
+            primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+                    .getBinaryComparatorFactory(keyType, true);
             primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             primaryBloomFilterKeyFields[i] = i;
         }
@@ -300,8 +303,8 @@
             List<List<String>> secondaryKeyFields, List<IAType> secondaryKeyTypes, int gramLength,
             AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException;
 
-    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
-            AlgebricksException {
+    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec)
+            throws AsterixException, AlgebricksException {
         // Build dummy tuple containing one field with a dummy value inside.
         ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
         DataOutput dos = tb.getDataOutput();
@@ -345,12 +348,12 @@
                 primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
                 primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        mergePolicyFactory, mergePolicyFactoryProperties, new PrimaryIndexOperationTrackerProvider(
-                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
-                                .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                        primaryBTreeFields, primaryFilterFields, !temp), false, false, null,
-                searchCallbackFactory, null, null);
+                        mergePolicyFactory, mergePolicyFactoryProperties,
+                        new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+                        primaryBTreeFields, primaryFilterFields, !temp),
+                false, false, null, searchCallbackFactory, null, null);
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
@@ -433,7 +436,7 @@
 
     protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
             int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
+                    throws MetadataException, AlgebricksException {
         int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys + numFilterFields];
         for (int i = 0; i < fieldPermutation.length; i++) {
             fieldPermutation[i] = i;
@@ -541,7 +544,7 @@
 
     protected ExternalIndexBulkModifyOperatorDescriptor createExternalIndexBulkModifyOp(JobSpecification spec,
             int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
-            throws MetadataException, AlgebricksException {
+                    throws MetadataException, AlgebricksException {
         int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
         for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
             fieldPermutation[i] = i;
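Both bulk-load variants above build an identity field permutation over the
secondary keys, the primary keys, and (for the internal-dataset case) the
filter fields. A tiny self-contained sketch of that layout, with illustrative
counts:

    import java.util.Arrays;

    public class FieldPermutationSketch {
        public static void main(String[] args) {
            int numSecondaryKeys = 1, numPrimaryKeys = 1, numFilterFields = 1;
            int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys + numFilterFields];
            for (int i = 0; i < fieldPermutation.length; i++) {
                fieldPermutation[i] = i; // tuple fields already arrive in index order
            }
            System.out.println(Arrays.toString(fieldPermutation)); // [0, 1, 2]
        }
    }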
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 147a356..8d03bfb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -46,11 +46,11 @@
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.AsterixFilesUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -213,7 +213,7 @@
 
             PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
                     .getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
+            localResourceRepository.initializeNewUniverse(metadataProperties.getStorageDirectoryName());
         }
 
         IAsterixStateProxy proxy = null;
@@ -277,22 +277,19 @@
         performLocalCleanUp();
     }
 
-    private void performLocalCleanUp() throws IOException {
+    private void performLocalCleanUp() {
         //delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
         //reclaim storage for temporary datasets.
-        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
+        //get node stores
+        String[] nodeStores = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+                .get(nodeId);
 
-        String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
-        String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
-                .get(nodeId)[0];
-
-        for (String mountPoint : storageMountingPoints) {
-            String tempDatasetFolder = mountPoint + storageFolderName + File.separator
-                    + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
-            AsterixFilesUtil.deleteFolder(tempDatasetFolder);
+        for (String store : nodeStores) {
+            String tempDatasetFolder = store + File.separator
+                    + SplitsAndConstraintsProvider.TEMP_DATASETS_STORAGE_FOLDER;
+            FileUtils.deleteQuietly(new File(tempDatasetFolder));
         }
 
         // TODO
@@ -322,7 +319,8 @@
             for (Node node : nodes) {
                 String ncId = asterixInstanceName + "_" + node.getId();
                 if (ncId.equalsIgnoreCase(nodeId)) {
-                    String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+                    //TODO get it from the properties
+                    String storeDir = cluster.getStore();
                     metadataProperties.getStores().put(nodeId, storeDir.split(","));
 
                     String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
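The rewritten performLocalCleanUp above relies on commons-io's
FileUtils.deleteQuietly, which tolerates a missing directory and never throws,
so the method no longer needs to declare IOException. A minimal standalone
sketch of the same pattern (the store paths and the temp-folder name are
illustrative assumptions):

    import java.io.File;
    import org.apache.commons.io.FileUtils;

    public class TempCleanupSketch {
        // Assumed value for illustration; the real constant is
        // SplitsAndConstraintsProvider.TEMP_DATASETS_STORAGE_FOLDER.
        private static final String TEMP_DATASETS_STORAGE_FOLDER = "temp_datasets";

        public static void cleanUp(String[] nodeStores) {
            for (String store : nodeStores) {
                File tempDatasetFolder = new File(store + File.separator + TEMP_DATASETS_STORAGE_FOLDER);
                FileUtils.deleteQuietly(tempDatasetFolder); // no-op if absent, never throws
            }
        }

        public static void main(String[] args) {
            cleanUp(new String[] { "/tmp/iodevice0", "/tmp/iodevice1" });
        }
    }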
diff --git a/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
index 7536c70..65a75dd 100644
--- a/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
+++ b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
@@ -60,9 +61,9 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = SplitsAndConstraintsProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
-                        dataset.getDatasetDetails().isTemp());
+                        dataset.getDatasetDetails().isTemp(), metadataProvider.getMetadataTxnContext());
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index fa20099..e34ffbc 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -18,13 +18,14 @@
  !-->
 <asterixConfiguration xmlns="asterixconf">
 	<metadataNode>asterix_nc1</metadataNode>
+	<storageDirName>storage</storageDirName>

 	<store>
 		<ncId>asterix_nc1</ncId>
-		<storeDirs>asterix_nc1data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<store>
 		<ncId>asterix_nc2</ncId>
-		<storeDirs>asterix_nc2data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<transactionLogDir>
 		<ncId>asterix_nc1</ncId>
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 376b2ff..6c6e411 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -65,11 +65,6 @@
         if (files == null || files.length == 0) {
             outdir.delete();
         }
-
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 052e0be..7a55c90 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -68,10 +68,6 @@
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index d5f4db3..22a3ad7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -65,10 +65,7 @@
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
+        AsterixHyracksIntegrationUtil.removeTestStorageFiles();
     }
 
     @Parameters
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
new file mode 100644
index 0000000..6cd44a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ClusterPartition implements Cloneable {
+    private final int partitionId;
+    private final String nodeId;
+    private final int ioDeviceNum;
+    private String activeNodeId = null;
+    private boolean active = false;
+
+    public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
+        this.partitionId = partitionId;
+        this.nodeId = nodeId;
+        this.ioDeviceNum = ioDeviceNum;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getIODeviceNum() {
+        return ioDeviceNum;
+    }
+
+    public String getActiveNodeId() {
+        return activeNodeId;
+    }
+
+    public void setActiveNodeId(String activeNodeId) {
+        this.activeNodeId = activeNodeId;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    @Override
+    public ClusterPartition clone() {
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        return clone;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ID:" + partitionId);
+        sb.append(" Original Node: " + nodeId);
+        sb.append(" IODevice: " + ioDeviceNum);
+        sb.append(" Active Node: " + activeNodeId);
+        return sb.toString();
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+}
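A short usage sketch of the class above, assuming one partition per iodevice
and that a partition's active node starts out as its original node (ids and
node names are illustrative):

    import org.apache.asterix.common.cluster.ClusterPartition;

    public class ClusterPartitionExample {
        public static void main(String[] args) {
            // Two iodevices on one NC yield two globally numbered partitions.
            ClusterPartition p0 = new ClusterPartition(0, "asterix_nc1", 0);
            ClusterPartition p1 = new ClusterPartition(1, "asterix_nc1", 1);
            p0.setActiveNodeId(p0.getNodeId());
            p0.setActive(true);
            // Prints: ID:0 Original Node: asterix_nc1 IODevice: 0 Active Node: asterix_nc1
            System.out.println(p0);
            System.out.println(p1.isActive()); // false
        }
    }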
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index adaca46..ef9438a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -20,6 +20,9 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
 
 public class AsterixMetadataProperties extends AbstractAsterixProperties {
 
@@ -35,8 +38,8 @@
         return accessor.getMetadataNodeName();
     }
 
-    public String getMetadataStore() {
-        return accessor.getMetadataStore();
+    public ClusterPartition getMetadataPartition() {
+        return accessor.getMetadataPartition();
     }
 
     public Map<String, String[]> getStores() {
@@ -54,4 +57,16 @@
     public Map<String, String> getCoredumpPaths() {
         return accessor.getCoredumpConfig();
     }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return accessor.getNodePartitions();
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return accessor.getClusterPartitions();
+    }
+
+    public String getStorageDirectoryName() {
+        return accessor.getStorageDirName();
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index d6c81ab..ff73364 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,6 +28,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -35,6 +37,7 @@
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Property;
@@ -53,12 +56,16 @@
     private final Map<String, Property> asterixConfigurationParams;
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
+    private final Map<String, ClusterPartition[]> nodePartitionsMap;
+    private final String storageDirName;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
         if (fileName == null) {
             fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
         }
+
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
         if (is == null) {
             try {
@@ -82,9 +89,20 @@
         stores = new HashMap<String, String[]>();
         List<Store> configuredStores = asterixConfiguration.getStore();
         nodeNames = new HashSet<String>();
+        nodePartitionsMap = new HashMap<>();
+        clusterPartitions = new TreeMap<>();
+        int uniquePartitionId = 0;
         for (Store store : configuredStores) {
             String trimmedStoreDirs = store.getStoreDirs().trim();
-            stores.put(store.getNcId(), trimmedStoreDirs.split(","));
+            String[] nodeStores = trimmedStoreDirs.split(",");
+            ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
+            for (int i = 0; i < nodePartitions.length; i++) {
+                ClusterPartition partition = new ClusterPartition(uniquePartitionId++, store.getNcId(), i);
+                clusterPartitions.put(partition.getPartitionId(), partition);
+                nodePartitions[i] = partition;
+            }
+            stores.put(store.getNcId(), nodeStores);
+            nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
         }
         asterixConfigurationParams = new HashMap<String, Property>();
@@ -99,6 +117,7 @@
         for (TransactionLogDir txnLogDir : asterixConfiguration.getTransactionLogDir()) {
             transactionLogDirs.put(txnLogDir.getNcId(), txnLogDir.getTxnLogDirPath());
         }
+        storageDirName = asterixConfiguration.getStorageDirName();
         Properties gitProperties = new Properties();
         try {
             gitProperties.load(getClass().getClassLoader().getResourceAsStream("git.properties"));
@@ -114,10 +133,6 @@
 
     public String getMetadataNodeName() {
         return metadataNodeName;
-    }
-
-    public String getMetadataStore() {
-        return stores.get(metadataNodeName)[0];
     }
 
     public Map<String, String[]> getStores() {
@@ -172,7 +187,7 @@
         }
     }
 
-    private <T> void logConfigurationError(Property p, T defaultValue) {
+    private static <T> void logConfigurationError(Property p, T defaultValue) {
         if (LOGGER.isLoggable(Level.SEVERE)) {
             LOGGER.severe("Invalid property value '" + p.getValue() + "' for property '" + p.getName()
                     + "'.\n See the description: \n" + p.getDescription() + "\nDefault = " + defaultValue);
@@ -182,4 +197,25 @@
     public String getInstanceName() {
         return instanceName;
     }
+
+    public void updateNodeStores(String node, String[] newStores) {
+        stores.put(node, newStores);
+    }
+
+    public ClusterPartition getMetadataPartition() {
+        //metadata partition is always the first partition on the metadata node
+        return nodePartitionsMap.get(metadataNodeName)[0];
+    }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return nodePartitionsMap;
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return clusterPartitions;
+    }
+
+    public String getStorageDirName() {
+        return storageDirName;
+    }
 }
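The numbering scheme in the accessor above is the heart of the change: each
comma-separated store directory of each NC becomes one cluster partition with
a globally unique id, and all partitions are collected into a sorted map. A
self-contained sketch of just that numbering (the input values are
illustrative, and the ids follow the order of the configured stores):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class PartitionNumberingSketch {
        public static void main(String[] args) {
            Map<String, String> storeDirsPerNode = new LinkedHashMap<>();
            storeDirsPerNode.put("asterix_nc1", "iodevice0,iodevice1");
            storeDirsPerNode.put("asterix_nc2", "iodevice0,iodevice1");

            SortedMap<Integer, String> clusterPartitions = new TreeMap<>();
            int uniquePartitionId = 0;
            for (Map.Entry<String, String> e : storeDirsPerNode.entrySet()) {
                String[] nodeStores = e.getValue().trim().split(",");
                for (int i = 0; i < nodeStores.length; i++) {
                    // partition id -> "<node> / iodevice <num>"
                    clusterPartitions.put(uniquePartitionId++, e.getKey() + " / iodevice " + i);
                }
            }
            // {0=asterix_nc1 / iodevice 0, 1=asterix_nc1 / iodevice 1,
            //  2=asterix_nc2 / iodevice 0, 3=asterix_nc2 / iodevice 1}
            System.out.println(clusterPartitions);
        }
    }

With this scheme the metadata partition is simply partition 0 of the metadata
node's first store, which is what the getMetadataPartition accessor above
returns.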
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index adf1152..5062d06 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -78,9 +78,9 @@
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
-        int datasetID = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+        int datasetID = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         return getIndex(datasetID, resourceID);
     }
 
@@ -98,9 +98,9 @@
     }
 
     @Override
-    public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             dsInfo = getDatasetInfo(did);
@@ -116,16 +116,16 @@
         dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
     }
 
-    public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
-    public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
@@ -133,9 +133,9 @@
     }
 
     @Override
-    public synchronized void unregister(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -180,9 +180,9 @@
     }
 
     @Override
-    public synchronized void open(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void open(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null || !dsInfo.isRegistered) {
@@ -262,9 +262,9 @@
     }
 
     @Override
-    public synchronized void close(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void close(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
@@ -704,9 +704,9 @@
     }
 
     @Override
-    public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+    public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
-        //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
+        //a resource path in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourceName);
+        int did = Integer.parseInt(resourcePath);
         allocateDatasetMemory(did);
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 51168ae..fd1ebb8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@
         try {
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..e417521 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,6 +56,9 @@
     <xs:element
         name="txnLogDirPath"
         type="xs:string" />
+    <xs:element
+        name="storageDirName"
+        type="xs:string" />
 
     <!-- definition of complex elements -->
     <xs:element name="store">
@@ -109,6 +112,9 @@
                     ref="mg:metadataNode"
                     minOccurs="0" />
                 <xs:element
+                    ref="mg:storageDirName"
+                    minOccurs="0" />
+                <xs:element
                     ref="mg:store"
                     maxOccurs="unbounded" />
                 <xs:element
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d3203d5..872c959 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -113,7 +113,6 @@
 				<xs:element ref="cl:java_home" minOccurs="0" />
 				<xs:element ref="cl:log_dir" minOccurs="0" />
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
-				<xs:element ref="cl:store" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
 			</xs:sequence>
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index f54cf90..8827985 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -138,9 +138,6 @@
                     ref="cl:txn_log_dir"
                     minOccurs="0" />
                 <xs:element
-                    ref="cl:store"
-                    minOccurs="0" />
-                <xs:element
                     ref="cl:iodevices"
                     minOccurs="0" />
                 <xs:element
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d04a3dd..d8147b6 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -71,7 +71,7 @@
         this.port = 19002;
     }
 
-    public TestExecutor(String host, int port){
+    public TestExecutor(String host, int port) {
         this.host = host;
         this.port = port;
     }
@@ -225,12 +225,18 @@
             // In future this may be changed depending on the requested
             // output format sent to the servlet.
             String errorBody = method.getResponseBodyAsString();
-            JSONObject result = new JSONObject(errorBody);
-            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
-                    result.getString("stacktrace") };
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
-            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
-                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            String[] errors;
+            try {
+                JSONObject result = new JSONObject(errorBody);
+                errors = new String[] { result.getJSONArray("error-code").getString(0),
+                        result.getString("summary"), result.getString("stacktrace") };
+            } catch (Exception e) {
+                // the body is not the servlet's JSON error document; report it as-is
+                throw new Exception(errorBody);
+            }
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
         }
         return statusCode;
     }
@@ -303,7 +307,7 @@
     }
 
     private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = "http://"+host+":"+port+"/query/result";
+        final String url = "http://" + host + ":" + port + "/query/result";
 
         // Create a method instance.
         GetMethod method = new GetMethod(url);
@@ -430,9 +434,9 @@
                     switch (ctx.getType()) {
                         case "ddl":
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } else {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl/sqlpp");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
                             }
                             break;
                         case "update":
@@ -442,9 +446,9 @@
                                         "127.0.0.1://../../../../../../asterix-app/");
                             }
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } else {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update/sqlpp");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
                             }
                             break;
                         case "query":
@@ -461,25 +465,25 @@
                             OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
                             if (ctx.getFile().getName().endsWith("aql")) {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 }
                             } else {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query/sqlpp",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 }
                             }
 
@@ -506,7 +510,7 @@
                             break;
                         case "txnqbc": //qbc represents query before crash
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qbcFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qbc.adm");
@@ -515,7 +519,7 @@
                             break;
                         case "txnqar": //qar represents query after recovery
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qarFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qar.adm");
@@ -528,7 +532,7 @@
                             break;
                         case "txneu": //eu represents erroneous update
                             try {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } catch (Exception e) {
                                 //An exception is expected.
                                 failed = true;
@@ -556,7 +560,7 @@
                             break;
                         case "errddl": // a ddlquery that expects error
                             try {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } catch (Exception e) {
                                 // expected error happens
                                 failed = true;
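The TestExecutor change above now tries to parse the HTTP error body as the
servlet's JSON error document and falls back to surfacing the raw body (for
example, an HTML error page) when parsing fails. A standalone sketch of that
fallback, with illustrative names:

    import org.json.JSONObject;

    public class ErrorBodySketch {
        static String describeError(String errorBody) {
            String[] errors;
            try {
                JSONObject result = new JSONObject(errorBody);
                errors = new String[] { result.getJSONArray("error-code").getString(0),
                        result.getString("summary") };
            } catch (Exception e) {
                // not the JSON error document: report the body as-is
                return errorBody;
            }
            return "code=" + errors[0] + ", summary=" + errors[1];
        }

        public static void main(String[] args) {
            System.out.println(describeError(
                    "{\"error-code\":[\"1\"],\"summary\":\"oops\",\"stacktrace\":\"...\"}"));
            System.out.println(describeError("<html>502 Bad Gateway</html>"));
        }
    }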
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index c92262c..29765fd 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index d6e7da0..b83faa2 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -191,7 +191,7 @@
             String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
                     .getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort());
         }
 
         List<Node> nodeList = cluster.getNode();
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 33ba787..ffcdf5e 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -45,7 +45,6 @@
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Store;
@@ -59,6 +58,7 @@
 import org.apache.asterix.event.schema.cluster.Env;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.event.schema.cluster.Property;
+import org.apache.commons.io.IOUtils;
 
 public class AsterixEventServiceUtil {
 
@@ -90,8 +90,8 @@
         return instance;
     }
 
-    public static void createAsterixZip(AsterixInstance asterixInstance) throws IOException, InterruptedException,
-            JAXBException, EventException {
+    public static void createAsterixZip(AsterixInstance asterixInstance)
+            throws IOException, InterruptedException, JAXBException, EventException {
         String asterixInstanceDir = asterixInstanceDir(asterixInstance);
         unzip(AsterixEventService.getAsterixZip(), asterixInstanceDir);
 
@@ -128,18 +128,18 @@
 
         clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
         clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
-        clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
-                + "asterix"));
+        clusterProperties
+                .add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator + "asterix"));
         clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
         clusterProperties.add(new Property("JAVA_HOME", cluster.getJavaHome()));
         clusterProperties.add(new Property("WORKING_DIR", cluster.getWorkingDir().getDir()));
         clusterProperties.add(new Property("CLIENT_NET_IP", cluster.getMasterNode().getClientIp()));
         clusterProperties.add(new Property("CLUSTER_NET_IP", cluster.getMasterNode().getClusterIp()));
 
-        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null ? cluster.getMasterNode()
-                .getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
-        int clientNetPort = cluster.getMasterNode().getClientPort() != null ? cluster.getMasterNode().getClientPort()
-                .intValue() : CLIENT_NET_PORT_DEFAULT;
+        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null
+                ? cluster.getMasterNode().getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
+        int clientNetPort = cluster.getMasterNode().getClientPort() != null
+                ? cluster.getMasterNode().getClientPort().intValue() : CLIENT_NET_PORT_DEFAULT;
         int httpPort = cluster.getMasterNode().getHttpPort() != null ? cluster.getMasterNode().getHttpPort().intValue()
                 : HTTP_PORT_DEFAULT;
 
@@ -151,8 +151,8 @@
     }
 
     private static String asterixZipName() {
-        return AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        return AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
     }
 
     private static String asterixJarPath(AsterixInstance asterixInstance, String asterixInstanceDir) {
@@ -174,8 +174,8 @@
         new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
     }
 
-    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
-            throws IOException, EventException, JAXBException {
+    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir,
+            AsterixInstance asterixInstance) throws IOException, EventException, JAXBException {
         File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
         writeAsterixClusterConfigurationFile(asterixInstance);
 
@@ -185,8 +185,8 @@
         new File(asterixInstanceDir + File.separator + CLUSTER_CONFIGURATION_FILE).delete();
     }
 
-    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            EventException, JAXBException {
+    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, EventException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
 
@@ -197,8 +197,8 @@
                 + asterixInstanceName + File.separator + "cluster.xml"));
     }
 
-    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName,
-            String libraryName, String libraryPath) throws IOException {
+    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName, String libraryName,
+            String libraryPath) throws IOException {
         File instanceDir = new File(asterixInstanceDir(asterixInstance));
         if (!instanceDir.exists()) {
             instanceDir.mkdirs();
@@ -235,30 +235,8 @@
         return metadataNode;
     }
 
-    public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
-        String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
-        String[] storeDirs = null;
-        StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
-        if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
-        }
-
-        storeDirs = storeDirValue.split(",");
-        for (String ns : storeDirs) {
-            nodeDataStore.append(ns + File.separator + storeDataSubDir.trim());
-            nodeDataStore.append(",");
-        }
-        nodeDataStore.deleteCharAt(nodeDataStore.length() - 1);
-        return nodeDataStore.toString();
-    }
-
-    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            JAXBException {
+    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
         String metadataNodeId = asterixInstance.getMetadataNodeId();
@@ -266,29 +244,35 @@
         AsterixConfiguration configuration = asterixInstance.getAsterixConfiguration();
         configuration.setInstanceName(asterixInstanceName);
         configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
-        String storeDir = null;
         List<Store> stores = new ArrayList<Store>();
+        String storeDir = cluster.getStore().trim();
         for (Node node : cluster.getNode()) {
-            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-            stores.add(new Store(asterixInstanceName + "_" + node.getId(), storeDir));
+            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIdDevice = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIdDevice.length; i++) {
+                nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+            }
+            // remove the trailing comma
+            nodeStores.deleteCharAt(nodeStores.length() - 1);
+            stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
         }
         configuration.setStore(stores);
-
+        configuration.setStorageDirName(storeDir);
         List<Coredump> coredump = new ArrayList<Coredump>();
         String coredumpDir = null;
         List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
         String txnLogDir = null;
         for (Node node : cluster.getNode()) {
             coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
-            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir + File.separator
-                    + asterixInstanceName + "_" + node.getId()));
+            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(),
+                    coredumpDir + File.separator + asterixInstanceName + "_" + node.getId()));
 
             txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
             txnLogDirs.add(new TransactionLogDir(asterixInstanceName + "_" + node.getId(), txnLogDir));
         }
         configuration.setCoredump(coredump);
         configuration.setTransactionLogDir(txnLogDirs);
-
         File asterixConfDir = new File(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName);
         asterixConfDir.mkdirs();
 
@@ -299,8 +283,6 @@
         marshaller.marshal(configuration, os);
         os.close();
     }
-
-
 
     public static void unzip(String sourceFile, String destDir) throws IOException {
         BufferedOutputStream dest = null;
@@ -432,8 +414,8 @@
             }
         }
         if (!valid) {
-            throw new EventException("Asterix instance by the name " + name + " is in " + instance.getState()
-                    + " state ");
+            throw new EventException(
+                    "Asterix instance by the name " + name + " is in " + instance.getState() + " state ");
         }
         return instance;
     }
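
To make the new store layout concrete: writeAsterixConfigurationFile now derives each node's store list from its iodevices plus the single cluster-level store directory name, instead of a per-node <store>. A minimal standalone sketch of that derivation (class and method names here are illustrative, not part of this patch):

    import java.io.File;
    import java.util.StringJoiner;

    public class NodeStoreSketch {
        // Mirrors the loop above: one "<iodevice>/<storeDir>" entry per
        // iodevice, comma-separated.
        static String nodeStores(String iodevices, String storeDir) {
            StringJoiner stores = new StringJoiner(",");
            for (String device : iodevices.split(",")) {
                stores.add(device.trim() + File.separator + storeDir.trim());
            }
            return stores.toString();
        }

        public static void main(String[] args) {
            // prints "/mnt/io1/storage,/mnt/io2/storage"
            System.out.println(nodeStores("/mnt/io1,/mnt/io2", "storage"));
        }
    }
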
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b6aaddb..6085019 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -164,11 +164,11 @@
         String store;
         String pargs;
         String iodevices;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
@@ -188,12 +188,12 @@
         String txnLogDir;
         String store;
         String pargs;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
             txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + txnLogDir + " " + backupId + " " + backupDir
                     + " " + "local" + " " + node.getId();
@@ -212,14 +212,12 @@
         VerificationUtil.verifyBackupRestoreConfiguration(hdfsUrl, hadoopVersion, hdfsBackupDir);
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
                     + hadoopVersion;
@@ -235,14 +233,12 @@
         String backupDir = backupInfo.getBackupConf().getBackupDir();
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + backupDir + " " + "local" + " " + node.getId();
             Event event = new Event("restore", nodeid, pargs);
@@ -262,8 +258,8 @@
 
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp()
-                + " " + workingDir;
+        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp() + " "
+                + workingDir;
         Event event = new Event("directory_transfer", nodeid, pargs);
         Pattern p = new Pattern(null, 1, null, event);
         addInitialDelay(p, 2, "sec");
@@ -428,8 +424,8 @@
                 patternList.add(p);
             }
 
-            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir
-                    + " " + "unpack";
+            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir + " "
+                    + "unpack";
             event = new Event("file_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             patternList.add(p);
@@ -529,8 +525,8 @@
             String[] nodeIODevices;
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
             nodeIODevices = iodevices.trim().split(",");
+            String nodeStore = cluster.getStore().trim();
             for (String nodeIODevice : nodeIODevices) {
-                String nodeStore = node.getStore() == null ? cluster.getStore() : node.getStore();
                 pargs = nodeIODevice.trim() + File.separator + nodeStore;
                 Event event = new Event("file_delete", nodeid, pargs);
                 patternList.add(new Pattern(null, 1, null, event));
@@ -540,13 +536,15 @@
         return patterns;
     }
 
-    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp, String destDir) {
+    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp,
+            String destDir) {
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String asterixZipName = AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
-        String fileToTransfer = new File(AsterixEventService.getAsterixDir() + File.separator + instanceName
-                + File.separator + asterixZipName).getAbsolutePath();
+        String asterixZipName = AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        String fileToTransfer = new File(
+                AsterixEventService.getAsterixDir() + File.separator + instanceName + File.separator + asterixZipName)
+                        .getAbsolutePath();
         String pargs = username + " " + fileToTransfer + " " + destinationIp + " " + destDir + " " + "unpack";
         Event event = new Event("file_transfer", nodeid, pargs);
         return new Pattern(null, 1, null, event);
@@ -607,8 +605,8 @@
             ps.add(p);
 
             nodeid = new Nodeid(new Value(null, nodeToBeAdded.getId()));
-            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp()
-                    + " " + workingDir;
+            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp() + " "
+                    + workingDir;
             event = new Event("directory_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             addInitialDelay(p, 2, "sec");
@@ -626,8 +624,8 @@
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
-                .getLogDir();
+        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+                : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);
@@ -649,7 +647,7 @@
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
-    
+
     private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
 
         List<Pattern> patternList = new ArrayList<Pattern>();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ebf41cc..c4a96f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -34,7 +34,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import org.apache.hadoop.fs.BlockLocation;
@@ -211,12 +210,10 @@
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                    locs.add(i);
-                }
+                // two readers per partition
+                locs.add(i);
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];
@@ -273,67 +270,67 @@
      * @throws IOException
      */
     protected InputSplit[] getSplits(JobConf conf) throws IOException {
-        // Create file system object
-        FileSystem fs = FileSystem.get(conf);
         ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
         ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
-        // Create files splits
-        for (ExternalFile file : files) {
-            Path filePath = new Path(file.getFileName());
-            FileStatus fileStatus;
-            try {
-                fileStatus = fs.getFileStatus(filePath);
-            } catch (FileNotFoundException e) {
-                // file was deleted at some point, skip to next file
-                continue;
-            }
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() < file.getSize()) {
-                        fileSplits
-                                .add(new FileSplit(filePath,
-                                        block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                                ? block.getLength() : (file.getSize() - block.getOffset()),
-                                block.getHosts()));
-                        orderedExternalFiles.add(file);
-                    }
+        // Create file system object
+        try (FileSystem fs = FileSystem.get(conf)) {
+            // Create files splits
+            for (ExternalFile file : files) {
+                Path filePath = new Path(file.getFileName());
+                FileStatus fileStatus;
+                try {
+                    fileStatus = fs.getFileStatus(filePath);
+                } catch (FileNotFoundException e) {
+                    // file was deleted at some point, skip to next file
+                    continue;
                 }
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                long oldSize = 0L;
-                long newSize = file.getSize();
-                for (int i = 0; i < files.size(); i++) {
-                    if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
-                        newSize = files.get(i).getSize();
-                        oldSize = file.getSize();
-                        break;
-                    }
-                }
-
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() + block.getLength() > oldSize) {
-                        if (block.getOffset() < newSize) {
-                            // Block interact with delta -> Create a split
-                            long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
-                            long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
-                                    : block.getOffset() + block.getLength() - newSize;
-                            long splitLength = block.getLength() - startCut - endCut;
-                            fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() < file.getSize()) {
+                            fileSplits.add(new FileSplit(filePath,
+                                    block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+                                            ? block.getLength() : (file.getSize() - block.getOffset()),
                                     block.getHosts()));
                             orderedExternalFiles.add(file);
+                        }
+                    }
+                } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    long oldSize = 0L;
+                    long newSize = file.getSize();
+                    for (int i = 0; i < files.size(); i++) {
+                        if (files.get(i).getFileName().equals(file.getFileName())
+                                && files.get(i).getSize() != file.getSize()) {
+                            newSize = files.get(i).getSize();
+                            oldSize = file.getSize();
+                            break;
+                        }
+                    }
+
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() + block.getLength() > oldSize) {
+                            if (block.getOffset() < newSize) {
+                                // Block intersects the delta -> create a split
+                                long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+                                long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+                                        : block.getOffset() + block.getLength() - newSize;
+                                long splitLength = block.getLength() - startCut - endCut;
+                                fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                                        block.getHosts()));
+                                orderedExternalFiles.add(file);
+                            }
                         }
                     }
                 }
             }
         }
-        fs.close();
         files = orderedExternalFiles;
         return fileSplits.toArray(new FileSplit[fileSplits.size()]);
     }
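
The getSplits() rewrite above is mostly a resource-scoping change: previously fs.close() was skipped whenever getFileStatus or getFileBlockLocations threw, leaking the FileSystem handle; try-with-resources closes it on every path. A reduced sketch of the pattern, assuming only a Hadoop Configuration (the split-building body is elided):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SplitScanSketch {
        static void scan(Configuration conf) throws IOException {
            // FileSystem implements Closeable, so try-with-resources
            // guarantees fs.close() even on exceptions.
            try (FileSystem fs = FileSystem.get(conf)) {
                fs.getFileStatus(new Path("/example"));
                // ... build splits per block, as in getSplits() above ...
            }
        }
    }
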
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 11e2b96..f0a5f8f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -186,11 +186,8 @@
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];
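
Same theme as in HDFSAdapterFactory: scheduling locations are now one entry per node store, since each store partition already maps to exactly one iodevice; the extra iodevice fan-out is gone. A self-contained sketch of the location-list construction (map contents illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReadLocationsSketch {
        // One location entry per node store; the removed code multiplied
        // this by the node's iodevice count.
        static String[] readLocations(Map<String, String[]> stores) {
            List<String> locs = new ArrayList<>();
            for (Map.Entry<String, String[]> e : stores.entrySet()) {
                for (int j = 0; j < e.getValue().length; j++) {
                    locs.add(e.getKey());
                }
            }
            return locs.toArray(new String[locs.size()]);
        }

        public static void main(String[] args) {
            Map<String, String[]> stores = new HashMap<>();
            stores.put("nc1", new String[] { "/mnt/io1/storage", "/mnt/io2/storage" });
            System.out.println(readLocations(stores).length); // prints 2
        }
    }
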
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index bce4620..6ff991b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.indexing.operators;
 
-import java.io.File;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,9 +48,7 @@
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.deleteTransactionFiles();
     }
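
The abort/commit/recover operators now share IndexFileNameUtil.getIndexAbsoluteFileRef instead of hand-assembling the path from the split and the iodevice id. The helper's body is not in this diff; presumably it resolves the split's now-relative path against the owning iodevice through the IOManager, roughly as below (hypothetical sketch, not the patch's implementation):

    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.io.FileReference;
    import org.apache.hyracks.api.io.IIOManager;
    import org.apache.hyracks.dataflow.std.file.FileSplit;

    class IndexPathSketch {
        // Hypothetical shape of the shared helper: relative index path
        // plus iodevice id -> absolute FileReference via the IOManager.
        static FileReference resolve(FileSplit split, IIOManager ioManager) throws HyracksDataException {
            return ioManager.getAbsoluteFileRef(split.getIODeviceId(),
                    split.getLocalFile().getFile().getPath());
        }
    }
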
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
index 0eacc15..e89a8db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesCommitOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -49,18 +50,15 @@
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        System.err.println("performing the operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()));
+        FileReference resourcePath = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
+        System.err.println("performing the operation on " + resourcePath.getFile().getAbsolutePath());
         // Get DataflowHelper
         IIndexDataflowHelper indexHelper = indexDataflowHelperFactory.createIndexDataflowHelper(fileIndexInfo, ctx, partition);
         // Get index
         IIndex index = indexHelper.getIndexInstance();
         // commit transaction
         ((ITwoPCIndex) index).commitTransaction();
-        System.err.println("operation on "+ IndexFileNameUtil.prepareFileName(fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo.getFileSplitProvider()
-                .getFileSplits()[partition].getIODeviceId()) + " Succeded");
+        System.err.println("operation on "+ resourecePath.getFile().getAbsolutePath() + " Succeded");
 
     }
 
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 8e7a288..9bdfaa6 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,9 +49,7 @@
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.recoverTransaction();
     }
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 2254f6f..09c65c8 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -27,8 +27,6 @@
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
-import org.kohsuke.args4j.Option;
-
 import org.apache.asterix.event.management.EventUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.MasterNode;
@@ -37,6 +35,7 @@
 import org.apache.asterix.installer.driver.InstallerDriver;
 import org.apache.asterix.installer.schema.conf.Configuration;
 import org.apache.asterix.installer.schema.conf.Zookeeper;
+import org.kohsuke.args4j.Option;
 
 public class ValidateCommand extends AbstractCommand {
 
@@ -97,7 +96,7 @@
             valid = false;
         } else {
             cluster = EventUtil.getCluster(clusterPath);
-            validateClusterProperties(cluster);
+            valid = valid & validateClusterProperties(cluster);
 
             Set<String> servers = new HashSet<String>();
             Set<String> serverIds = new HashSet<String>();
@@ -106,7 +105,7 @@
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null, null);
+                    masterNode.getLogDir(), null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
@@ -158,7 +157,7 @@
         return true;
     }
 
-    private void validateClusterProperties(Cluster cluster) {
+    private boolean validateClusterProperties(Cluster cluster) {
         List<String> tempDirs = new ArrayList<String>();
         if (cluster.getLogDir() != null && checkTemporaryPath(cluster.getLogDir())) {
             tempDirs.add("Log directory: " + cluster.getLogDir());
@@ -176,6 +175,11 @@
             LOGGER.warn(msg);
         }
 
+        if (cluster.getStore() == null || cluster.getStore().length() == 0) {
+            LOGGER.fatal("store not defined at cluster" + ERROR);
+            return false;
+        }
+        return true;
     }
 
     private boolean validateNodeConfiguration(Node node, Cluster cluster) {
@@ -198,14 +202,6 @@
             if (cluster.getTxnLogDir() == null || cluster.getTxnLogDir().length() == 0) {
                 valid = false;
                 LOGGER.fatal("txn_log_dir not defined at cluster/node level for node: " + node.getId() + ERROR);
-            }
-        }
-
-        if (node.getStore() == null || node.getStore().length() == 0) {
-            if (!cluster.getMasterNode().getId().equals(node.getId())
-                    && (cluster.getStore() == null || cluster.getStore().length() == 0)) {
-                valid = false;
-                LOGGER.fatal("store not defined at cluster/node level for node: " + node.getId() + ERROR);
             }
         }
 
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
index 56da6ee..1ac60ba 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/driver/InstallerUtil.java
@@ -48,14 +48,11 @@
         String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
         String[] storeDirs = null;
         StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
+        String storeDirValue = cluster.getStore();
         if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
+            throw new IllegalStateException("Store not defined for node " + node.getId());
         }
+        storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
 
         storeDirs = storeDirValue.split(",");
         for (String ns : storeDirs) {
@@ -66,8 +63,8 @@
         return nodeDataStore.toString();
     }
 
-    public static AsterixConfiguration getAsterixConfiguration(String asterixConf) throws FileNotFoundException,
-            IOException, JAXBException {
+    public static AsterixConfiguration getAsterixConfiguration(String asterixConf)
+            throws FileNotFoundException, IOException, JAXBException {
         if (asterixConf == null) {
             asterixConf = InstallerDriver.getManagixHome() + File.separator + DEFAULT_ASTERIX_CONFIGURATION_PATH;
         }
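
InstallerUtil keeps the node-scoped data subdirectory but now always reads the store list from the cluster. A worked sketch of what the rewritten getNodeDirectories produces (paths illustrative):

    import java.io.File;
    import java.util.StringJoiner;

    public class NodeDirectoriesSketch {
        // Mirrors the rewritten method: each cluster store dir suffixed
        // with "<nodeId>/<instanceName>/data/".
        static String nodeDirectories(String clusterStore, String nodeId, String instanceName) {
            String subDir = nodeId + File.separator + instanceName + File.separator + "data" + File.separator;
            StringJoiner out = new StringJoiner(",");
            for (String dir : clusterStore.split(",")) {
                out.add(dir + File.separator + subDir);
            }
            return out.toString();
        }

        public static void main(String[] args) {
            // prints "/data/a/nc1/my_instance/data/,/data/b/nc1/my_instance/data/"
            System.out.println(nodeDirectories("/data/a,/data/b", "nc1", "my_instance"));
        }
    }
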
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index f22b2f1..51d9bcf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -62,6 +63,7 @@
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.AdapterIdentifier;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
@@ -108,7 +110,6 @@
     private static IIOManager ioManager;
 
     private static String metadataNodeName;
-    private static String metadataStore;
     private static Set<String> nodeNames;
     private static String outputDir;
 
@@ -147,9 +148,7 @@
 
         AsterixMetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
         metadataNodeName = metadataProperties.getMetadataNodeName();
-        metadataStore = metadataProperties.getMetadataStore();
         nodeNames = metadataProperties.getNodeNames();
-        // nodeStores = asterixProperity.getStores();
 
         dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
         localResourceRepository = runtimeContext.getLocalResourceRepository();
@@ -375,11 +374,14 @@
 
     private static void enlistMetadataDataset(IMetadataIndex index, boolean create, MetadataTransactionContext mdTxnCtx)
             throws Exception {
-        String filePath = ioManager.getIODevices().get(runtimeContext.getMetaDataIODeviceId()).getPath()
-                + File.separator
-                + IndexFileNameUtil.prepareFileName(metadataStore + File.separator + index.getFileNameRelativePath(),
-                        runtimeContext.getMetaDataIODeviceId());
-        FileReference file = new FileReference(new File(filePath));
+        ClusterPartition metadataPartition = propertiesProvider.getMetadataProperties().getMetadataPartition();
+        int metadataDeviceId = metadataPartition.getIODeviceNum();
+        String metadataPartitionPath = SplitsAndConstraintsProvider.prepareStoragePartitionPath(
+                propertiesProvider.getMetadataProperties().getStorageDirectoryName(),
+                metadataPartition.getPartitionId());
+        String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
+        FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContext
                 .getVirtualBufferCaches(index.getDatasetId().getId());
         ITypeTraits[] typeTraits = index.getTypeTraits();
@@ -391,7 +393,7 @@
                 ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
                 : new BaseOperationTracker(index.getDatasetId().getId(),
                         dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
-        final String path = file.getFile().getPath();
+        final String absolutePath = file.getFile().getPath();
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
@@ -409,12 +411,13 @@
             ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
-            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
-            dataLifecycleManager.register(path, lsmBtree);
+            localResourceRepository.insert(
+                    localResourceFactory.createLocalResource(resourceID, resourceName, metadataPartition.getPartitionId(), absolutePath));
+            dataLifecycleManager.register(absolutePath, lsmBtree);
         } else {
-            final LocalResource resource = localResourceRepository.getResourceByName(path);
+            final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
+            lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -424,7 +427,7 @@
                         opTracker, runtimeContext.getLSMIOScheduler(),
                         LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
                         null, null, null, null, true);
-                dataLifecycleManager.register(path, lsmBtree);
+                dataLifecycleManager.register(absolutePath, lsmBtree);
             }
         }
 
@@ -529,4 +532,4 @@
             MetadataManager.INSTANCE.releaseWriteLatch();
         }
     }
-}
+}
\ No newline at end of file
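
enlistMetadataDataset now names the metadata index relative to the cluster's metadata partition (resolved to an absolute FileReference by the IOManager for that partition's iodevice) instead of a dedicated metadata store. The exact partition-path format lives in SplitsAndConstraintsProvider.prepareStoragePartitionPath, which is not shown here; a hedged sketch under the assumption that it yields "<storageDirName>/partition_<id>":

    import java.io.File;

    public class MetadataResourceNameSketch {
        // Assumed layout "<storageDirName>/partition_<partitionId>"; the
        // real format is defined by prepareStoragePartitionPath.
        static String resourceName(String storageDirName, int partitionId, String indexRelativePath) {
            String partitionPath = storageDirName + File.separator + "partition_" + partitionId;
            return partitionPath + File.separator + indexRelativePath;
        }

        public static void main(String[] args) {
            // e.g. "storage/partition_0/Metadata/Dataset_idx_Dataset"
            System.out.println(resourceName("storage", 0, "Metadata/Dataset_idx_Dataset"));
        }
    }
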
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
deleted file mode 100644
index 92ab90d..0000000
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlCompiledMetadataDeclarations.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata.declared;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.annotations.TypeDataGen;
-import org.apache.asterix.common.config.AsterixMetadataProperties;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.formats.base.IDataFormat;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.api.IMetadataManager;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.data.IAWriterFactory;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public class AqlCompiledMetadataDeclarations {
-    private static Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
-
-    // We are assuming that there is a one AqlCompiledMetadataDeclarations per
-    // transaction.
-    private final MetadataTransactionContext mdTxnCtx;
-    private String dataverseName = null;
-    private FileSplit outputFile;
-    private Map<String, String[]> stores;
-    private IDataFormat format;
-    private Map<String, String> config;
-
-    private final Map<String, IAType> types;
-    private final Map<String, TypeDataGen> typeDataGenMap;
-    private final IAWriterFactory writerFactory;
-
-    private IMetadataManager metadataManager = MetadataManager.INSTANCE;
-    private boolean isConnected = false;
-
-    public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
-            Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
-        this.mdTxnCtx = mdTxnCtx;
-        this.dataverseName = dataverseName;
-        this.outputFile = outputFile;
-        this.config = config;
-        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
-        if (stores == null && online) {
-            this.stores = metadataProperties.getStores();
-        } else {
-            this.stores = stores;
-        }
-        this.types = types;
-        this.typeDataGenMap = typeDataGenMap;
-        this.writerFactory = writerFactory;
-    }
-
-    public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
-        if (isConnected) {
-            throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
-        }
-        Dataverse dv;
-        try {
-            dv = metadataManager.getDataverse(mdTxnCtx, dvName);
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-        if (dv == null) {
-            throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
-        }
-        dataverseName = dvName;
-        isConnected = true;
-        try {
-            format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new AsterixException(e);
-        }
-    }
-
-    public void disconnectFromDataverse() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You are not connected to any dataverse");
-        }
-        dataverseName = null;
-        format = null;
-        isConnected = false;
-    }
-
-    public boolean isConnectedToDataverse() {
-        return isConnected;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public FileSplit getOutputFile() {
-        return outputFile;
-    }
-
-    public IDataFormat getFormat() throws AlgebricksException {
-        if (!isConnected) {
-            throw new AlgebricksException("You need first to connect to a dataverse.");
-        }
-        return format;
-    }
-
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
-    }
-
-    public IAType findType(String typeName) {
-        Datatype type;
-        try {
-            type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
-        } catch (Exception e) {
-            throw new IllegalStateException();
-        }
-        if (type == null) {
-            throw new IllegalStateException();
-        }
-        return type.getDatatype();
-    }
-
-    public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
-        NodeGroup ng;
-        try {
-            ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-        if (ng == null) {
-            throw new AlgebricksException("No node group with this name " + nodeGroupName);
-        }
-        return ng.getNodeNames();
-    }
-
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
-    public Dataset findDataset(String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
-        try {
-            return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-        } catch (MetadataException e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    public void setOutputFile(FileSplit outputFile) {
-        this.outputFile = outputFile;
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-            String datasetName, String targetIdxName) throws AlgebricksException {
-        FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
-            throws AlgebricksException {
-
-        File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
-        Dataset dataset = findDataset(datasetName);
-        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
-            throw new AlgebricksException("Not an internal dataset");
-        }
-        List<String> nodeGroup = findNodeGroupNodeNames(dataset.getNodeGroupName());
-        if (nodeGroup == null) {
-            throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-        }
-
-        List<FileSplit> splitArray = new ArrayList<FileSplit>();
-        for (String nd : nodeGroup) {
-            String[] nodeStores = stores.get(nd);
-            if (nodeStores == null) {
-                LOGGER.warning("Node " + nd + " has no stores.");
-                throw new AlgebricksException("Node " + nd + " has no stores.");
-            } else {
-                for (int j = 0; j < nodeStores.length; j++) {
-                    File f = new File(nodeStores[j] + File.separator + relPathFile);
-                    splitArray.add(new FileSplit(nd, new FileReference(f)));
-                }
-            }
-        }
-        FileSplit[] splits = new FileSplit[splitArray.size()];
-        int i = 0;
-        for (FileSplit fs : splitArray) {
-            splits[i++] = fs;
-        }
-        return splits;
-    }
-
-    public String getRelativePath(String fileName) {
-        return dataverseName + File.separator + fileName;
-    }
-
-    public Map<String, TypeDataGen> getTypeDataGenMap() {
-        return typeDataGenMap;
-    }
-
-    public Map<String, IAType> getTypeDeclarations() {
-        return types;
-    }
-
-    public IAWriterFactory getWriterFactory() {
-        return writerFactory;
-    }
-
-    public MetadataTransactionContext getMetadataTransactionContext() {
-        return mdTxnCtx;
-    }
-}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
index ad06737..02d151d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlLogicalPlanAndMetadataImpl.java
@@ -19,10 +19,7 @@
 
 package org.apache.asterix.metadata.declared;
 
-import java.util.ArrayList;
-import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
@@ -49,16 +46,6 @@
 
     @Override
     public AlgebricksPartitionConstraint getClusterLocations() {
-        Map<String, String[]> stores = metadataProvider.getAllStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String k : stores.keySet()) {
-            String[] nodeStores = stores.get(k);
-            for (int j = 0; j < nodeStores.length; j++) {
-                locs.add(k);
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
     }
 }
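
getClusterLocations no longer re-derives locations from the stores map; it delegates to AsterixClusterProperties, which under this change can answer with one location per cluster partition. A thin sketch of the shape of that answer (the provider's internals are outside this hunk):

    import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;

    public class ClusterLocationsSketch {
        // One location per cluster partition, keyed by the node id that
        // owns the partition (a plausible shape, not the provider's code).
        static AlgebricksAbsolutePartitionConstraint locations(String[] partitionOwners) {
            return new AlgebricksAbsolutePartitionConstraint(partitionOwners);
        }
    }
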
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d61d323..9238df0 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -91,6 +91,7 @@
 import org.apache.asterix.metadata.feeds.FeedUtil;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsProvider;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -148,7 +149,6 @@
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -160,16 +160,13 @@
 import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
@@ -185,11 +182,9 @@
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
-    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
     private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
-    private final Map<String, String[]> stores;
     private Map<String, String> config;
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
@@ -197,15 +192,17 @@
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private final ICentralFeedManager centralFeedManager;
-
+    private final Map<String, String[]> stores;
     private final Dataverse defaultDataverse;
     private JobId jobId;
     private Map<String, Integer> locks;
     private boolean isTemporaryDatasetWriteJob = true;
-
     private final AsterixStorageProperties storageProperties;
-
     public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+
+    public Map<String, String[]> getAllStores() {
+        return stores;
+    }
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -215,18 +212,14 @@
         this.config = config;
     }
 
-    public Map<String, String[]> getAllStores() {
-        return stores;
-    }
-
     public Map<String, String> getConfig() {
         return config;
     }
 
     public AqlMetadataProvider(Dataverse defaultDataverse, ICentralFeedManager centralFeedManager) {
         this.defaultDataverse = defaultDataverse;
-        this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.centralFeedManager = centralFeedManager;
     }
 
@@ -741,8 +734,8 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             try {
-                spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
-                        dataset.getDatasetName(), indexName, temp);
+                spPc = SplitsAndConstraintsProvider.splitProviderAndPartitionConstraintsForDataset(
+                        dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp, mdTxnCtx);
             } catch (Exception e) {
                 throw new AlgebricksException(e);
             }
@@ -896,8 +889,9 @@
             ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForDataset(
-                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
+                            dataset.getDatasetName(), indexName, temp, mdTxnCtx);
 
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
                     dataset, recType, context.getBinaryComparatorFactoryProvider());
@@ -1112,8 +1106,9 @@
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
 
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp, mdTxnCtx);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1196,8 +1191,9 @@
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
                     itemType, context.getBinaryComparatorFactoryProvider());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataSource.getId().getDataverseName(), datasetName,
+                            indexName, temp, mdTxnCtx);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1514,8 +1510,9 @@
             IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+                            dataset.getDatasetDetails().isTemp(), mdTxnCtx);
 
             // Generate Output Record format
             ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1684,8 +1681,9 @@
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp,
+                            mdTxnCtx);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1888,8 +1886,9 @@
             }
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp,
+                            mdTxnCtx);
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2020,8 +2019,9 @@
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
                     dataset, recType, context.getBinaryComparatorFactoryProvider());
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
-                    dataverseName, datasetName, indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = SplitsAndConstraintsProvider
+                    .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp,
+                            mdTxnCtx);
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
@@ -2095,9 +2095,9 @@
         return jobId;
     }
 
-    public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
-        return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
-    }
 
     public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
             throws AlgebricksException {
@@ -2128,7 +2128,7 @@
         List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
                 .getNodeNames();
         for (String nd : nodeGroup) {
-            numPartitions += AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
+            numPartitions += AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
         }
         return numElementsHint /= numPartitions;
     }
@@ -2136,93 +2136,6 @@
     @Override
     public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
         return AsterixBuiltinFunctions.lookupFunction(fid);
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
-            String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
-            String dataverse) {
-        FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
-            FileSplit[] splits) {
-        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
-        String[] loc = new String[splits.length];
-        for (int p = 0; p < splits.length; p++) {
-            loc[p] = splits[p].getNodeName();
-        }
-        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
-        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
-    }
-
-    private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
-        File relPathFile = new File(dataverseName);
-        List<FileSplit> splits = new ArrayList<FileSplit>();
-        for (Map.Entry<String, String[]> entry : stores.entrySet()) {
-            String node = entry.getKey();
-            String[] nodeStores = entry.getValue();
-            if (nodeStores == null) {
-                continue;
-            }
-            for (int i = 0; i < nodeStores.length; i++) {
-                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
-                String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(node);
-                for (int j = 0; j < nodeStores.length; j++) {
-                    for (int k = 0; k < numIODevices; k++) {
-                        File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                        splits.add(new FileSplit(node, new FileReference(f), k));
-                    }
-                }
-            }
-        }
-        return splits.toArray(new FileSplit[] {});
-    }
-
-    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
-            String targetIdxName, boolean temp) throws AlgebricksException {
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    int numIODevices;
-                    if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
-                        numIODevices = 1;
-                    } else {
-                        numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                    }
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    for (int j = 0; j < nodeStores.length; j++) {
-                        for (int k = 0; k < numIODevices; k++) {
-                            File f = new File(ioDevices[k] + File.separator + nodeStores[j]
-                                    + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
-                                    + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), k));
-                        }
-                    }
-                }
-            }
-            return splitArray.toArray(new FileSplit[0]);
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
     }
 
     private static Map<String, String> initializeAdapterFactoryMapping() {
@@ -2254,10 +2167,6 @@
             adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
         }
         return adapter;
-    }
-
-    private static String getRelativePath(String dataverseName, String fileName) {
-        return dataverseName + File.separator + fileName;
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
@@ -2306,23 +2215,7 @@
         }
     }
 
-    public AlgebricksPartitionConstraint getClusterLocations() {
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
-            }
-        }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        return new AlgebricksAbsolutePartitionConstraint(cluster);
-    }
-
-    public IDataFormat getFormat() {
+    public static IDataFormat getFormat() {
         return FormatUtils.getDefaultFormat();
     }
 
@@ -2334,7 +2227,7 @@
      * @return a new map containing the original dataset properties and the
      *         scheduler/locations
      */
-    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+    private static Map<String, Object> wrapProperties(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         // wrappedProperties.put(SCHEDULER, hdfsScheduler);
@@ -2349,66 +2242,10 @@
      *            the original properties
      * @return the new stirng-object map
      */
-    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+    private static Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
         return wrappedProperties;
-    }
-
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
-        return splitProviderAndPartitionConstraints(splits);
-    }
-
-    private FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-
-        try {
-            File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
-                    .getNodeNames();
-            if (nodeGroup == null) {
-                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
-            }
-
-            List<FileSplit> splitArray = new ArrayList<FileSplit>();
-            for (String nd : nodeGroup) {
-                String[] nodeStores = stores.get(nd);
-                if (nodeStores == null) {
-                    LOGGER.warning("Node " + nd + " has no stores.");
-                    throw new AlgebricksException("Node " + nd + " has no stores.");
-                } else {
-                    // Only the first partition when create
-                    String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
-                    if (create) {
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            File f = new File(
-                                    ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                            splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                        }
-                    } else {
-                        int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
-                        for (int j = 0; j < nodeStores.length; j++) {
-                            for (int k = 0; k < numIODevices; k++) {
-                                File f = new File(
-                                        ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
-                                splitArray.add(new FileSplit(nd, new FileReference(f), 0));
-                            }
-                        }
-                    }
-                }
-            }
-            FileSplit[] splits = new FileSplit[splitArray.size()];
-            int i = 0;
-            for (FileSplit fs : splitArray) {
-                splits[i++] = fs;
-            }
-            return splits;
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
     }
 
     public AsterixStorageProperties getStorageProperties() {
@@ -2467,15 +2304,11 @@
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
         try {
-            spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName().
-
-            concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
-        } catch (
-
-        Exception e)
-
-        {
+            spPc = SplitsAndConstraintsProvider.splitProviderAndPartitionConstraintsForFilesIndex(
+                    dataset.getDataverseName(), dataset.getDatasetName(),
+                    dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false,
+                    metadataProvider.getMetadataTxnContext());
+        } catch (Exception e) {
             throw new AlgebricksException(e);
         }
 
@@ -2491,4 +2324,4 @@
         // Return value
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
     }
-}
+}
\ No newline at end of file
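
All of the call sites above follow the same pattern; a hedged usage sketch of the new entry point (the dataverse, dataset, and index names are made up, and mdTxnCtx is the provider's current metadata transaction):

    // One FileSplit per storage partition of the dataset's node group, plus a
    // location constraint naming the active node of each of those partitions.
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
            SplitsAndConstraintsProvider.splitProviderAndPartitionConstraintsForDataset(
                    "Sales", "Customers", "Customers", false /* not a temp dataset */, mdTxnCtx);
    IFileSplitProvider splitProvider = spPc.first;
    AlgebricksPartitionConstraint locations = spPc.second;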
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsProvider.java
new file mode 100644
index 0000000..d03d5bb
--- /dev/null
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.metadata.MetadataException;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitsAndConstraintsProvider {
+
+    public static final String PARTITION_DIR_PREFIX = "partition_";
+    public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
+    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+
+    private static FileSplit[] splitsForDataverse(String dataverseName) {
+        File relPathFile = new File(dataverseName);
+        List<FileSplit> splits = new ArrayList<FileSplit>();
+        //get all cluster partitions
+        ClusterPartition[] clusterPartitions = AsterixClusterProperties.INSTANCE.getClusterPartitions();
+        String storageDirName = AsterixAppContextInfo.getInstance().getMetadataProperties().getStorageDirectoryName();
+        for (int j = 0; j < clusterPartitions.length; j++) {
+            //one file split per storage partition
+            File f = new File(prepareStoragePartitionPath(storageDirName, clusterPartitions[j].getPartitionId())
+                    + File.separator + relPathFile);
+            splits.add(getFileSplitForClusterPartition(clusterPartitions[j], f));
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
+    public static FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            String storageDirName = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                    .getStorageDirectoryName();
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nd : nodeGroup) {
+                int numPartitions = AsterixClusterProperties.INSTANCE.getNodePartitionsCount(nd);
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nd);
+                //currently this case is never executed since the metadata node group doesn't exist
+                if (dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME) == 0) {
+                    numPartitions = 1;
+                }
+
+                for (int k = 0; k < numPartitions; k++) {
+                    //format: 'storage dir name' / 'partition_#' / dataverse / idx
+                    File f = new File(prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
+                            + (temp ? (File.separator + TEMP_DATASETS_STORAGE_FOLDER) : "") + File.separator
+                            + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    private static FileSplit[] splitsForFilesIndex(MetadataTransactionContext mdTxnCtx, String dataverseName,
+            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+        try {
+            File relPathFile = new File(prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
+            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                    .getNodeNames();
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
+            }
+
+            List<FileSplit> splits = new ArrayList<FileSplit>();
+            for (String nodeId : nodeGroup) {
+                //get node partitions
+                ClusterPartition[] nodePartitions = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeId);
+                String storageDirName = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                        .getStorageDirectoryName();
+                int firstPartition = 0;
+                if (create) {
+                    // Only the first partition when create
+                    File f = new File(
+                            prepareStoragePartitionPath(storageDirName, nodePartitions[firstPartition].getPartitionId())
+                                    + File.separator + relPathFile);
+                    splits.add(getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+                } else {
+                    for (int k = 0; k < nodePartitions.length; k++) {
+                        File f = new File(prepareStoragePartitionPath(storageDirName,
+                                nodePartitions[k].getPartitionId()) + File.separator + relPathFile);
+                        splits.add(getFileSplitForClusterPartition(nodePartitions[k], f));
+                    }
+                }
+            }
+            return splits.toArray(new FileSplit[] {});
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+            String dataverse) {
+        FileSplit[] splits = splitsForDataverse(dataverse);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
+            String dataverseName, String datasetName, String targetIdxName, boolean temp,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
+            String dataverseName, String datasetName, String targetIdxName, boolean create,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+        FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+        return splitProviderAndPartitionConstraints(splits);
+    }
+
+    private static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+            FileSplit[] splits) {
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        String[] loc = new String[splits.length];
+        for (int p = 0; p < splits.length; p++) {
+            loc[p] = splits[p].getNodeName();
+        }
+        AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
+        return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+    }
+
+    private static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
+        return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
+                partition.getPartitionId());
+    }
+
+    public static String prepareStoragePartitionPath(String storageDirName, int partitionId) {
+        return storageDirName + File.separator + PARTITION_DIR_PREFIX + partitionId;
+    }
+
+    private static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
+        return dataverseName + File.separator + datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    }
+}
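
Assuming a storage directory name of "storage" and Unix-style separators, the helpers above compose relative paths as follows (all concrete names and ids are illustrative):

    // prepareStoragePartitionPath("storage", 3) yields "storage/partition_3".
    String partitionPath = SplitsAndConstraintsProvider.prepareStoragePartitionPath("storage", 3);
    // A regular dataset index then resolves to
    //   storage/partition_3/Sales/Customers_idx_idx1
    // while a temporary dataset resolves to
    //   storage/partition_3/temp/Sales/Customers_idx_idx1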
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index f2482da..8973cce 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,6 +34,7 @@
 import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -59,6 +61,9 @@
 
     private boolean globalRecoveryCompleted = false;
 
+    private Map<String, ClusterPartition[]> node2PartitionsMap = null;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
         if (is != null) {
@@ -66,37 +71,73 @@
                 JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
-
             } catch (JAXBException e) {
                 throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
             }
         } else {
             cluster = null;
         }
+        //if this is the CC process
+        if (AsterixAppContextInfo.getInstance() != null
+                && AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
+            node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
+            clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+        }
     }
 
     private ClusterState state = ClusterState.UNUSABLE;
 
     public synchronized void removeNCConfiguration(String nodeId) {
+        updateNodePartition(nodeId, false);
         ncConfiguration.remove(nodeId);
-        if (ncConfiguration.keySet().size() != AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.UNUSABLE;
-            LOGGER.info("Cluster now is in UNSABLE state");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Removing configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+        //TODO implement fault tolerance as follows:
+        //1. collect the partitions of the failed NC
+        //2. for each partition, request a remote replica to take over
+        //3. wait until each remote replica completes the recovery for the lost partitions
+        //4. update the cluster state
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
         ncConfiguration.put(nodeId, configuration);
-        if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
-                .getNodeNames().size()) {
-            state = ClusterState.ACTIVE;
-        }
+        updateNodePartition(nodeId, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
         }
-        resetClusterPartitionConstraint();
+    }
+
+    private synchronized void updateNodePartition(String nodeId, boolean added) {
+        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
+        //if this isn't a storage node, it will not have cluster partitions
+        if (nodePartitions != null) {
+            for (ClusterPartition p : nodePartitions) {
+                //set the active node for this node's partitions
+                p.setActive(added);
+                if (added) {
+                    p.setActiveNodeId(nodeId);
+                } else {
+                    p.setActiveNodeId(null);
+                }
+            }
+            resetClusterPartitionConstraint();
+            updateClusterState();
+        }
+    }
+
+    private synchronized void updateClusterState() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (!p.isActive()) {
+                state = ClusterState.UNUSABLE;
+                LOGGER.info("Cluster is in UNSABLE state");
+                return;
+            }
+        }
+        //if all storage partitions are active, then the cluster is active
+        state = ClusterState.ACTIVE;
+        LOGGER.info("Cluster is now ACTIVE");
     }
 
     /**
@@ -162,20 +203,14 @@
     }
 
     private synchronized void resetClusterPartitionConstraint() {
-        Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-        ArrayList<String> locs = new ArrayList<String>();
-        for (String i : stores.keySet()) {
-            String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
-            for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+        ArrayList<String> clusterActiveLocations = new ArrayList<>();
+        for (ClusterPartition p : clusterPartitions.values()) {
+            if (p.isActive()) {
+                clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        String[] cluster = new String[locs.size()];
-        cluster = locs.toArray(cluster);
-        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
     }
 
     public boolean isGlobalRecoveryCompleted() {
@@ -194,7 +229,32 @@
         return AsterixClusterProperties.INSTANCE.getState() == ClusterState.ACTIVE;
     }
 
-    public static int getNumberOfNodes(){
+    public static int getNumberOfNodes() {
         return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
     }
+
+    public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
+        return node2PartitionsMap.get(nodeId);
+    }
+
+    public synchronized int getNodePartitionsCount(String node) {
+        if (node2PartitionsMap.containsKey(node)) {
+            return node2PartitionsMap.get(node).length;
+        }
+        return 0;
+    }
+
+    public synchronized ClusterPartition[] getClusterPartitions() {
+        ArrayList<ClusterPartition> partitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            partitions.add(partition);
+        }
+        return partitions.toArray(new ClusterPartition[] {});
+    }
+
+    private void printClusterPartitions() {
+        for (ClusterPartition p : clusterPartitions.values()) {
+            LOGGER.info(p.toString());
+        }
+    }
 }
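
A short walk-through of the new registration flow, assuming a two-NC cluster ("nc1"/"nc2" and their configuration maps are hypothetical):

    AsterixClusterProperties props = AsterixClusterProperties.INSTANCE;
    props.addNCConfiguration("nc1", nc1Config); // nc1's partitions become active
    // state is still UNUSABLE: nc2's partitions have no active node yet
    props.addNCConfiguration("nc2", nc2Config); // the remaining partitions become active
    // updateClusterState() now finds every partition active, so getState() == ClusterState.ACTIVE
    props.removeNCConfiguration("nc2"); // nc2's partitions deactivate; back to UNUSABLE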
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 58bc44e..794f867 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 48ebff3..06a1957 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,12 +47,12 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 23eb2be..f2a6820 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 6e0394a..8d838a3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -48,12 +48,12 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+    public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
             Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getDatasetLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
index 0566367..15224e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceFactory.java
@@ -33,7 +33,7 @@
     }
 
     @Override
-    public LocalResource createLocalResource(long resourceId, String resourceName, int partition) {
-        return new LocalResource(resourceId, resourceName, partition, resourceType, localResourceMetadata);
+    public LocalResource createLocalResource(long resourceId, String resourceName, int partition, String resourcePath) {
+        return new LocalResource(resourceId, resourceName, partition, resourcePath, resourceType, localResourceMetadata);
     }
 }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 8ae3eb1..0c7b062 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -64,8 +64,7 @@
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
-            throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
         for (int i = 0; i < mountPoints.length; i++) {
@@ -123,7 +122,8 @@
             }
 
             LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
-                    storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+                    storageMetadataFile.getAbsolutePath(), 0, storageMetadataFile.getAbsolutePath(), 0,
+                    storageRootDirPath);
             insert(rootLocalResource);
             LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
         }
@@ -131,13 +131,13 @@
     }
 
     @Override
-    public LocalResource getResourceByName(String name) throws HyracksDataException {
-        LocalResource resource = resourceCache.getIfPresent(name);
+    public LocalResource getResourceByPath(String path) throws HyracksDataException {
+        LocalResource resource = resourceCache.getIfPresent(path);
         if (resource == null) {
-            File resourceFile = getLocalResourceFileByName(name);
+            File resourceFile = getLocalResourceFileByName(path);
             if (resourceFile.exists()) {
                 resource = readLocalResource(resourceFile);
-                resourceCache.put(name, resource);
+                resourceCache.put(path, resource);
             }
         }
         return resource;
@@ -145,13 +145,15 @@
 
     @Override
     public synchronized void insert(LocalResource resource) throws HyracksDataException {
-        File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
+        File resourceFile = new File(getFileName(resource.getResourcePath(), resource.getResourceId()));
         if (resourceFile.exists()) {
             throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+        } else {
+            resourceFile.getParentFile().mkdirs();
         }
 
         if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-            resourceCache.put(resource.getResourceName(), resource);
+            resourceCache.put(resource.getResourcePath(), resource);
         }
 
         FileOutputStream fos = null;
@@ -182,18 +184,18 @@
 
             //if replication enabled, send resource metadata info to remote nodes
             if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
+                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
                 createReplicationJob(ReplicationOperation.REPLICATE, filePath);
             }
         }
     }
 
     @Override
-    public synchronized void deleteResourceByName(String name) throws HyracksDataException {
-        File resourceFile = getLocalResourceFileByName(name);
+    public synchronized void deleteResourceByPath(String resourcePath) throws HyracksDataException {
+        File resourceFile = getLocalResourceFileByName(resourcePath);
         if (resourceFile.exists()) {
             resourceFile.delete();
-            resourceCache.invalidate(name);
+            resourceCache.invalidate(resourcePath);
 
             //if replication enabled, delete resource from remote replicas
             if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
@@ -204,8 +206,8 @@
         }
     }
 
-    private static File getLocalResourceFileByName(String resourceName) {
-        return new File(resourceName + File.separator + METADATA_FILE_NAME);
+    private static File getLocalResourceFileByName(String resourcePath) {
+        return new File(resourcePath + File.separator + METADATA_FILE_NAME);
     }
 
     public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -220,25 +222,21 @@
             }
 
             //load all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        resourcesMap.put(localResource.getResourceId(), localResource);
-                                                    }
-                                                }
+            File[] partitions = storageRootDir.listFiles();
+            if (partitions == null) {
+                continue;
+            }
+            for (File partition : partitions) {
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                resourcesMap.put(localResource.getResourceId(), localResource);
                                             }
                                         }
                                     }
@@ -263,27 +261,23 @@
                 continue;
             }
 
-            //traverse all local resources.
-            File[] dataverseFileList = storageRootDir.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    File[] ioDevicesList = indexFile.listFiles();
-                                    if (ioDevicesList != null) {
-                                        for (File ioDeviceFile : ioDevicesList) {
-                                            if (ioDeviceFile.isDirectory()) {
-                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
-                                                if (metadataFiles != null) {
-                                                    for (File metadataFile : metadataFiles) {
-                                                        LocalResource localResource = readLocalResource(metadataFile);
-                                                        maxResourceId = Math.max(maxResourceId,
-                                                                localResource.getResourceId());
-                                                    }
-                                                }
+            //traverse all local resources to find the maximum resource id.
+            File[] partitions = storageRootDir.listFiles();
+            if (partitions == null) {
+                continue;
+            }
+            for (File partition : partitions) {
+                File[] dataverseFileList = partition.listFiles();
+                if (dataverseFileList != null) {
+                    for (File dataverseFile : dataverseFileList) {
+                        if (dataverseFile.isDirectory()) {
+                            File[] indexFileList = dataverseFile.listFiles();
+                            if (indexFileList != null) {
+                                for (File indexFile : indexFileList) {
+                                    if (indexFile.isDirectory()) {
+                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+                                        if (metadataFiles != null) {
+                                            for (File metadataFile : metadataFiles) {
+                                                LocalResource localResource = readLocalResource(metadataFile);
+                                                maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
                                             }
                                         }
                                     }
@@ -305,8 +299,7 @@
             if (!baseDir.endsWith(System.getProperty("file.separator"))) {
                 baseDir += System.getProperty("file.separator");
             }
-            String fileName = new String(baseDir + METADATA_FILE_NAME);
-            return fileName;
+            return baseDir + METADATA_FILE_NAME;
         }
     }
 
@@ -376,6 +369,7 @@
 
     /**
      * Deletes physical files of all dataverses.
+     * 
      * @param deleteStorageMetadata
      * @throws IOException
      */
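
For reviewers tracing the new on-disk layout: local resources now live under <storageRoot>/<partition>/<dataverse>/<index>/, one directory level per concept, with no per-index io-device fan-out. A minimal, null-safe sketch of that traversal (illustrative only; METADATA_FILES_FILTER and the readLocalResource call mirror names in the patch, everything else is hypothetical):

    import java.io.File;
    import java.io.FileFilter;
    import java.util.ArrayList;
    import java.util.List;

    class PartitionWalkSketch {
        // stand-in for the repository's METADATA_FILES_FILTER
        static final FileFilter METADATA_FILES_FILTER = f -> f.getName().endsWith(".metadata");

        // returns every metadata file under <storageRoot>/<partition>/<dataverse>/<index>/
        static List<File> walk(File storageRootDir) {
            List<File> found = new ArrayList<>();
            File[] partitions = storageRootDir.listFiles();
            if (partitions == null) {
                return found; // storage root missing or unreadable
            }
            for (File partition : partitions) {             // one dir per cluster partition
                for (File dataverse : nonNull(partition.listFiles())) {
                    if (!dataverse.isDirectory()) {
                        continue;
                    }
                    for (File index : nonNull(dataverse.listFiles())) {
                        if (!index.isDirectory()) {
                            continue;
                        }
                        for (File metadataFile : nonNull(index.listFiles(METADATA_FILES_FILTER))) {
                            found.add(metadataFile);        // patch: readLocalResource(metadataFile)
                        }
                    }
                }
            }
            return found;
        }

        private static File[] nonNull(File[] files) {
            return files == null ? new File[0] : files;
        }
    }
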
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f602156..701e529 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,11 +44,13 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -70,11 +72,14 @@
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -295,10 +300,10 @@
             //get datasetLifeCycleManager
             IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                     .getDatasetLifecycleManager();
+            IIOManager ioManager = appRuntimeContext.getIOManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
-
             //set log reader to the lowWaterMarkLsn again.
             logReader.initializeScan(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -346,14 +351,15 @@
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+                            String resourceAbsolutePath = localResource.getResourcePath();
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        localResource.getResourceName(), localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                        resourceAbsolutePath, localResource.getPartition());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
                                 //#. get maxDiskLastLSN
                                 ILSMIndex lsmIndex = index;
@@ -387,7 +393,7 @@
             //close all indexes
             Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
             for (long r : resourceIdList) {
-                datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
             }
 
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -501,6 +507,9 @@
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        IIOManager ioManager = appRuntimeContext.getIOManager();
+        SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
+                .getAppContext()).getMetadataProperties().getClusterPartitions();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
         Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
@@ -552,12 +561,23 @@
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
+                            //get the resource path relative to this node
+                            int resourcePartition = localResource.getPartition();
+                            //get partition io device id
+                            //NOTE:
+                            //currently we store all partitions on the same IO device on all nodes. If this changes,
+                            //this needs to be updated to find the IO device on which the partition is stored on this local node.
+                            int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
+                            String resourceAbsolutePath = ioManager
+                                    .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
+                                    .getAbsolutePath();
+                            localResource.setResourcePath(resourceAbsolutePath);
+                            index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
-                                        localResource.getResourceName(), localResource.getPartition());
+                                        resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
@@ -595,7 +615,7 @@
         //close all indexes
         Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
         for (long r : resourceIdList) {
-            datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
+            datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
         }
 
         if (LOGGER.isLoggable(Level.INFO)) {
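
The substantive recovery change above is path resolution: a LocalResource now carries a node-relative resource name plus a partition id, and its absolute path is derived at runtime through the partition's io device. A condensed sketch of that mapping (getPartition, getIODeviceNum, and getAbsoluteFileRef are the calls used in the patch; the wrapper method itself is hypothetical):

    // assumes the imports added by the patch: java.util.SortedMap,
    // org.apache.asterix.common.cluster.ClusterPartition, org.apache.hyracks.api.io.IIOManager
    private String resolveResourcePath(IIOManager ioManager,
            SortedMap<Integer, ClusterPartition> clusterPartitions, LocalResource localResource) {
        int resourcePartition = localResource.getPartition();
        // per the NOTE in the patch, every node currently keeps a given partition on the same io device
        int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
        return ioManager.getAbsoluteFileRef(ioDevice, localResource.getResourceName())
                .getFile().getAbsolutePath();
    }
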
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index e84135e..eeaa00a 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -1041,14 +1041,14 @@
                     if (iodevice == null) {
                         iodevice = clusterDesc.getIodevices();
                     }
-                    String storageSuffix = local.getStore() == null ? clusterDesc.getStore() : local.getStore();
-                    String storagePath = iodevice + File.separator + storageSuffix;
+//                    String storageSuffix = clusterDesc.getStore();
+//                    String storagePath = iodevice + File.separator + storageSuffix;
                     vargs.add(ncJavaOpts);
                     vargs.add(NC_CLASSNAME);
                     vargs.add("-app-nc-main-class org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
                     vargs.add("-node-id " + local.getId());
                     vargs.add("-cc-host " + cC.getClusterIp());
-                    vargs.add("-iodevices " + storagePath);
+                    vargs.add("-iodevices " + iodevice);
                     vargs.add("-cluster-net-ip-address " + local.getClusterIp());
                     vargs.add("-data-ip-address " + local.getClusterIp());
                     vargs.add("-result-ip-address " + local.getClusterIp());
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index dc61506..774070a 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -38,6 +38,12 @@
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Coredump;
+import org.apache.asterix.common.configuration.Store;
+import org.apache.asterix.common.configuration.TransactionLogDir;
+import org.apache.asterix.event.schema.yarnCluster.Cluster;
+import org.apache.asterix.event.schema.yarnCluster.Node;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -76,13 +82,6 @@
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.asterix.common.configuration.AsterixConfiguration;
-import org.apache.asterix.common.configuration.Coredump;
-import org.apache.asterix.common.configuration.Store;
-import org.apache.asterix.common.configuration.TransactionLogDir;
-import org.apache.asterix.event.schema.yarnCluster.Cluster;
-import org.apache.asterix.event.schema.yarnCluster.Node;
-
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class AsterixYARNClient {
@@ -113,13 +112,13 @@
         }
     }
 
-    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
-            .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
-            .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
-            .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
-            .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
-            .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
-            .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap.<String, AsterixYARNClient
+            .Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL).put(Mode.START.alias, Mode.START)
+            .put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL).put(Mode.DESTROY.alias, Mode.DESTROY)
+            .put(Mode.ALTER.alias, Mode.ALTER).put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL)
+            .put(Mode.DESCRIBE.alias, Mode.DESCRIBE).put(Mode.BACKUP.alias, Mode.BACKUP)
+            .put(Mode.LSBACKUP.alias, Mode.LSBACKUP).put(Mode.RMBACKUP.alias, Mode.RMBACKUP)
+            .put(Mode.RESTORE.alias, Mode.RESTORE).build();
     private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
     public static final String CONF_DIR_REL = ".asterix" + File.separator;
     private static final String instanceLock = "instance";
@@ -223,8 +222,8 @@
                 }
                 break;
             case KILL:
-                if (client.isRunning() &&
-                    Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+                if (client.isRunning() && Utils.confirmAction(
+                        "Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
                     try {
                         AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
                     } catch (ApplicationNotFoundException e) {
@@ -232,8 +231,7 @@
                         System.out.println("Asterix instance by that name already exited or was never started");
                         client.deleteLockFile();
                     }
-                }
-                else if(!client.isRunning()){
+                } else if (!client.isRunning()) {
                     System.out.println("Asterix instance by that name already exited or was never started");
                     client.deleteLockFile();
                 }
@@ -254,8 +252,8 @@
                 break;
             case DESTROY:
                 try {
-                    if (client.force
-                            || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+                    if (client.force || Utils.confirmAction(
+                            "Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
                         app = client.makeApplicationContext();
                         res = client.deployConfig();
                         res.addAll(client.distributeBinaries());
@@ -289,7 +287,8 @@
                 }
                 break;
             default:
-                LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+                LOG.fatal(
+                        "Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, backup, restore, lsbackup, rmbackup");
                 client.printUsage();
                 System.exit(-1);
         }
@@ -366,8 +365,8 @@
                 "Amount of memory in MB to be requested to run the application master"));
         opts.addOption(new Option("log_properties", true, "log4j.properties file"));
         opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
-        opts.addOption(new Option("zip", "asterixZip", true,
-                "zip file with AsterixDB inside- if in non-default location"));
+        opts.addOption(
+                new Option("zip", "asterixZip", true, "zip file with AsterixDB inside - if in non-default location"));
         opts.addOption(new Option("bc", "baseConfig", true,
                 "base Asterix parameters configuration file if not in default position"));
         opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
@@ -381,13 +380,13 @@
                 "Backup timestamp for arguments requiring a specific backup (rm, restore)"));
         opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
         opts.addOption(new Option("help", false, "Print usage"));
-        opts.addOption(new Option("f", "force", false,
-                "Execute this command as fully as possible, disregarding any caution"));
+        opts.addOption(
+                new Option("f", "force", false, "Execute this command as fully as possible, disregarding any caution"));
         return opts;
     }
 
     /**
-   */
+    */
     public AsterixYARNClient() throws Exception {
         this(new YarnConfiguration());
     }
@@ -589,8 +588,9 @@
             try {
                 ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
                 YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
-                if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
-                        && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+                if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED
+                        || prevStatus == YarnApplicationState.FINISHED) && mode != Mode.DESTROY && mode != Mode.BACKUP
+                        && mode != Mode.RESTORE) {
                     throw new IllegalStateException("Instance is already running in: " + lockAppId);
                 } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
                     //stale lock file
@@ -598,7 +598,8 @@
                     deleteLockFile();
                 }
             } catch (YarnException e) {
-                LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+                LOG.warn(
+                        "Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
                 deleteLockFile();
             }
         }
@@ -690,6 +691,7 @@
 
     /**
      * Uploads external libraries and functions to HDFS for an instance to use when started.
+     * 
      * @throws IllegalStateException
      * @throws IOException
      */
@@ -700,8 +702,8 @@
             throw new IllegalStateException("No instance by name " + instanceName + " found.");
         }
         if (isRunning()) {
-            throw new IllegalStateException("Instance " + instanceName
-                    + " is running. Please stop it before installing any libraries.");
+            throw new IllegalStateException(
+                    "Instance " + instanceName + " is running. Please stop it before installing any libraries.");
         }
         String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
                 + Path.SEPARATOR;
@@ -714,6 +716,7 @@
 
     /**
      * Finds the minimal classes and JARs needed to start the AM only.
+     * 
      * @return Resources the AM needs to start on the initial container.
      * @throws IllegalStateException
      * @throws IOException
@@ -771,7 +774,9 @@
 
     /**
      * Uploads an AsterixDB cluster configuration to HDFS for the AM to use.
-     * @param overwrite Overwrite existing configurations by the same name.
+     * 
+     * @param overwrite
+     *            Overwrite existing configurations by the same name.
      * @throws IllegalStateException
      * @throws IOException
      */
@@ -791,6 +796,7 @@
 
     /**
      * Uploads binary resources to HDFS for use by the AM
+     * 
      * @return
      * @throws IOException
      * @throws YarnException
@@ -954,14 +960,11 @@
         }
         if (mode == Mode.DESTROY) {
             vargs.add("-obliterate");
-        }
-        else if (mode == Mode.BACKUP) {
+        } else if (mode == Mode.BACKUP) {
             vargs.add("-backup");
-        }
-        else if (mode == Mode.RESTORE) {
+        } else if (mode == Mode.RESTORE) {
             vargs.add("-restore " + snapName);
-        }
-        else if( mode == Mode.INSTALL){
+        } else if (mode == Mode.INSTALL) {
             vargs.add("-initial ");
         }
         if (refresh) {
@@ -1036,8 +1039,11 @@
 
     /**
      * Asks YARN to kill a given application by appId
-     * @param appId The application to kill.
-     * @param yarnClient The YARN client object that is connected to the RM.
+     * 
+     * @param appId
+     *            The application to kill.
+     * @param yarnClient
+     *            The YARN client object that is connected to the RM.
      * @throws YarnException
      * @throws IOException
      */
@@ -1064,11 +1070,11 @@
 
     /**
      * Tries to stop a running AsterixDB instance gracefully.
+     * 
      * @throws IOException
      * @throws YarnException
      */
-    private void stopInstanceIfRunning()
-            throws IOException, YarnException {
+    private void stopInstanceIfRunning() throws IOException, YarnException {
         FileSystem fs = FileSystem.get(conf);
         String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
         Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
@@ -1086,20 +1092,24 @@
 
     /**
      * Start a YARN job to delete local AsterixDB resources of an extant instance
-     * @param app The Client connection
-     * @param resources AM resources
+     * 
+     * @param app
+     *            The Client connection
+     * @param resources
+     *            AM resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         FileSystem fs = FileSystem.get(conf);
         //if the instance is up, fix that
         stopInstanceIfRunning();
         //now try deleting all of the on-disk artifacts on the cluster
         ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
-        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start",
+                ccRestPort);
         if (!delete_start) {
             if (force) {
                 fs.delete(new Path(CONF_DIR_REL + instanceFolder), true);
@@ -1121,19 +1131,20 @@
 
     /**
      * Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS
+     * 
      * @param app
      * @param resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         stopInstanceIfRunning();
         ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
         boolean backupStart;
-        backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper.toString()
-                + "to start", ccRestPort);
+        backupStart = Utils.waitForApplication(backerUpper, yarnClient,
+                "Waiting for backup " + backerUpper.toString() + " to start", ccRestPort);
         if (!backupStart) {
             LOG.fatal("Backup failed to start");
             throw new YarnException();
@@ -1149,17 +1160,19 @@
 
     /**
      * Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant AsterixDB instance
+     * 
      * @param app
      * @param resources
      * @throws IOException
      * @throws YarnException
      */
 
-    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
-            YarnException {
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources)
+            throws IOException, YarnException {
         stopInstanceIfRunning();
         ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
-        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort);
+        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start",
+                ccRestPort);
         if (!restoreStart) {
             LOG.fatal("Restore failed to start");
             throw new YarnException();
@@ -1194,7 +1207,7 @@
         }
         try {
             String ccIp = Utils.getCCHostname(instanceName, conf);
-            Utils.sendShutdownCall(ccIp,ccRestPort);
+            Utils.sendShutdownCall(ccIp, ccRestPort);
         } catch (IOException e) {
             LOG.error("Error while trying to issue safe shutdown:", e);
         }
@@ -1207,7 +1220,7 @@
                 AsterixYARNClient.killApplication(appId, yarnClient);
                 completed = true;
             } catch (YarnException e1) {
-                LOG.fatal("Could not stop nor kill instance gracefully.",e1);
+                LOG.fatal("Could not stop nor kill instance gracefully.", e1);
                 return;
             }
         }
@@ -1278,11 +1291,12 @@
 
     /**
      * Locate the Asterix parameters file.
-      * @return
+     * 
+     * @return
      * @throws FileNotFoundException
      * @throws IOException
      */
-    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException {
         AsterixConfiguration configuration;
         String configPathBase = MERGED_PARAMETERS_PATH;
         if (baseConfig != null) {
@@ -1299,14 +1313,14 @@
     /**
      *
      */
-    private void readConfigParams(AsterixConfiguration configuration){
+    private void readConfigParams(AsterixConfiguration configuration) {
         //this is the "base" config that is inside the zip, we start here
         for (org.apache.asterix.common.configuration.Property property : configuration.getProperty()) {
             if (property.getName().equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
                 ccJavaOpts = property.getValue();
             } else if (property.getName().equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
                 ncJavaOpts = property.getValue();
-            } else if(property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)){
+            } else if (property.getName().equalsIgnoreCase(CC_REST_PORT_KEY)) {
                 ccRestPort = Integer.parseInt(property.getValue());
             }
 
@@ -1315,6 +1329,7 @@
 
     /**
      * Retrieves necessary information from the cluster configuration and splices it into the Asterix configuration parameters
+     * 
      * @param cluster
      * @throws FileNotFoundException
      * @throws IOException
@@ -1332,14 +1347,21 @@
         configuration.setVersion(version);
 
         configuration.setInstanceName(asterixInstanceName);
-        String storeDir = null;
         List<Store> stores = new ArrayList<Store>();
+        String storeDir = cluster.getStore().trim();
         for (Node node : cluster.getNode()) {
-            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-            stores.add(new Store(node.getId(), storeDir));
+            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIdDevice = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIdDevice.length; i++) {
+                nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+            }
+            //remove last comma
+            nodeStores.deleteCharAt(nodeStores.length() - 1);
+            stores.add(new Store(node.getId(), nodeStores.toString()));
         }
         configuration.setStore(stores);
-
+        configuration.setStorageDirName(storeDir);
         List<Coredump> coredump = new ArrayList<Coredump>();
         String coredumpDir = null;
         List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
@@ -1354,7 +1376,6 @@
                     + File.separator));
         }
         configuration.setMetadataNode(metadataNodeId);
-
         configuration.setCoredump(coredump);
         configuration.setTransactionLogDir(txnLogDirs);
         FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);
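
The mergeConfigs change above derives each node's store list from its io devices plus the single instance-wide storage directory name. A standalone sketch of that construction (all values hypothetical):

    import java.io.File;

    class StorePathSketch {
        public static void main(String[] args) {
            String storeDir = "storage";                      // cluster.getStore().trim()
            String iodevices = "/mnt/d0,/mnt/d1";             // node's io devices, comma-separated
            StringBuilder nodeStores = new StringBuilder();
            for (String device : iodevices.split(",")) {
                nodeStores.append(device).append(File.separator).append(storeDir).append(',');
            }
            nodeStores.deleteCharAt(nodeStores.length() - 1); // drop the trailing comma
            System.out.println(nodeStores);                   // /mnt/d0/storage,/mnt/d1/storage
        }
    }
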

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/564
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hubailmor@gmail.com>
