asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [3/3] incubator-asterixdb git commit: Divide Cluster into Unique Partitions
Date Fri, 01 Jan 2016 01:17:04 GMT
Divide Cluster into Unique Partitions

The change includes the following:
- Fix passing NC stores to AsterixConfiguration.
- Unify the storage directory name at the instance level rather than the node level.
- Divide the cluster into unique storage partitions based on the number of stores.
- Refactor FileSplits and move them out of AqlMetadataProvider.
- Make AsterixHyracksIntegrationUtil use the passed configuration file.
- Make File Splits pass relative index paths of partitions rather than absolute paths.
- Remove unused AqlCompiledMetadataDeclarations class.

Change-Id: I8c7fbca5113dd7ad569a46dfa2591addb5bf8655
Reviewed-on: https://asterix-gerrit.ics.uci.edu/564
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/1d5cf640
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/1d5cf640
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/1d5cf640

Branch: refs/heads/master
Commit: 1d5cf6403c5c8a2a0b5bf4c8f64b34ecbf1ccc30
Parents: e3e1373
Author: Murtadha Hubail <mhubail@uci.edu>
Authored: Thu Dec 31 08:38:15 2015 -0800
Committer: Murtadha Hubail <hubailmor@gmail.com>
Committed: Thu Dec 31 17:12:53 2015 -0800

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |   6 +-
 .../common/AsterixHyracksIntegrationUtil.java   |  67 +++--
 .../asterix/aql/translator/QueryTranslator.java |   1 -
 .../bootstrap/NCApplicationEntryPoint.java      |  39 +--
 .../resources/asterix-build-configuration.xml   |   4 +-
 .../asterix/test/metadata/MetadataTest.java     |   5 -
 .../asterix/test/runtime/ExecutionTest.java     |   4 -
 .../test/runtime/SqlppExecutionTest.java        |   5 +-
 .../common/cluster/ClusterPartition.java        |  77 ++++++
 .../config/AsterixMetadataProperties.java       |  15 +-
 .../config/AsterixPropertiesAccessor.java       |  38 ++-
 .../common/context/DatasetLifecycleManager.java |  42 +--
 ...erixLSMInsertDeleteOperatorNodePushable.java |   2 +-
 .../src/main/resources/schema/cluster.xsd       |   1 -
 .../src/main/resources/schema/yarn_cluster.xsd  |   3 -
 .../apache/asterix/test/aql/TestExecutor.java   |  52 ++--
 .../asterix/event/driver/EventDriver.java       |   2 +-
 .../asterix/event/management/EventUtil.java     |   2 +-
 .../event/service/AsterixEventServiceUtil.java  |  85 +++---
 .../asterix/event/util/PatternCreator.java      |  44 ++-
 .../adapter/factory/HDFSAdapterFactory.java     | 111 ++++----
 .../factory/HDFSIndexingAdapterFactory.java     |   6 +-
 ...alDatasetIndexesAbortOperatorDescriptor.java |   5 +-
 ...lDatasetIndexesCommitOperatorDescriptor.java |  10 +-
 ...DatasetIndexesRecoverOperatorDescriptor.java |   4 +-
 .../installer/command/ValidateCommand.java      |  22 +-
 .../asterix/installer/driver/InstallerUtil.java |  13 +-
 .../metadata/bootstrap/MetadataBootstrap.java   |  34 ++-
 .../AqlCompiledMetadataDeclarations.java        | 276 -------------------
 .../declared/AqlLogicalPlanAndMetadataImpl.java |  16 +-
 .../metadata/declared/AqlMetadataProvider.java  | 164 +----------
 .../utils/SplitsAndConstraintsUtil.java         | 173 ++++++++++++
 .../om/util/AsterixClusterProperties.java       | 112 ++++++--
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 ...dexModificationOperationCallbackFactory.java |   4 +-
 .../PersistentLocalResourceFactory.java         |   4 +-
 .../PersistentLocalResourceRepository.java      | 108 ++++----
 .../service/recovery/RecoveryManager.java       |  38 ++-
 .../asterix/aoya/AsterixApplicationMaster.java  |  25 +-
 .../apache/asterix/aoya/AsterixYARNClient.java  | 154 ++++++-----
 42 files changed, 844 insertions(+), 941 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 45c0598..4a8a323 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -187,7 +187,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
             String nodeId = ncApplicationContext.getNodeId();
 
             replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    metadataProperties.getStores().get(nodeId)[0], nodeId, replicationProperties.getReplicationStore());
+                    AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
+                    replicationProperties.getReplicationStore());
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -377,7 +378,6 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
 
     @Override
     public void initializeResourceIdFactory() throws HyracksDataException {
-        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext)
-                .createResourceIdFactory();
+        resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 0145651..d7842e8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Set;
 
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -41,22 +40,18 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 public class AsterixHyracksIntegrationUtil {
 
     private static final String IO_DIR_KEY = "java.io.tmpdir";
-    public static final int NODES = 2;
-    public static final int PARTITONS = 2;
-
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
-
     public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
 
     public static ClusterControllerService cc;
-    public static NodeControllerService[] ncs = new NodeControllerService[NODES];
+    public static NodeControllerService[] ncs;
     public static IHyracksClientConnection hcc;
 
-    protected static AsterixTransactionProperties txnProperties;
+    private static AsterixPropertiesAccessor propertiesAccessor;
 
     public static void init(boolean deleteOldInstanceData) throws Exception {
-        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
-        txnProperties = new AsterixTransactionProperties(apa);
+        propertiesAccessor = new AsterixPropertiesAccessor();
+        ncs = new NodeControllerService[propertiesAccessor.getNodeNames().size()];
         if (deleteOldInstanceData) {
             deleteTransactionLogs();
             removeTestStorageFiles();
@@ -77,7 +72,8 @@ public class AsterixHyracksIntegrationUtil {
 
         // Starts ncs.
         int n = 0;
-        for (String ncName : getNcNames()) {
+        Set<String> nodes = propertiesAccessor.getNodeNames();
+        for (String ncName : nodes) {
             NCConfig ncConfig1 = new NCConfig();
             ncConfig1.ccHost = "localhost";
             ncConfig1.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
@@ -87,13 +83,28 @@ public class AsterixHyracksIntegrationUtil {
             ncConfig1.nodeId = ncName;
             ncConfig1.resultTTL = 30000;
             ncConfig1.resultSweepThreshold = 1000;
-            for (int p = 0; p < PARTITONS; ++p) {
+            String tempPath = System.getProperty(IO_DIR_KEY);
+            if (tempPath.endsWith(File.separator)) {
+                tempPath = tempPath.substring(0, tempPath.length() - 1);
+            }
+            //get initial partitions from properties
+            String[] nodeStores = propertiesAccessor.getStores().get(ncName);
+            if (nodeStores == null) {
+                throw new Exception("Coudn't find stores for NC: " + ncName);
+            }
+            String tempDirPath = System.getProperty(IO_DIR_KEY);
+            if (!tempDirPath.endsWith(File.separator)) {
+                tempDirPath += File.separator;
+            }
+            for (int p = 0; p < nodeStores.length; p++) {
+                //create IO devices based on stores
+                String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
+                File ioDeviceDir = new File(iodevicePath);
+                ioDeviceDir.mkdirs();
                 if (p == 0) {
-                    ncConfig1.ioDevices = System.getProperty("java.io.tmpdir") + File.separator + ncConfig1.nodeId
-                            + "/iodevice" + p;
+                    ncConfig1.ioDevices = iodevicePath;
                 } else {
-                    ncConfig1.ioDevices += "," + System.getProperty("java.io.tmpdir") + File.separator
-                            + ncConfig1.nodeId + "/iodevice" + p;
+                    ncConfig1.ioDevices += "," + iodevicePath;
                 }
             }
             ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
@@ -105,19 +116,7 @@ public class AsterixHyracksIntegrationUtil {
     }
 
     public static String[] getNcNames() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1);
-        }
-        return names;
-    }
-
-    public static String[] getDataDirs() {
-        String[] names = new String[NODES];
-        for (int n = 0; n < NODES; ++n) {
-            names[n] = "asterix_nc" + (n + 1) + "data";
-        }
-        return names;
+        return propertiesAccessor.getNodeNames().toArray(new String[propertiesAccessor.getNodeNames().size()]);
     }
 
     public static IHyracksClientConnection getHyracksClientConnection() {
@@ -147,17 +146,17 @@ public class AsterixHyracksIntegrationUtil {
         hcc.waitForCompletion(jobId);
     }
 
-    private static void removeTestStorageFiles() throws IOException {
+    public static void removeTestStorageFiles() {
         File dir = new File(System.getProperty(IO_DIR_KEY));
-        for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+        for (String ncName : propertiesAccessor.getNodeNames()) {
             File ncDir = new File(dir, ncName);
             FileUtils.deleteQuietly(ncDir);
         }
     }
 
     private static void deleteTransactionLogs() throws Exception {
-        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
-            File log = new File(txnProperties.getLogDirectory(ncId));
+        for (String ncId : propertiesAccessor.getNodeNames()) {
+            File log = new File(propertiesAccessor.getTransactionLogDirs().get(ncId));
             if (log.exists()) {
                 FileUtils.deleteDirectory(log);
             }
@@ -185,7 +184,7 @@ public class AsterixHyracksIntegrationUtil {
         try {
             System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 
-            init(false);
+            init(true);
             while (true) {
                 Thread.sleep(10000);
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 714e05c..08b92e7 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -1246,7 +1246,6 @@ public class QueryTranslator extends AbstractLangTranslator {
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-
             //#. mark PendingDropOp on the dataverse record by
             //   first, deleting the dataverse record from the DATAVERSE_DATASET
             //   second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 147a356..496c2f8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -46,11 +46,11 @@ import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.AsterixFilesUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -202,7 +202,6 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
-
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -213,7 +212,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
 
             PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
                     .getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
+            localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
         IAsterixStateProxy proxy = null;
@@ -277,22 +276,18 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         performLocalCleanUp();
     }
 
-    private void performLocalCleanUp() throws IOException {
+    private void performLocalCleanUp() {
         //delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
         //reclaim storage for temporary datasets.
-        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-
-        String[] storageMountingPoints = localResourceRepository.getStorageMountingPoints();
-        String storageFolderName = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
-                .get(nodeId)[0];
-
-        for (String mountPoint : storageMountingPoints) {
-            String tempDatasetFolder = mountPoint + storageFolderName + File.separator
-                    + AqlMetadataProvider.TEMP_DATASETS_STORAGE_FOLDER;
-            AsterixFilesUtil.deleteFolder(tempDatasetFolder);
+        //get node stores
+        String[] nodeStores = ((IAsterixPropertiesProvider) runtimeContext).getMetadataProperties().getStores()
+                .get(nodeId);
+        for (String store : nodeStores) {
+            String tempDatasetFolder = store + File.separator
+                    + SplitsAndConstraintsUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            FileUtils.deleteQuietly(new File(tempDatasetFolder));
         }
 
         // TODO
@@ -309,7 +304,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             if (cluster == null) {
                 throw new IllegalStateException("No cluster configuration found for this instance");
             }
-            String asterixInstanceName = cluster.getInstanceName();
+            String asterixInstanceName = metadataProperties.getInstanceName();
             AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
                     .getTransactionProperties();
             Node self = null;
@@ -322,8 +317,14 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             for (Node node : nodes) {
                 String ncId = asterixInstanceName + "_" + node.getId();
                 if (ncId.equalsIgnoreCase(nodeId)) {
-                    String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-                    metadataProperties.getStores().put(nodeId, storeDir.split(","));
+                    String storeDir = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+                    String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+                    String[] ioDevicePaths = nodeIoDevices.trim().split(",");
+                    for (int i = 0; i < ioDevicePaths.length; i++) {
+                        //construct full store path
+                        ioDevicePaths[i] += File.separator + storeDir;
+                    }
+                    metadataProperties.getStores().put(nodeId, ioDevicePaths);
 
                     String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
                     metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index fa20099..731113b 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,11 +20,11 @@
 	<metadataNode>asterix_nc1</metadataNode>
 	<store>
 		<ncId>asterix_nc1</ncId>
-		<storeDirs>asterix_nc1data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<store>
 		<ncId>asterix_nc2</ncId>
-		<storeDirs>asterix_nc2data</storeDirs>
+		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<transactionLogDir>
 		<ncId>asterix_nc1</ncId>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 376b2ff..6c6e411 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -65,11 +65,6 @@ public class MetadataTest {
         if (files == null || files.length == 0) {
             outdir.delete();
         }
-
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 052e0be..7a55c90 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -68,10 +68,6 @@ public class ExecutionTest {
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
index d5f4db3..22a3ad7 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionTest.java
@@ -65,10 +65,7 @@ public class SqlppExecutionTest {
     @AfterClass
     public static void tearDown() throws Exception {
         ExecutionTestUtil.tearDown();
-        // clean up the files written by the ASTERIX storage manager
-        for (String d : AsterixHyracksIntegrationUtil.getDataDirs()) {
-            testExecutor.deleteRec(new File(d));
-        }
+        AsterixHyracksIntegrationUtil.removeTestStorageFiles();
     }
 
     @Parameters

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
new file mode 100644
index 0000000..6cd44a7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ClusterPartition implements Cloneable {
+    private final int partitionId;
+    private final String nodeId;
+    private final int ioDeviceNum;
+    private String activeNodeId = null;
+    private boolean active = false;
+
+    public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
+        this.partitionId = partitionId;
+        this.nodeId = nodeId;
+        this.ioDeviceNum = ioDeviceNum;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public int getIODeviceNum() {
+        return ioDeviceNum;
+    }
+
+    public String getActiveNodeId() {
+        return activeNodeId;
+    }
+
+    public void setActiveNodeId(String activeNodeId) {
+        this.activeNodeId = activeNodeId;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    @Override
+    public ClusterPartition clone() {
+        ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
+        return clone;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ID:" + partitionId);
+        sb.append(" Original Node: " + nodeId);
+        sb.append(" IODevice: " + ioDeviceNum);
+        sb.append(" Active Node: " + activeNodeId);
+        return sb.toString();
+    }
+
+    public boolean isActive() {
+        return active;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index adaca46..8e2c4e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -20,6 +20,9 @@ package org.apache.asterix.common.config;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.asterix.common.cluster.ClusterPartition;
 
 public class AsterixMetadataProperties extends AbstractAsterixProperties {
 
@@ -35,8 +38,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
         return accessor.getMetadataNodeName();
     }
 
-    public String getMetadataStore() {
-        return accessor.getMetadataStore();
+    public ClusterPartition getMetadataPartition() {
+        return accessor.getMetadataPartiton();
     }
 
     public Map<String, String[]> getStores() {
@@ -54,4 +57,12 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
     public Map<String, String> getCoredumpPaths() {
         return accessor.getCoredumpConfig();
     }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return accessor.getNodePartitions();
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return accessor.getClusterPartitions();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index d6c81ab..cc7ec84 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -35,6 +37,7 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Property;
@@ -53,12 +56,15 @@ public class AsterixPropertiesAccessor {
     private final Map<String, Property> asterixConfigurationParams;
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
+    private final Map<String, ClusterPartition[]> nodePartitionsMap;
+    private SortedMap<Integer, ClusterPartition> clusterPartitions;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
         if (fileName == null) {
             fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
         }
+
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(fileName);
         if (is == null) {
             try {
@@ -82,9 +88,20 @@ public class AsterixPropertiesAccessor {
         stores = new HashMap<String, String[]>();
         List<Store> configuredStores = asterixConfiguration.getStore();
         nodeNames = new HashSet<String>();
+        nodePartitionsMap = new HashMap<>();
+        clusterPartitions = new TreeMap<>();
+        int uniquePartitionId = 0;
         for (Store store : configuredStores) {
             String trimmedStoreDirs = store.getStoreDirs().trim();
-            stores.put(store.getNcId(), trimmedStoreDirs.split(","));
+            String[] nodeStores = trimmedStoreDirs.split(",");
+            ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
+            for (int i = 0; i < nodePartitions.length; i++) {
+                ClusterPartition partition = new ClusterPartition(uniquePartitionId++, store.getNcId(), i);
+                clusterPartitions.put(partition.getPartitionId(), partition);
+                nodePartitions[i] = partition;
+            }
+            stores.put(store.getNcId(), nodeStores);
+            nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
         }
         asterixConfigurationParams = new HashMap<String, Property>();
@@ -116,10 +133,6 @@ public class AsterixPropertiesAccessor {
         return metadataNodeName;
     }
 
-    public String getMetadataStore() {
-        return stores.get(metadataNodeName)[0];
-    }
-
     public Map<String, String[]> getStores() {
         return stores;
     }
@@ -172,7 +185,7 @@ public class AsterixPropertiesAccessor {
         }
     }
 
-    private <T> void logConfigurationError(Property p, T defaultValue) {
+    private static <T> void logConfigurationError(Property p, T defaultValue) {
         if (LOGGER.isLoggable(Level.SEVERE)) {
             LOGGER.severe("Invalid property value '" + p.getValue() + "' for property '" + p.getName()
                     + "'.\n See the description: \n" + p.getDescription() + "\nDefault = " + defaultValue);
@@ -182,4 +195,17 @@ public class AsterixPropertiesAccessor {
     public String getInstanceName() {
         return instanceName;
     }
+
+    public ClusterPartition getMetadataPartiton() {
+        //metadata partition is always the first partition on the metadata node
+        return nodePartitionsMap.get(metadataNodeName)[0];
+    }
+
+    public Map<String, ClusterPartition[]> getNodePartitions() {
+        return nodePartitionsMap;
+    }
+
+    public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
+        return clusterPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index adf1152..5062d06 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -78,9 +78,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
-        int datasetID = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized IIndex getIndex(String resourcePath) throws HyracksDataException {
+        int datasetID = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         return getIndex(datasetID, resourceID);
     }
 
@@ -98,9 +98,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             dsInfo = getDatasetInfo(did);
@@ -116,16 +116,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
     }
 
-    public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
-    public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+    public long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByPath(resourcePath);
         if (lr == null) {
             return -1;
         }
@@ -133,9 +133,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void unregister(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -180,9 +180,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void open(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void open(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null || !dsInfo.isRegistered) {
@@ -262,9 +262,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void close(String resourceName) throws HyracksDataException {
-        int did = getDIDfromResourceName(resourceName);
-        long resourceID = getResourceIDfromResourceName(resourceName);
+    public synchronized void close(String resourcePath) throws HyracksDataException {
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
@@ -704,9 +704,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void allocateMemory(String resourceName) throws HyracksDataException {
+    public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
         //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
-        int did = Integer.parseInt(resourceName);
+        int did = Integer.parseInt(resourcePath);
         allocateDatasetMemory(did);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 51168ae..fd1ebb8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -63,7 +63,7 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
         try {
             writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourcePath(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d3203d5..872c959 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -113,7 +113,6 @@
 				<xs:element ref="cl:java_home" minOccurs="0" />
 				<xs:element ref="cl:log_dir" minOccurs="0" />
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
-				<xs:element ref="cl:store" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
 			</xs:sequence>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/main/resources/schema/yarn_cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index f54cf90..8827985 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -138,9 +138,6 @@
                     ref="cl:txn_log_dir"
                     minOccurs="0" />
                 <xs:element
-                    ref="cl:store"
-                    minOccurs="0" />
-                <xs:element
                     ref="cl:iodevices"
                     minOccurs="0" />
                 <xs:element

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index d04a3dd..d8147b6 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -71,7 +71,7 @@ public class TestExecutor {
         this.port = 19002;
     }
 
-    public TestExecutor(String host, int port){
+    public TestExecutor(String host, int port) {
         this.host = host;
         this.port = port;
     }
@@ -225,12 +225,16 @@ public class TestExecutor {
             // In future this may be changed depending on the requested
             // output format sent to the servlet.
             String errorBody = method.getResponseBodyAsString();
-            JSONObject result = new JSONObject(errorBody);
-            String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
-                    result.getString("stacktrace") };
-            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
-            throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
-                    + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            try {
+                JSONObject result = new JSONObject(errorBody);
+                String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+                        result.getString("stacktrace") };
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+                throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+                        + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+            } catch (Exception e) {
+                throw new Exception(errorBody);
+            }
         }
         return statusCode;
     }
@@ -303,7 +307,7 @@ public class TestExecutor {
     }
 
     private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = "http://"+host+":"+port+"/query/result";
+        final String url = "http://" + host + ":" + port + "/query/result";
 
         // Create a method instance.
         GetMethod method = new GetMethod(url);
@@ -430,9 +434,9 @@ public class TestExecutor {
                     switch (ctx.getType()) {
                         case "ddl":
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } else {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl/sqlpp");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
                             }
                             break;
                         case "update":
@@ -442,9 +446,9 @@ public class TestExecutor {
                                         "127.0.0.1://../../../../../../asterix-app/");
                             }
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } else {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update/sqlpp");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
                             }
                             break;
                         case "query":
@@ -461,25 +465,25 @@ public class TestExecutor {
                             OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
                             if (ctx.getFile().getName().endsWith("aql")) {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/aql");
+                                            "http://" + host + ":" + port + "/aql");
                                 }
                             } else {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
-                                    resultStream = executeQuery(statement, fmt, "http://"+host+":"+port+"/query/sqlpp",
-                                            cUnit.getParameter());
+                                    resultStream = executeQuery(statement, fmt,
+                                            "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://"+host+":"+port+"/sqlpp");
+                                            "http://" + host + ":" + port + "/sqlpp");
                                 }
                             }
 
@@ -506,7 +510,7 @@ public class TestExecutor {
                             break;
                         case "txnqbc": //qbc represents query before crash
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qbcFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qbc.adm");
@@ -515,7 +519,7 @@ public class TestExecutor {
                             break;
                         case "txnqar": //qar represents query after recovery
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://"+host+":"+port+"/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
                             qarFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qar.adm");
@@ -528,7 +532,7 @@ public class TestExecutor {
                             break;
                         case "txneu": //eu represents erroneous update
                             try {
-                                executeUpdate(statement, "http://"+host+":"+port+"/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
                             } catch (Exception e) {
                                 //An exception is expected.
                                 failed = true;
@@ -556,7 +560,7 @@ public class TestExecutor {
                             break;
                         case "errddl": // a ddlquery that expects error
                             try {
-                                executeDDL(statement, "http://"+host+":"+port+"/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
                             } catch (Exception e) {
                                 // expected error happens
                                 failed = true;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index c92262c..29765fd 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@ import org.kohsuke.args4j.CmdLineParser;
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index d6e7da0..b83faa2 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -191,7 +191,7 @@ public class EventUtil {
             String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
                     .getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort());
         }
 
         List<Node> nodeList = cluster.getNode();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 33ba787..4bd5098 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -45,7 +45,6 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Store;
@@ -59,6 +58,7 @@ import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Env;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.event.schema.cluster.Property;
+import org.apache.commons.io.IOUtils;
 
 public class AsterixEventServiceUtil {
 
@@ -90,8 +90,8 @@ public class AsterixEventServiceUtil {
         return instance;
     }
 
-    public static void createAsterixZip(AsterixInstance asterixInstance) throws IOException, InterruptedException,
-            JAXBException, EventException {
+    public static void createAsterixZip(AsterixInstance asterixInstance)
+            throws IOException, InterruptedException, JAXBException, EventException {
         String asterixInstanceDir = asterixInstanceDir(asterixInstance);
         unzip(AsterixEventService.getAsterixZip(), asterixInstanceDir);
 
@@ -128,18 +128,18 @@ public class AsterixEventServiceUtil {
 
         clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
         clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
-        clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
-                + "asterix"));
+        clusterProperties
+                .add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator + "asterix"));
         clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
         clusterProperties.add(new Property("JAVA_HOME", cluster.getJavaHome()));
         clusterProperties.add(new Property("WORKING_DIR", cluster.getWorkingDir().getDir()));
         clusterProperties.add(new Property("CLIENT_NET_IP", cluster.getMasterNode().getClientIp()));
         clusterProperties.add(new Property("CLUSTER_NET_IP", cluster.getMasterNode().getClusterIp()));
 
-        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null ? cluster.getMasterNode()
-                .getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
-        int clientNetPort = cluster.getMasterNode().getClientPort() != null ? cluster.getMasterNode().getClientPort()
-                .intValue() : CLIENT_NET_PORT_DEFAULT;
+        int clusterNetPort = cluster.getMasterNode().getClusterPort() != null
+                ? cluster.getMasterNode().getClusterPort().intValue() : CLUSTER_NET_PORT_DEFAULT;
+        int clientNetPort = cluster.getMasterNode().getClientPort() != null
+                ? cluster.getMasterNode().getClientPort().intValue() : CLIENT_NET_PORT_DEFAULT;
         int httpPort = cluster.getMasterNode().getHttpPort() != null ? cluster.getMasterNode().getHttpPort().intValue()
                 : HTTP_PORT_DEFAULT;
 
@@ -151,8 +151,8 @@ public class AsterixEventServiceUtil {
     }
 
     private static String asterixZipName() {
-        return AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        return AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
     }
 
     private static String asterixJarPath(AsterixInstance asterixInstance, String asterixInstanceDir) {
@@ -174,8 +174,8 @@ public class AsterixEventServiceUtil {
         new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
     }
 
-    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
-            throws IOException, EventException, JAXBException {
+    private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir,
+            AsterixInstance asterixInstance) throws IOException, EventException, JAXBException {
         File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
         writeAsterixClusterConfigurationFile(asterixInstance);
 
@@ -185,8 +185,8 @@ public class AsterixEventServiceUtil {
         new File(asterixInstanceDir + File.separator + CLUSTER_CONFIGURATION_FILE).delete();
     }
 
-    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            EventException, JAXBException {
+    private static void writeAsterixClusterConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, EventException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
 
@@ -197,8 +197,8 @@ public class AsterixEventServiceUtil {
                 + asterixInstanceName + File.separator + "cluster.xml"));
     }
 
-    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName,
-            String libraryName, String libraryPath) throws IOException {
+    public static void addLibraryToAsterixZip(AsterixInstance asterixInstance, String dataverseName, String libraryName,
+            String libraryPath) throws IOException {
         File instanceDir = new File(asterixInstanceDir(asterixInstance));
         if (!instanceDir.exists()) {
             instanceDir.mkdirs();
@@ -235,30 +235,8 @@ public class AsterixEventServiceUtil {
         return metadataNode;
     }
 
-    public static String getNodeDirectories(String asterixInstanceName, Node node, Cluster cluster) {
-        String storeDataSubDir = asterixInstanceName + File.separator + "data" + File.separator;
-        String[] storeDirs = null;
-        StringBuffer nodeDataStore = new StringBuffer();
-        String storeDirValue = node.getStore();
-        if (storeDirValue == null) {
-            storeDirValue = cluster.getStore();
-            if (storeDirValue == null) {
-                throw new IllegalStateException(" Store not defined for node " + node.getId());
-            }
-            storeDataSubDir = node.getId() + File.separator + storeDataSubDir;
-        }
-
-        storeDirs = storeDirValue.split(",");
-        for (String ns : storeDirs) {
-            nodeDataStore.append(ns + File.separator + storeDataSubDir.trim());
-            nodeDataStore.append(",");
-        }
-        nodeDataStore.deleteCharAt(nodeDataStore.length() - 1);
-        return nodeDataStore.toString();
-    }
-
-    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance) throws IOException,
-            JAXBException {
+    private static void writeAsterixConfigurationFile(AsterixInstance asterixInstance)
+            throws IOException, JAXBException {
         String asterixInstanceName = asterixInstance.getName();
         Cluster cluster = asterixInstance.getCluster();
         String metadataNodeId = asterixInstance.getMetadataNodeId();
@@ -266,29 +244,34 @@ public class AsterixEventServiceUtil {
         AsterixConfiguration configuration = asterixInstance.getAsterixConfiguration();
         configuration.setInstanceName(asterixInstanceName);
         configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
-        String storeDir = null;
         List<Store> stores = new ArrayList<Store>();
+        String storeDir = cluster.getStore().trim();
         for (Node node : cluster.getNode()) {
-            storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-            stores.add(new Store(asterixInstanceName + "_" + node.getId(), storeDir));
+            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
+            String[] nodeIdDevice = iodevices.split(",");
+            StringBuilder nodeStores = new StringBuilder();
+            for (int i = 0; i < nodeIdDevice.length; i++) {
+                nodeStores.append(nodeIdDevice[i] + File.separator + storeDir + ",");
+            }
+            //remove last comma
+            nodeStores.deleteCharAt(nodeStores.length() - 1);
+            stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
         }
         configuration.setStore(stores);
-
         List<Coredump> coredump = new ArrayList<Coredump>();
         String coredumpDir = null;
         List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
         String txnLogDir = null;
         for (Node node : cluster.getNode()) {
             coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
-            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir + File.separator
-                    + asterixInstanceName + "_" + node.getId()));
+            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(),
+                    coredumpDir + File.separator + asterixInstanceName + "_" + node.getId()));
 
             txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
             txnLogDirs.add(new TransactionLogDir(asterixInstanceName + "_" + node.getId(), txnLogDir));
         }
         configuration.setCoredump(coredump);
         configuration.setTransactionLogDir(txnLogDirs);
-
         File asterixConfDir = new File(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName);
         asterixConfDir.mkdirs();
 
@@ -300,8 +283,6 @@ public class AsterixEventServiceUtil {
         os.close();
     }
 
-
-
     public static void unzip(String sourceFile, String destDir) throws IOException {
         BufferedOutputStream dest = null;
         FileInputStream fis = new FileInputStream(sourceFile);
@@ -432,8 +413,8 @@ public class AsterixEventServiceUtil {
             }
         }
         if (!valid) {
-            throw new EventException("Asterix instance by the name " + name + " is in " + instance.getState()
-                    + " state ");
+            throw new EventException(
+                    "Asterix instance by the name " + name + " is in " + instance.getState() + " state ");
         }
         return instance;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b6aaddb..6085019 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -164,11 +164,11 @@ public class PatternCreator {
         String store;
         String pargs;
         String iodevices;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
@@ -188,12 +188,12 @@ public class PatternCreator {
         String txnLogDir;
         String store;
         String pargs;
+        store = cluster.getStore();
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
             txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
-            store = node.getStore() == null ? cluster.getStore() : node.getStore();
             pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + txnLogDir + " " + backupId + " " + backupDir
                     + " " + "local" + " " + node.getId();
@@ -212,14 +212,12 @@ public class PatternCreator {
         VerificationUtil.verifyBackupRestoreConfiguration(hdfsUrl, hadoopVersion, hdfsBackupDir);
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
                     + hadoopVersion;
@@ -235,14 +233,12 @@ public class PatternCreator {
         String backupDir = backupInfo.getBackupConf().getBackupDir();
         String workingDir = cluster.getWorkingDir().getDir();
         int backupId = backupInfo.getId();
-        String nodeStore;
         String pargs;
         List<Pattern> patternList = new ArrayList<Pattern>();
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeStore = node.getStore() == null ? clusterStore : node.getStore();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + nodeStore + " "
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
                     + AsterixConstants.ASTERIX_ROOT_METADATA_DIR + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
                     + backupId + " " + backupDir + " " + "local" + " " + node.getId();
             Event event = new Event("restore", nodeid, pargs);
@@ -262,8 +258,8 @@ public class PatternCreator {
 
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp()
-                + " " + workingDir;
+        String pargs = username + " " + hadoopDir.getAbsolutePath() + " " + cluster.getMasterNode().getClusterIp() + " "
+                + workingDir;
         Event event = new Event("directory_transfer", nodeid, pargs);
         Pattern p = new Pattern(null, 1, null, event);
         addInitialDelay(p, 2, "sec");
@@ -428,8 +424,8 @@ public class PatternCreator {
                 patternList.add(p);
             }
 
-            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir
-                    + " " + "unpack";
+            pargs = username + " " + fileToTransfer + " " + cluster.getMasterNode().getClusterIp() + " " + destDir + " "
+                    + "unpack";
             event = new Event("file_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             patternList.add(p);
@@ -529,8 +525,8 @@ public class PatternCreator {
             String[] nodeIODevices;
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
             nodeIODevices = iodevices.trim().split(",");
+            String nodeStore = cluster.getStore().trim();
             for (String nodeIODevice : nodeIODevices) {
-                String nodeStore = node.getStore() == null ? cluster.getStore() : node.getStore();
                 pargs = nodeIODevice.trim() + File.separator + nodeStore;
                 Event event = new Event("file_delete", nodeid, pargs);
                 patternList.add(new Pattern(null, 1, null, event));
@@ -540,13 +536,15 @@ public class PatternCreator {
         return patterns;
     }
 
-    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp, String destDir) {
+    private Pattern createCopyHyracksPattern(String instanceName, Cluster cluster, String destinationIp,
+            String destDir) {
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
         String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
-        String asterixZipName = AsterixEventService.getAsterixZip().substring(
-                AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
-        String fileToTransfer = new File(AsterixEventService.getAsterixDir() + File.separator + instanceName
-                + File.separator + asterixZipName).getAbsolutePath();
+        String asterixZipName = AsterixEventService.getAsterixZip()
+                .substring(AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
+        String fileToTransfer = new File(
+                AsterixEventService.getAsterixDir() + File.separator + instanceName + File.separator + asterixZipName)
+                        .getAbsolutePath();
         String pargs = username + " " + fileToTransfer + " " + destinationIp + " " + destDir + " " + "unpack";
         Event event = new Event("file_transfer", nodeid, pargs);
         return new Pattern(null, 1, null, event);
@@ -607,8 +605,8 @@ public class PatternCreator {
             ps.add(p);
 
             nodeid = new Nodeid(new Value(null, nodeToBeAdded.getId()));
-            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp()
-                    + " " + workingDir;
+            pargs = cluster.getUsername() + " " + hadoopDir.getAbsolutePath() + " " + nodeToBeAdded.getClusterIp() + " "
+                    + workingDir;
             event = new Event("directory_transfer", nodeid, pargs);
             p = new Pattern(null, 1, null, event);
             addInitialDelay(p, 2, "sec");
@@ -626,8 +624,8 @@ public class PatternCreator {
         String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
         String srcHost = cluster.getMasterNode().getClientIp();
         Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
-        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
-                .getLogDir();
+        String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+                : cluster.getMasterNode().getLogDir();
         String destDir = outputDir + File.separator + "cc";
         String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
         Event event = new Event("directory_copy", nodeid, pargs);
@@ -649,7 +647,7 @@ public class PatternCreator {
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
-    
+
     private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
 
         List<Pattern> patternList = new ArrayList<Pattern>();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index ebf41cc..c4a96f4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -34,7 +34,6 @@ import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactor
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
 import org.apache.hadoop.fs.BlockLocation;
@@ -211,12 +210,10 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                    locs.add(i);
-                }
+                //two readers per partition
+                locs.add(i);
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];
@@ -273,67 +270,67 @@ public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAd
      * @throws IOException
      */
     protected InputSplit[] getSplits(JobConf conf) throws IOException {
-        // Create file system object
-        FileSystem fs = FileSystem.get(conf);
         ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
         ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<ExternalFile>();
-        // Create files splits
-        for (ExternalFile file : files) {
-            Path filePath = new Path(file.getFileName());
-            FileStatus fileStatus;
-            try {
-                fileStatus = fs.getFileStatus(filePath);
-            } catch (FileNotFoundException e) {
-                // file was deleted at some point, skip to next file
-                continue;
-            }
-            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() < file.getSize()) {
-                        fileSplits
-                                .add(new FileSplit(filePath,
-                                        block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
-                                                ? block.getLength() : (file.getSize() - block.getOffset()),
-                                block.getHosts()));
-                        orderedExternalFiles.add(file);
-                    }
-                }
-            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
-                    && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
-                long oldSize = 0L;
-                long newSize = file.getSize();
-                for (int i = 0; i < files.size(); i++) {
-                    if (files.get(i).getFileName() == file.getFileName() && files.get(i).getSize() != file.getSize()) {
-                        newSize = files.get(i).getSize();
-                        oldSize = file.getSize();
-                        break;
-                    }
+        // Create file system object
+        try (FileSystem fs = FileSystem.get(conf)) {
+            // Create files splits
+            for (ExternalFile file : files) {
+                Path filePath = new Path(file.getFileName());
+                FileStatus fileStatus;
+                try {
+                    fileStatus = fs.getFileStatus(filePath);
+                } catch (FileNotFoundException e) {
+                    // file was deleted at some point, skip to next file
+                    continue;
                 }
-
-                // Get its information from HDFS name node
-                BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
-                // Create a split per block
-                for (BlockLocation block : fileBlocks) {
-                    if (block.getOffset() + block.getLength() > oldSize) {
-                        if (block.getOffset() < newSize) {
-                            // Block interact with delta -> Create a split
-                            long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
-                            long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
-                                    : block.getOffset() + block.getLength() - newSize;
-                            long splitLength = block.getLength() - startCut - endCut;
-                            fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, file.getSize());
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() < file.getSize()) {
+                            fileSplits.add(new FileSplit(filePath,
+                                    block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+                                            ? block.getLength() : (file.getSize() - block.getOffset()),
                                     block.getHosts()));
                             orderedExternalFiles.add(file);
                         }
                     }
+                } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_NO_OP
+                        && fileStatus.getModificationTime() == file.getLastModefiedTime().getTime()) {
+                    long oldSize = 0L;
+                    long newSize = file.getSize();
+                    for (int i = 0; i < files.size(); i++) {
+                        if (files.get(i).getFileName() == file.getFileName()
+                                && files.get(i).getSize() != file.getSize()) {
+                            newSize = files.get(i).getSize();
+                            oldSize = file.getSize();
+                            break;
+                        }
+                    }
+
+                    // Get its information from HDFS name node
+                    BlockLocation[] fileBlocks = fs.getFileBlockLocations(fileStatus, 0, newSize);
+                    // Create a split per block
+                    for (BlockLocation block : fileBlocks) {
+                        if (block.getOffset() + block.getLength() > oldSize) {
+                            if (block.getOffset() < newSize) {
+                                // Block interact with delta -> Create a split
+                                long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
+                                long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+                                        : block.getOffset() + block.getLength() - newSize;
+                                long splitLength = block.getLength() - startCut - endCut;
+                                fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+                                        block.getHosts()));
+                                orderedExternalFiles.add(file);
+                            }
+                        }
+                    }
                 }
             }
         }
-        fs.close();
         files = orderedExternalFiles;
         return fileSplits.toArray(new FileSplit[fileSplits.size()]);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 11e2b96..8bf6d93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -32,7 +32,6 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
 import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
@@ -186,11 +185,8 @@ public class HDFSIndexingAdapterFactory extends HDFSAdapterFactory {
         Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         for (String i : stores.keySet()) {
             String[] nodeStores = stores.get(i);
-            int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
             for (int j = 0; j < nodeStores.length; j++) {
-                for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
-                }
+                locs.add(i);
             }
         }
         String[] cluster = new String[locs.size()];

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/1d5cf640/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index bce4620..6ff991b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.indexing.operators;
 
-import java.io.File;
 import java.util.List;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -49,9 +48,7 @@ public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExter
     @Override
     protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
             IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
-        FileReference file = new FileReference(new File(IndexFileNameUtil.prepareFileName(fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(), fileIndexInfo
-                .getFileSplitProvider().getFileSplits()[partition].getIODeviceId())));
+        FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
         AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
         fileManager.deleteTransactionFiles();
     }


Mime
View raw message