asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [NO ISSUE][STO] Adapt Storage Structure To Rebalance
Date Sat, 25 Nov 2017 02:06:12 GMT
Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2181

Change subject: [NO ISSUE][STO] Adapt Storage Structure To Rebalance
......................................................................

[NO ISSUE][STO] Adapt Storage Structure To Rebalance

- user model changes: no
- storage format changes: no
- interface changes: yes
    -- Added IResource#setPath to use for the resource
       storage migration.

Details:
- Unify storage structure to support dataset rebalance:
  Old format:
  ./storage/partition_#/dataverse/datasetName_idx_indexName
  New format:
  ./storage/partition_#/dataverse/datasetName/rebalanceNum/indexName
- Adapt recovery and replication to new storage structure.
- Add old structure -> new structure NC migration task.
- Add CompatibilityUtil to ensure NC can be upgraded during
  NC startup.
- Centralize the logic for parsing file path to its components in
  ResourceReference/DatasetResourceReference.
- Add storage structure migration test case.
- Add test case for recovery after rebalance.

Change-Id: I0f968b9f493bf5aa2d49f503afe21f0d438bb7f0
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
D asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
M asterixdb/asterix-replication/pom.xml
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
28 files changed, 882 insertions(+), 422 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/81/2181/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 19966fe..e29e3fe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -443,7 +443,7 @@
         return minFirstLSN;
     }
 
-    private long getRemoteMinFirstLSN() {
+    private long getRemoteMinFirstLSN() throws HyracksDataException {
         IReplicaResourcesManager remoteResourcesManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
         return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index f922832..47f9315 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -81,11 +81,6 @@
         checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
         final Checkpoint latestCheckpoint = checkpointManager.getLatest();
         if (latestCheckpoint != null) {
-            if (latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
-                throw new IllegalStateException(
-                        String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
-                                StorageConstants.VERSION, latestCheckpoint.getStorageVersion()));
-            }
             transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId());
         }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java
new file mode 100644
index 0000000..7bfb35c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * Migrates a legacy storage structure to the current one.
+ * <p>
+ * On every IO device, each directory containing a resource metadata file is re-registered under its
+ * new relative path, its index component files are copied to the new location, and the legacy
+ * directory is deleted afterwards.
+ */
+public class MigrateStorageResourcesTask implements INCLifecycleTask {
+
+    private static final Logger LOGGER = Logger.getLogger(MigrateStorageResourcesTask.class.getName());
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Migrates all legacy resources found on the IO devices of this node.
+     * Nothing is done when no checkpoint exists.
+     *
+     * @param cs
+     *            the controller service whose application context is an {@link INcApplicationContext}
+     * @throws HyracksDataException
+     *             if reading, copying or deleting any resource fails
+     */
+    @Override
+    public void perform(IControllerService cs) throws HyracksDataException {
+        INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+        ICheckpointManager checkpointMgr = appCtx.getTransactionSubsystem().getCheckpointManager();
+        final Checkpoint latestCheckpoint = checkpointMgr.getLatest();
+        if (latestCheckpoint == null) {
+            // nothing to migrate
+            return;
+        }
+        final IIOManager ioManager = appCtx.getIoManager();
+        final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
+        for (IODeviceHandle ioDeviceHandle : ioDevices) {
+            final Path root = Paths.get(ioDeviceHandle.getMount().getAbsolutePath());
+            if (!root.toFile().exists()) {
+                continue;
+            }
+            // all legacy resources are expected to be 5 levels below the storage root
+            try (Stream<Path> stream = Files.find(root, 5, (path, attr) -> path.getFileName().toString()
+                    .equals(StorageConstants.METADATA_FILE_NAME))) {
+                // materialize the matches first: the directories are deleted while iterating below
+                final List<Path> resourceToMigrate = stream.map(Path::getParent).collect(Collectors.toList());
+                for (Path src : resourceToMigrate) {
+                    final Path dest =
+                            migrateResourceMetadata(root.relativize(src), appCtx, latestCheckpoint.getStorageVersion());
+                    copyResourceFiles(root.resolve(src), root.resolve(dest),
+                            PersistentLocalResourceRepository.INDEX_COMPONENTS);
+                    FileUtils.deleteDirectory(root.resolve(src).toFile());
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    /**
+     * Migrates the resource metadata file at {@code resourcePath} to the new storage structure
+     * and updates the migrated version's metadata to reflect the new path.
+     *
+     * @param resourcePath
+     *            the legacy resource directory, relative to the IO device root
+     * @param appCtx
+     *            the NC application context providing the resource repository and the IO manager
+     * @param resourceVersion
+     *            the storage version the legacy resource was written with
+     * @return The migrated resource relative path
+     * @throws HyracksDataException
+     *             if a stale destination cannot be removed or the new resource cannot be inserted
+     */
+    private Path migrateResourceMetadata(Path resourcePath, INcApplicationContext appCtx, int resourceVersion)
+            throws HyracksDataException {
+        final ILocalResourceRepository localResourceRepository = appCtx.getLocalResourceRepository();
+        final LocalResource srcResource = localResourceRepository.get(resourcePath.toFile().getPath());
+        // capture the legacy path now: the wrapped resource is mutated below through setPath, so reading
+        // it from srcResource afterwards may already report the new location
+        final String srcPath = srcResource.getPath();
+        final DatasetLocalResource lsmResource = (DatasetLocalResource) srcResource.getResource();
+        // recreate the resource with the new path and version
+        final DatasetResourceReference lrr = DatasetResourceReference.of(srcResource, resourceVersion);
+        final Path destPath = lrr.getRelativePath();
+        final FileReference destDir = appCtx.getIoManager().resolve(destPath.toString());
+        // ensure the new dest dir is empty; fail fast instead of continuing on top of stale files
+        if (destDir.getFile().exists()) {
+            try {
+                FileUtils.forceDelete(destDir.getFile());
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        lsmResource.setPath(destPath.toString());
+
+        final LocalResource destResource =
+                new LocalResource(srcResource.getId(), srcResource.getVersion(), srcResource.isDurable(), lsmResource);
+        LOGGER.info(() -> "Migrating resource from: " + srcPath + " to " + destResource.getPath());
+        localResourceRepository.insert(destResource);
+        return destPath;
+    }
+
+    /**
+     * Copies the files matching {@code filter} at {@code src} path to {@code dest}
+     *
+     * @param src
+     *            the directory whose direct children are candidates for copying
+     * @param dest
+     *            the directory receiving the copies; each file keeps its name
+     * @param filter
+     *            selects which children of {@code src} are copied
+     * @throws IOException
+     *             if listing {@code src} or copying a file fails
+     */
+    private void copyResourceFiles(Path src, Path dest, Predicate<Path> filter) throws IOException {
+        try (Stream<Path> stream = Files.list(src)) {
+            final List<Path> srcFiles = stream.filter(filter).collect(Collectors.toList());
+            for (Path srcFile : srcFiles) {
+                Files.copy(srcFile, dest.resolve(srcFile.getFileName()));
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 63f5bfc..47e5ac9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.config.NodeProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
@@ -46,6 +47,7 @@
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.utils.CompatibilityUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
@@ -54,7 +56,6 @@
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.messages.IMessageBroker;
-import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -115,7 +116,10 @@
         MessagingChannelInterfaceFactory interfaceFactory =
                 new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
-
+        final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest();
+        if (latestCheckpoint != null) {
+            CompatibilityUtil.ensureCompatibility(controllerService, latestCheckpoint.getStorageVersion());
+        }
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
         final SystemState stateOnStartup = recoveryMgr.getSystemState();
         if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java
new file mode 100644
index 0000000..5d44fc9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.app.nc.task.MigrateStorageResourcesTask;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Checks the on-disk storage version against {@link StorageConstants#VERSION} and runs the
+ * upgrade tasks required to bring an older, still supported, storage up to the current version.
+ */
+public final class CompatibilityUtil {
+
+    private static final Logger LOGGER = Logger.getLogger(CompatibilityUtil.class.getName());
+    private static final int MIN_COMPATIBLE_VERSION = 1;
+
+    private CompatibilityUtil() {
+    }
+
+    /**
+     * Ensures the storage written with {@code onDiskVersion} can be used by this node, upgrading it
+     * if needed. Returns immediately when the versions already match.
+     *
+     * @param ncs
+     *            the node controller service the upgrade tasks are performed on
+     * @param onDiskVersion
+     *            the storage version recorded on disk
+     * @throws HyracksDataException
+     *             if an upgrade task fails
+     * @throws IllegalStateException
+     *             if the on-disk version cannot be upgraded to the current version
+     */
+    public static void ensureCompatibility(NodeControllerService ncs, int onDiskVersion) throws HyracksDataException {
+        if (onDiskVersion == StorageConstants.VERSION) {
+            return;
+        }
+        ensureUpgradability(onDiskVersion);
+        LOGGER.info(() -> "Upgrading from storage version " + onDiskVersion + " to " + StorageConstants.VERSION);
+        final List<INCLifecycleTask> upgradeTasks = getUpgradeTasks(onDiskVersion);
+        for (INCLifecycleTask task : upgradeTasks) {
+            task.perform(ncs);
+        }
+    }
+
+    /**
+     * Rejects on-disk versions that are too old to upgrade, as well as versions newer than the
+     * current one: the latter is not an upgrade and no task exists to convert such a storage.
+     */
+    private static void ensureUpgradability(int onDiskVersion) {
+        if (onDiskVersion < MIN_COMPATIBLE_VERSION || onDiskVersion > StorageConstants.VERSION) {
+            throw new IllegalStateException(String.format(
+                    "Storage cannot be upgraded to new version. Current version (%s). On disk version: (%s)",
+                    StorageConstants.VERSION, onDiskVersion));
+        }
+    }
+
+    /**
+     * Collects the tasks needed to upgrade a storage written with {@code fromVersion}.
+     */
+    private static List<INCLifecycleTask> getUpgradeTasks(int fromVersion) {
+        List<INCLifecycleTask> upgradeTasks = new ArrayList<>();
+        if (fromVersion < StorageConstants.REBALANCE_STORAGE_VERSION) {
+            upgradeTasks.add(new MigrateStorageResourcesTask());
+        }
+        return upgradeTasks;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
new file mode 100644
index 0000000..e24ef2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.utils.RebalanceUtil;
+import org.junit.Assert;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Test helpers for creating, populating, counting and rebalancing datasets.
+ */
+public class TestDataUtil {
+
+    private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+    private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private TestDataUtil() {
+    }
+
+    /**
+     * Creates dataset with a single field called id as its primary key.
+     *
+     * @param dataset
+     *            the name of the dataset to create
+     * @throws Exception
+     *             if executing either DDL statement fails
+     */
+    public static void createIdOnlyDataset(String dataset) throws Exception {
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE TYPE KeyType IF NOT EXISTS AS { id: int };", OUTPUT_FORMAT);
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + "(KeyType) PRIMARY KEY id;", OUTPUT_FORMAT);
+    }
+
+    /**
+     * Upserts {@code count} ids into {@code dataset}, using the ids 0 to {@code count - 1}.
+     *
+     * @param dataset
+     *            the name of the dataset to upsert into
+     * @param count
+     *            the number of records to upsert
+     * @throws Exception
+     *             if executing an upsert statement fails
+     */
+    public static void upsertData(String dataset, long count) throws Exception {
+        // the index is a long like the bound: an int index would overflow and never reach a larger count
+        for (long i = 0; i < count; i++) {
+            TEST_EXECUTOR.executeSqlppUpdateOrDdl("UPSERT INTO " + dataset + " ({\"id\": " + i + "});",
+                    OUTPUT_FORMAT);
+        }
+    }
+
+    /**
+     * Gets the number of records in dataset {@code datasetName}
+     *
+     * @param datasetName
+     *            the name of the dataset to count
+     * @return The count
+     * @throws Exception
+     *             if the query fails or its response cannot be parsed
+     */
+    public static long getDatasetCount(String datasetName) throws Exception {
+        final String query = "SELECT VALUE COUNT(*) FROM `" + datasetName + "`;";
+        final InputStream responseStream = TEST_EXECUTOR
+                .executeQueryService(query, TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT);
+        final ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class);
+        final JsonNode result = response.get("results");
+        // make sure there is a single value in result
+        Assert.assertEquals(1, result.size());
+        // read as long to match the return type and avoid truncating large counts
+        return result.get(0).asLong();
+    }
+
+    /**
+     * Rebalances a dataset to {@code targetNodes}
+     *
+     * @param integrationUtil
+     *            the integration util providing the cluster controller service
+     * @param dataverseName
+     *            the dataverse of the dataset
+     * @param datasetName
+     *            the dataset to rebalance
+     * @param targetNodes
+     *            the nodes the dataset is rebalanced to
+     * @throws Exception
+     *             if suspending the active handler, locking or rebalancing fails
+     */
+    public static void rebalanceDataset(AsterixHyracksIntegrationUtil integrationUtil, String dataverseName,
+            String datasetName, String[] targetNodes) throws Exception {
+        ICcApplicationContext ccAppCtx =
+                (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+        MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null);
+        try {
+            ActiveNotificationHandler activeNotificationHandler =
+                    (ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler();
+            activeNotificationHandler.suspend(metadataProvider);
+            try {
+                IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager();
+                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        dataverseName + '.' + datasetName);
+                RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
+                        metadataProvider, ccAppCtx.getHcc(), NoOpDatasetRebalanceCallback.INSTANCE);
+            } finally {
+                // resume the handler even when locking or rebalancing failed
+                activeNotificationHandler.resume(metadataProvider);
+            }
+        } finally {
+            // release whatever locks were acquired through the metadata provider
+            metadataProvider.getLocks().unlock();
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
new file mode 100644
index 0000000..7b86c56
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.function.Function;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.storage.IndexPathElements;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that data written with the legacy storage structure is still readable after the NCs
+ * are restarted with a forged old-version checkpoint, which forces the storage migration.
+ */
+public class MigrateStorageResourcesTaskTest {
+
+    private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        Logger logger = Logger.getLogger("org.apache");
+        logger.setLevel(Level.INFO);
+        ConsoleHandler handler = new ConsoleHandler();
+        handler.setLevel(Level.INFO);
+        logger.addHandler(handler);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, DEFAULT_TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            integrationUtil.deinit(true);
+        } finally {
+            // the provider is static state: restore the default even if the test failed before resetting
+            // it, so the legacy layout cannot leak into other tests
+            StoragePathUtil.indexPathProvider = null;
+        }
+    }
+
+    @Test
+    public void storageStructureMigration() throws Exception {
+        Function<IndexPathElements, String> legacyIndexPathProvider = (pathElements) ->
+                (pathElements.getRebalanceCount().equals("0") ? "" : pathElements.getRebalanceCount() + File.separator)
+                        + pathElements.getDatasetName() + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + pathElements
+                        .getIndexName();
+        StoragePathUtil.indexPathProvider = legacyIndexPathProvider;
+        integrationUtil.init(true);
+        // create dataset and insert data using legacy structure
+        String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        TestDataUtil.upsertData(datasetName, 100);
+        final long countBeforeMigration = TestDataUtil.getDatasetCount(datasetName);
+        // stop NCs
+        integrationUtil.deinit(false);
+        // forge a checkpoint with old version to force migration to new storage structure on all ncs
+        final INcApplicationContext nc1AppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+        final AbstractCheckpointManager nc1CheckpointManager =
+                (AbstractCheckpointManager) nc1AppCtx.getTransactionSubsystem().getCheckpointManager();
+        forgeOldVersionCheckpoint(nc1CheckpointManager);
+        final INcApplicationContext nc2AppCtx = (INcApplicationContext) integrationUtil.ncs[1].getApplicationContext();
+        final AbstractCheckpointManager nc2CheckpointManager =
+                (AbstractCheckpointManager) nc2AppCtx.getTransactionSubsystem().getCheckpointManager();
+        forgeOldVersionCheckpoint(nc2CheckpointManager);
+
+        // remove the legacy path provider to use the new default structure
+        StoragePathUtil.indexPathProvider = null;
+        // start the NCs to do the migration
+        integrationUtil.init(false);
+        final long countAfterMigration = TestDataUtil.getDatasetCount(datasetName);
+        // ensure data migrated to new structure without issues
+        Assert.assertEquals(countBeforeMigration, countAfterMigration);
+    }
+
+    /**
+     * Writes a checkpoint whose storage version is one below
+     * {@link StorageConstants#REBALANCE_STORAGE_VERSION} to the checkpoint path of {@code manager}.
+     *
+     * @param manager
+     *            the checkpoint manager providing the checkpoint file path
+     * @throws HyracksDataException
+     *             if the checkpoint file cannot be written
+     */
+    private void forgeOldVersionCheckpoint(AbstractCheckpointManager manager) throws HyracksDataException {
+        Checkpoint cp = new Checkpoint(-1, -1, 0, System.currentTimeMillis(), true,
+                StorageConstants.REBALANCE_STORAGE_VERSION - 1);
+        Path path = manager.getCheckpointPath(cp.getTimeStamp());
+        // Write checkpoint file to disk
+        try (BufferedWriter writer = Files.newBufferedWriter(path)) {
+            writer.write(cp.asJson());
+            writer.flush();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 723786c..29efa47 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -19,38 +19,30 @@
 package org.apache.asterix.test.txn;
 
 import java.io.File;
-import java.io.InputStream;
-import java.util.Random;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Property;
-import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 public class RecoveryManagerTest {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
     private static final String TEST_CONFIG_PATH =
             System.getProperty("user.dir") + File.separator + "target" + File.separator + "config";
     private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
-    private static final TestExecutor testExecutor = new TestExecutor();
     private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
-    private static final Random random = new Random();
-    private static final int numRecords = 1;
 
     @Before
     public void setUp() throws Exception {
@@ -74,52 +66,53 @@
     @Test
     public void multiDatasetRecovery() throws Exception {
         String datasetNamePrefix = "ds_";
-        final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
-        testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format);
         int numDatasets = 50;
         String datasetName = null;
         for (int i = 1; i <= numDatasets; i++) {
             datasetName = datasetNamePrefix + i;
-            testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
-            insertData(datasetName);
+            TestDataUtil.createIdOnlyDataset(datasetName);
+            TestDataUtil.upsertData(datasetName, 10);
         }
+        final long countBeforeFirstRecovery = TestDataUtil.getDatasetCount(datasetName);
         // do ungraceful shutdown to enforce recovery
         integrationUtil.deinit(false);
         integrationUtil.init(false);
-        validateRecovery(datasetName);
-
+        final long countAfterFirstRecovery = TestDataUtil.getDatasetCount(datasetName);
+        Assert.assertEquals(countBeforeFirstRecovery, countAfterFirstRecovery);
         // create more datasets after recovery
         numDatasets = 100;
         for (int i = 51; i <= numDatasets; i++) {
             datasetName = datasetNamePrefix + i;
-            testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
-            insertData(datasetName);
+            TestDataUtil.createIdOnlyDataset(datasetName);
+            TestDataUtil.upsertData(datasetName, 1);
         }
+        final long countBeforeSecondRecovery = TestDataUtil.getDatasetCount(datasetName);
         // do ungraceful shutdown to enforce recovery again
         integrationUtil.deinit(false);
         integrationUtil.init(false);
-        validateRecovery(datasetName);
+        final long countAfterSecondRecovery = TestDataUtil.getDatasetCount(datasetName);
+        Assert.assertEquals(countBeforeSecondRecovery, countAfterSecondRecovery);
     }
 
-    private void insertData(String datasetName) throws Exception {
-        for (int i = 0; i < numRecords; i++) {
-            testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName + " ({\"id\": " + random.nextInt() + "})",
-                    TestCaseContext.OutputFormat.CLEAN_JSON);
-        }
-    }
-
-    private void validateRecovery(String datasetName) throws Exception {
-        final String query = "select value count(*) from `" + datasetName + "`;";
-        final InputStream inputStream = testExecutor
-                .executeQueryService(query, testExecutor.getEndpoint(Servlets.QUERY_SERVICE),
-                        TestCaseContext.OutputFormat.CLEAN_JSON);
-        final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream, ObjectNode.class);
-        JsonNode result = jsonNodes.get("results");
-        // make sure there is result
-        Assert.assertEquals(1, result.size());
-        for (int i = 0; i < result.size(); i++) {
-            JsonNode json = result.get(i);
-            Assert.assertEquals(numRecords, json.asInt());
-        }
+    @Test
+    public void reoveryAfterRebalance() throws Exception { // NOTE(review): name misspells "recovery"
+        String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        TestDataUtil.upsertData(datasetName, 10);
+        final long countBeforeRebalance = TestDataUtil.getDatasetCount(datasetName);
+        // rebalance the dataset to a single NC (asterix_nc2)
+        TestDataUtil.rebalanceDataset(integrationUtil, MetadataBuiltinEntities.DEFAULT_DATAVERSE.getDataverseName(),
+                datasetName, new String[] { "asterix_nc2" });
+        // the dataset count must be unchanged by the rebalance
+        final long countAfterRebalance = TestDataUtil.getDatasetCount(datasetName);
+        Assert.assertEquals(countBeforeRebalance, countAfterRebalance);
+        // upsert more data after the rebalance so recovery has post-rebalance changes to preserve
+        TestDataUtil.upsertData(datasetName, 20);
+        final long countBeforeRecovery = TestDataUtil.getDatasetCount(datasetName);
+        // do ungraceful shutdown to enforce recovery
+        integrationUtil.deinit(false);
+        integrationUtil.init(false);
+        final long countAfterRecovery = TestDataUtil.getDatasetCount(datasetName);
+        Assert.assertEquals(countBeforeRecovery, countAfterRecovery);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 78b5cb2..48bbf00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -59,6 +59,11 @@
     }
 
     @Override
+    public void setPath(String path) {
+        resource.setPath(path);
+    }
+
+    @Override
     public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
         return resource.createInstance(ncServiceCtx);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index 6ffa095..492a393 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -20,11 +20,13 @@
 
 import java.util.Set;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public interface IReplicaResourcesManager {
 
     /**
      * @param partitions
      * @return the minimum LSN of all indexes that belong to {@code partitions}.
      */
-    public long getPartitionsMinLSN(Set<Integer> partitions);
+    long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
new file mode 100644
index 0000000..d05321e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.nio.file.Paths;
+
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A {@link ResourceReference} built from a {@link LocalResource} that additionally carries the dataset id and
+ * partition of the underlying {@link DatasetLocalResource}.
+ */
+public class DatasetResourceReference extends ResourceReference {
+
+    private int datasetId;
+    private int partitionId;
+
+    private DatasetResourceReference() {
+        super();
+    }
+
+    /**
+     * Creates a reference using {@link StorageConstants#VERSION} as the storage version.
+     *
+     * @param localResource
+     *            the resource to reference; its resource must be a {@link DatasetLocalResource}
+     * @return the parsed reference
+     */
+    public static DatasetResourceReference of(LocalResource localResource) {
+        return of(localResource, StorageConstants.VERSION);
+    }
+
+    /**
+     * Creates a reference for a resource laid out according to the given storage version.
+     *
+     * @param localResource
+     *            the resource to reference; its resource must be a {@link DatasetLocalResource}
+     * @param version
+     *            the storage version; versions below {@link StorageConstants#REBALANCE_STORAGE_VERSION} are parsed
+     *            using the legacy path structure
+     * @return the parsed reference
+     */
+    public static DatasetResourceReference of(LocalResource localResource, int version) {
+        final DatasetResourceReference ref = new DatasetResourceReference();
+        // the path components are parsed from the path of the metadata file under the resource path
+        final String metadataFile =
+                Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
+        if (version < StorageConstants.REBALANCE_STORAGE_VERSION) {
+            // to support legacy storage migration
+            parseLegacyPath(ref, metadataFile);
+        } else {
+            parse(ref, metadataFile);
+        }
+        final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
+        ref.datasetId = dsResource.getDatasetId();
+        ref.partitionId = dsResource.getPartition();
+        return ref;
+    }
+
+    public int getDatasetId() {
+        return datasetId;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
deleted file mode 100644
index ca6968f..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import java.io.File;
-
-import org.apache.asterix.common.utils.StoragePathUtil;
-
-/**
- * A holder class for an index file properties.
- */
-public class IndexFileProperties {
-
-    private final String fileName;
-    private final String idxName;
-    private final String dataverseName;
-    private final int partitionId;
-    private final int datasetId;
-
-    public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) {
-        this.partitionId = partitionId;
-        this.dataverseName = dataverseName;
-        this.idxName = idxName;
-        this.fileName = fileName;
-        this.datasetId = datasetId;
-    }
-
-    public String getFileName() {
-        return fileName;
-    }
-
-    public String getIdxName() {
-        return idxName;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public int getDatasetId() {
-        return datasetId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator);
-        sb.append(dataverseName + File.separator);
-        sb.append(idxName + File.separator);
-        sb.append(fileName);
-        sb.append(" [Dataset ID: " + datasetId + "]");
-        return sb.toString();
-    }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java
new file mode 100644
index 0000000..4d0f3dd
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+public class IndexPathElements {
+    // immutable holder of the name components that StoragePathUtil hands to its indexPathProvider function
+    private final String datasetName;
+    private final String indexName;
+    private final String rebalanceCount; // kept as text; StoragePathUtil passes String.valueOf(rebalanceCount)
+
+    public IndexPathElements(String datasetName, String indexName, String rebalanceCount) {
+        this.datasetName = datasetName;
+        this.indexName = indexName;
+        this.rebalanceCount = rebalanceCount;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public String getIndexName() {
+        return indexName;
+    }
+
+    public String getRebalanceCount() {
+        return rebalanceCount;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
new file mode 100644
index 0000000..0d65067
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+
+/**
+ * A reference to a storage resource file, broken down into the components of its path.
+ * <p>
+ * Current structure: {@code root/partition/dataverse/dataset/rebalanceCount/index/fileName}<br>
+ * Legacy structure: {@code root/partition/dataverse/datasetName_idx_indexName/fileName}
+ */
+public class ResourceReference {
+
+    /** Regex that matches the platform file separator literally (a lone backslash is not a valid regex). */
+    private static final String SEPARATOR_REGEX = Pattern.quote(File.separator);
+    /** Number of trailing path components consumed by {@link #parse(ResourceReference, String)}. */
+    private static final int PATH_COMPONENTS = 7;
+    /** Number of trailing path components consumed by {@link #parseLegacyPath(ResourceReference, String)}. */
+    private static final int LEGACY_PATH_COMPONENTS = 5;
+
+    protected String root;
+    protected String partition;
+    protected String dataverse;
+    protected String dataset;
+    protected String rebalance;
+    protected String index;
+    protected String name;
+
+    protected ResourceReference() {
+    }
+
+    /**
+     * Creates a reference by parsing a file path that follows the current storage structure.
+     *
+     * @param localResourcePath
+     *            a path ending with {@code root/partition/dataverse/dataset/rebalanceCount/index/fileName}
+     * @return the parsed reference
+     * @throws IllegalStateException
+     *             if the path has fewer components than the structure requires
+     */
+    public static ResourceReference of(String localResourcePath) {
+        ResourceReference lrr = new ResourceReference();
+        parse(lrr, localResourcePath);
+        return lrr;
+    }
+
+    public String getPartition() {
+        return partition;
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public String getDataset() {
+        return dataset;
+    }
+
+    public String getRebalance() {
+        return rebalance;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return the path {@code root/partition/dataverse/dataset/rebalance/index}, i.e. every parsed component
+     *         except the file name
+     */
+    public Path getRelativePath() {
+        return Paths.get(root, partition, dataverse, dataset, rebalance, index);
+    }
+
+    /**
+     * Populates {@code ref} from the last seven components of {@code path}; any leading components are ignored.
+     *
+     * @param ref
+     *            the reference to populate
+     * @param path
+     *            a path ending with {@code root/partition/dataverse/dataset/rebalanceCount/index/fileName}
+     * @throws IllegalStateException
+     *             if {@code path} has fewer than seven components
+     */
+    protected static void parse(ResourceReference ref, String path) {
+        final String[] tokens = path.split(SEPARATOR_REGEX);
+        // all seven components are read below, so anything shorter would index before the start of the array
+        if (tokens.length < PATH_COMPONENTS) {
+            throw new IllegalStateException("Unrecognized path structure: " + path);
+        }
+        int offset = tokens.length;
+        ref.name = tokens[--offset];
+        ref.index = tokens[--offset];
+        ref.rebalance = tokens[--offset];
+        ref.dataset = tokens[--offset];
+        ref.dataverse = tokens[--offset];
+        ref.partition = tokens[--offset];
+        ref.root = tokens[--offset];
+    }
+
+    /**
+     * Populates {@code ref} from the last five components of a path that follows the legacy storage structure.
+     * The combined {@code datasetName_idx_indexName} directory is split into dataset and index, and the rebalance
+     * component is set to {@code "0"}.
+     *
+     * @param ref
+     *            the reference to populate
+     * @param path
+     *            a path ending with {@code root/partition/dataverse/datasetName_idx_indexName/fileName}
+     * @throws IllegalStateException
+     *             if {@code path} has fewer than five components, or its index directory does not split into
+     *             exactly a dataset name and an index name
+     */
+    protected static void parseLegacyPath(ResourceReference ref, String path) {
+        final String[] tokens = path.split(SEPARATOR_REGEX);
+        // name, combined index directory, dataverse, partition and root are all read below
+        if (tokens.length < LEGACY_PATH_COMPONENTS) {
+            throw new IllegalStateException("Unrecognized legacy path structure: " + path);
+        }
+        int offset = tokens.length;
+        ref.name = tokens[--offset];
+        // split combined dataset/index name
+        final String[] indexTokens = tokens[--offset].split(StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR);
+        if (indexTokens.length != 2) {
+            throw new IllegalStateException("Unrecognized legacy path structure: " + path);
+        }
+        ref.dataset = indexTokens[0];
+        ref.index = indexTokens[1];
+        ref.dataverse = tokens[--offset];
+        ref.partition = tokens[--offset];
+        ref.root = tokens[--offset];
+        ref.rebalance = String.valueOf(0);
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 49d64d6..48769d4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -24,13 +24,25 @@
  * A static class that stores storage constants
  */
 public class StorageConstants {
-    public static final String METADATA_ROOT = "root_metadata";
-    /** The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */
-    private static final int LOCAL_STORAGE_VERSION = 1;
 
-    /** The storage version of AsterixDB stack. */
+    public static final String METADATA_ROOT = "root_metadata";
+    public static final String METADATA_FILE_NAME = ".metadata";
+
+    /**
+     * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
+     */
+    private static final int LOCAL_STORAGE_VERSION = 2;
+
+    /**
+     * The storage version of AsterixDB stack.
+     */
     public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexFrame.Constants.VERSION;
 
+    /**
+     * The storage version in which the rebalance storage structure was introduced
+     */
+    public static final int REBALANCE_STORAGE_VERSION = 8;
+
     private StorageConstants() {
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 027f72c..07b9359 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,8 +19,12 @@
 package org.apache.asterix.common.utils;
 
 import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Function;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.storage.IndexPathElements;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -36,6 +40,7 @@
     private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+    public static Function<IndexPathElements, String> indexPathProvider;
 
     private StoragePathUtil() {
     }
@@ -69,8 +74,10 @@
     }
 
     private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) {
-        return (rebalanceCount == 0 ? "" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR
-                + idxName;
+        if (indexPathProvider != null) {
+            return indexPathProvider.apply(new IndexPathElements(datasetName, idxName, String.valueOf(rebalanceCount)));
+        }
+        return datasetName + File.separator + rebalanceCount + File.separator + idxName;
     }
 
     public static int getPartitionNumFromName(String name) {
@@ -88,10 +95,7 @@
      * @return the file relative path starting from the partition directory
      */
     public static String getIndexFileRelativePath(String fileAbsolutePath) {
-        String[] tokens = fileAbsolutePath.split(File.separator);
-        //partition/dataverse/idx/fileName
-        return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
-                + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+        return ResourceReference.of(fileAbsolutePath).getRelativePath().toString();
     }
 
     /**
@@ -136,7 +140,6 @@
      * @return The index name
      */
     public static String getIndexNameFromPath(String path) {
-        int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR);
-        return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path;
+        return Paths.get(path).getFileName().toString();
     }
 }
diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml
index 3138806..f209aae 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -43,11 +43,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-metadata</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 9d8c351..3143284 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -54,7 +54,7 @@
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
@@ -392,7 +392,7 @@
                 //start sending files
                 for (String filePath : filesList) {
                     // Send only files of datasets that are replciated.
-                    IndexFileProperties indexFileRef = localResourceRep.getIndexFileRef(filePath);
+                    DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath);
                     if (!repStrategy.isMatch(indexFileRef.getDatasetId())) {
                         continue;
                     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index b0aa0fb..48c7083 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -62,7 +62,7 @@
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.replication.ReplicationJob;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -280,7 +280,7 @@
             //all of the job's files belong to a single storage partition.
             //get any of them to determine the partition from the file path.
             String jobFile = job.getJobFiles().iterator().next();
-            IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile);
+            DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile);
             if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
                 return;
             }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index a8b15d2..7ca6f2f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -23,9 +23,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.replication.logging.TxnLogUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
@@ -93,18 +95,16 @@
         return lsmCompProp;
     }
 
-    public String getMaskPath(ReplicaResourcesManager resourceManager) {
+    public String getMaskPath(ReplicaResourcesManager resourceManager) throws HyracksDataException {
         if (maskPath == null) {
             LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
-            //split the index file path to get the LSM component file name
-            afp.splitFileName();
             maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
                     + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
         }
         return maskPath;
     }
 
-    public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
+    public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) throws HyracksDataException {
         if (replicaPath == null) {
             LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
             replicaPath = resourceManager.getIndexPath(afp);
@@ -118,23 +118,10 @@
      * @return a unique id based on the timestamp of the component
      */
     public static String getLSMComponentID(String filePath) {
-        String[] tokens = filePath.split(File.separator);
-
-        int arraySize = tokens.length;
-        String fileName = tokens[arraySize - 1];
-        String idxName = tokens[arraySize - 2];
-        String dataverse = tokens[arraySize - 3];
-        String partitionName = tokens[arraySize - 4];
-
-        StringBuilder componentId = new StringBuilder();
-        componentId.append(partitionName);
-        componentId.append(File.separator);
-        componentId.append(dataverse);
-        componentId.append(File.separator);
-        componentId.append(idxName);
-        componentId.append(File.separator);
-        componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)));
-        return componentId.toString();
+        final ResourceReference ref = ResourceReference.of(filePath);
+        final String fileUniqueTimestamp =
+                ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+        return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString();
     }
 
     public String getComponentId() {
@@ -149,16 +136,8 @@
         return nodeId;
     }
 
-    public int getNumberOfFiles() {
-        return numberOfFiles.get();
-    }
-
     public int markFileComplete() {
         return numberOfFiles.decrementAndGet();
-    }
-
-    public void setNumberOfFiles(AtomicInteger numberOfFiles) {
-        this.numberOfFiles = numberOfFiles;
     }
 
     public Long getReplicaLSN() {
@@ -171,10 +150,6 @@
 
     public LSMOperationType getOpType() {
         return opType;
-    }
-
-    public void setOpType(LSMOperationType opType) {
-        this.opType = opType;
     }
 
     public String getNodeUniqueLSN() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index eb9e82d..f2747fe 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -20,24 +20,18 @@
 
 import java.io.DataInput;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-
-import org.apache.asterix.common.utils.StoragePathUtil;
+import java.nio.file.Paths;
 
 public class LSMIndexFileProperties {
 
-    private String fileName;
     private long fileSize;
     private String nodeId;
-    private String dataverse;
-    private String idxName;
     private boolean lsmComponentFile;
     private String filePath;
     private boolean requiresAck = false;
     private long LSNByteOffset;
-    private int partition;
 
     public LSMIndexFileProperties() {
     }
@@ -59,15 +53,6 @@
         this.lsmComponentFile = lsmComponentFile;
         this.LSNByteOffset = LSNByteOffset;
         this.requiresAck = requiresAck;
-    }
-
-    public void splitFileName() {
-        String[] tokens = filePath.split(File.separator);
-        int arraySize = tokens.length;
-        this.fileName = tokens[arraySize - 1];
-        this.idxName = tokens[arraySize - 2];
-        this.dataverse = tokens[arraySize - 3];
-        this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
     }
 
     public void serialize(OutputStream out) throws IOException {
@@ -100,24 +85,8 @@
         return fileSize;
     }
 
-    public String getFileName() {
-        return fileName;
-    }
-
     public String getNodeId() {
         return nodeId;
-    }
-
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    public void setDataverse(String dataverse) {
-        this.dataverse = dataverse;
-    }
-
-    public String getIdxName() {
-        return idxName;
     }
 
     public boolean isLSMComponentFile() {
@@ -128,25 +97,22 @@
         return requiresAck;
     }
 
+    public String getFileName() {
+        return Paths.get(filePath).toFile().getName();
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append("File Name: " + fileName + "  ");
+        sb.append("File Path: " + filePath + "  ");
         sb.append("File Size: " + fileSize + "  ");
         sb.append("Node ID: " + nodeId + "  ");
-        sb.append("Partition: " + partition + "  ");
-        sb.append("IDX Name: " + idxName + "  ");
         sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        sb.append("Dataverse: " + dataverse);
         sb.append("LSN Byte Offset: " + LSNByteOffset);
         return sb.toString();
     }
 
     public long getLSNByteOffset() {
         return LSNByteOffset;
-    }
-
-    public int getPartition() {
-        return partition;
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index cf8e001..7eea4a4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,14 +38,14 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 
@@ -63,7 +63,7 @@
         nodePartitions = metadataProperties.getNodePartitions();
     }
 
-    public void deleteIndexFile(LSMIndexFileProperties afp) {
+    public void deleteIndexFile(LSMIndexFileProperties afp) throws HyracksDataException {
         String indexPath = getIndexPath(afp);
         if (indexPath != null) {
             if (afp.isLSMComponentFile()) {
@@ -78,20 +78,12 @@
         }
     }
 
-    public String getIndexPath(LSMIndexFileProperties fileProperties) {
-        fileProperties.splitFileName();
-        //get partition path in this node
-        String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
-        //get index path
-        String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
-                fileProperties.getDataverse(), fileProperties.getIdxName());
-
-        Path path = Paths.get(indexPath);
-        if (!Files.exists(path)) {
-            File indexFolder = new File(indexPath);
-            indexFolder.mkdirs();
+    public String getIndexPath(LSMIndexFileProperties fileProperties) throws HyracksDataException {
+        final FileReference indexPath = localRepository.getIndexPath(Paths.get(fileProperties.getFilePath()));
+        if (!indexPath.getFile().exists()) {
+            indexPath.getFile().mkdirs();
         }
-        return indexPath;
+        return indexPath.toString();
     }
 
     public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -123,21 +115,21 @@
         updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap);
     }
 
-    public Set<File> getReplicaIndexes(String replicaId) {
+    public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
         Set<File> remoteIndexesPaths = new HashSet<File>();
         ClusterPartition[] partitions = nodePartitions.get(replicaId);
         for (ClusterPartition partition : partitions) {
-            remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
+            remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId()));
         }
         return remoteIndexesPaths;
     }
 
     @Override
-    public long getPartitionsMinLSN(Set<Integer> partitions) {
+    public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException {
         long minRemoteLSN = Long.MAX_VALUE;
         for (Integer partition : partitions) {
             //for every index in replica
-            Set<File> remoteIndexes = getPartitionIndexes(partition);
+            Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition);
             for (File indexFolder : remoteIndexes) {
                 //read LSN map
                 try {
@@ -164,7 +156,7 @@
             for (File indexFolder : remoteIndexes) {
                 if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
                     File localResource = new File(
-                            indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+                            indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME);
                     LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
                     laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath());
                 }
@@ -190,7 +182,12 @@
 
     public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
-        Set<File> remoteIndexes = getReplicaIndexes(replicaId);
+        Set<File> remoteIndexes = null;
+        try {
+            remoteIndexes = getReplicaIndexes(replicaId);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
         for (File remoteIndexFile : remoteIndexes) {
             //search for any mask
             File[] masks = remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER);
@@ -241,41 +238,11 @@
 
     /**
      * @param partition
-     * @return Set of file references to each index in the partition
-     */
-    public Set<File> getPartitionIndexes(int partition) {
-        Set<File> partitionIndexes = new HashSet<File>();
-        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
-        String partitionStoragePath = localRepository.getPartitionPath(partition)
-                + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
-        File partitionRoot = new File(partitionStoragePath);
-        if (partitionRoot.exists() && partitionRoot.isDirectory()) {
-            File[] dataverseFileList = partitionRoot.listFiles();
-            if (dataverseFileList != null) {
-                for (File dataverseFile : dataverseFileList) {
-                    if (dataverseFile.isDirectory()) {
-                        File[] indexFileList = dataverseFile.listFiles();
-                        if (indexFileList != null) {
-                            for (File indexFile : indexFileList) {
-                                if (indexFile.isDirectory()) {
-                                    partitionIndexes.add(indexFile);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return partitionIndexes;
-    }
-
-    /**
-     * @param partition
      * @return Absolute paths to all partition files
      */
-    public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) {
+    public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException {
         List<String> partitionFiles = new ArrayList<String>();
-        Set<File> partitionIndexes = getPartitionIndexes(partition);
+        Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition);
         for (File indexDir : partitionIndexes) {
             if (indexDir.isDirectory()) {
                 File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
@@ -284,8 +251,7 @@
                         if (!relativePath) {
                             partitionFiles.add(file.getAbsolutePath());
                         } else {
-                            partitionFiles.add(
-                                    StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
+                            partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
                         }
                     }
                 }
@@ -311,7 +277,7 @@
     private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
         @Override
         public boolean accept(File dir, String name) {
-            return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
+            return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) || !name.startsWith(".");
         }
     };
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index db3647e..587e8c1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -23,22 +23,26 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
@@ -47,7 +51,8 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
@@ -68,15 +73,13 @@
 
 public class PersistentLocalResourceRepository implements ILocalResourceRepository {
 
-    // Public constants
-    public static final String METADATA_FILE_NAME = ".metadata";
+    public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
     // Private constants
     private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
     private static final String STORAGE_METADATA_DIRECTORY = StorageConstants.METADATA_ROOT;
     private static final String STORAGE_METADATA_FILE_NAME_PREFIX = "." + StorageConstants.METADATA_ROOT;
     private static final int MAX_CACHED_RESOURCES = 1000;
-    private static final FilenameFilter METADATA_FILES_FILTER =
-            (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME);
+
     // Finals
     private final IIOManager ioManager;
     private final String[] mountPoints;
@@ -157,8 +160,9 @@
             //make dirs for the storage metadata file
             boolean success = storageMetadataDir.mkdirs();
             if (!success) {
-                throw HyracksDataException.create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED,
-                        getClass().getSimpleName(), storageMetadataDir.getAbsolutePath());
+                throw HyracksDataException
+                        .create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, getClass().getSimpleName(),
+                                storageMetadataDir.getAbsolutePath());
             }
             LOGGER.log(Level.INFO,
                     "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
@@ -198,8 +202,8 @@
             throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
         }
 
-        try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile());
-                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+        try (FileOutputStream fos = new FileOutputStream(
+                resourceFile.getFile()); ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(resource);
             oosToFos.flush();
         } catch (IOException e) {
@@ -226,27 +230,23 @@
             } finally {
                 // Regardless of successfully deleted or not, the operation should be replicated.
                 //if replication enabled, delete resource from remote replicas
-                if (isReplicationEnabled
-                        && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+                if (isReplicationEnabled && !resourceFile.getFile().getName()
+                        .startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
                     createReplicationJob(ReplicationOperation.DELETE, resourceFile);
                 }
             }
         } else {
-            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
-                    relativePath);
+            throw HyracksDataException
+                    .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
         }
     }
 
     private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
             throws HyracksDataException {
-        String fileName = resourcePath + File.separator + METADATA_FILE_NAME;
+        String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
         return ioManager.resolve(fileName);
     }
-
-    public Map<Long, LocalResource> loadAndGetAllResources() throws IOException {
-        //TODO During recovery, the memory usage currently is proportional to the number of resources available.
-        //This could be fixed by traversing all resources on disk until the required resource is found.
-        LOGGER.log(Level.INFO, "Loading all resources");
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (int i = 0; i < mountPoints.length; i++) {
             File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
@@ -254,109 +254,47 @@
                 LOGGER.log(Level.INFO, "Getting storage root dir returned null. Returning");
                 continue;
             }
+            // metadata files sit 6 levels below the storage root: partition/dataverse/dataset/rebalance/idx/file
             LOGGER.log(Level.INFO, "Getting storage root dir returned " + storageRootDir.getAbsolutePath());
-            //load all local resources.
-            File[] partitions = storageRootDir.listFiles();
-            LOGGER.log(Level.INFO, "Number of partitions found = " + partitions.length);
-            for (File partition : partitions) {
-                File[] dataverseFileList = partition.listFiles();
-                LOGGER.log(Level.INFO, "Reading partition = " + partition.getName() + ". Number of dataverses found: "
-                        + dataverseFileList.length);
-                if (dataverseFileList != null) {
-                    for (File dataverseFile : dataverseFileList) {
-                        loadDataverse(dataverseFile, resourcesMap);
+            try {
+                try (Stream<Path> stream = Files.find(storageRootDir.toPath(), 6,
+                        (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
+                    final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList());
+                    for (File file : resourceMetadataFiles) {
+                        final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file);
+                        if (filter.test(localResource)) {
+                            resourcesMap.put(localResource.getId(), localResource);
+                        }
                     }
                 }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
             }
         }
         return resourcesMap;
+
     }
 
-    private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
-        LOGGER.log(Level.INFO, "Loading dataverse:" + dataverseFile.getName());
-        if (dataverseFile.isDirectory()) {
-            File[] indexFileList = dataverseFile.listFiles();
-            if (indexFileList != null) {
-                for (File indexFile : indexFileList) {
-                    loadIndex(indexFile, resourcesMap);
-                }
-            }
-        }
-    }
-
-    private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
-        LOGGER.log(Level.INFO, "Loading index:" + indexFile.getName());
-        if (indexFile.isDirectory()) {
-            File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
-            if (metadataFiles != null) {
-                for (File metadataFile : metadataFiles) {
-                    LocalResource localResource = readLocalResource(metadataFile);
-                    LOGGER.log(Level.INFO, "Resource loaded " + localResource.getId() + ":" + localResource.getPath());
-                    resourcesMap.put(localResource.getId(), localResource);
-                }
-            }
-        }
+    public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+        return getResources(p -> true);
     }
 
     @Override
     public long maxId() throws HyracksDataException {
-        long maxResourceId = 0;
-        for (int i = 0; i < mountPoints.length; i++) {
-            File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
-            if (storageRootDir == null) {
-                continue;
-            }
-
-            //load all local resources.
-            File[] partitions = storageRootDir.listFiles();
-            for (File partition : partitions) {
-                //traverse all local resources.
-                File[] dataverseFileList = partition.listFiles();
-                if (dataverseFileList != null) {
-                    for (File dataverseFile : dataverseFileList) {
-                        maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId);
-                    }
-                }
-            }
-        }
-        return maxResourceId;
-    }
-
-    private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException {
-        long maxResourceId = maxSoFar;
-        if (dataverseFile.isDirectory()) {
-            File[] indexFileList = dataverseFile.listFiles();
-            if (indexFileList != null) {
-                for (File indexFile : indexFileList) {
-                    maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId);
-                }
-            }
-        }
-        return maxResourceId;
-    }
-
-    private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException {
-        long maxResourceId = maxSoFar;
-        if (indexFile.isDirectory()) {
-            File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
-            if (metadataFiles != null) {
-                for (File metadataFile : metadataFiles) {
-                    LocalResource localResource = readLocalResource(metadataFile);
-                    maxResourceId = Math.max(maxResourceId, localResource.getId());
-                }
-            }
-        }
-        return maxResourceId;
+        final Map<Long, LocalResource> allResources = loadAndGetAllResources();
+        final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
+        return max.isPresent() ? max.get() : 0;
     }
 
     private static String getFileName(String path) {
-        return path.endsWith(File.separator) ? (path + METADATA_FILE_NAME)
-                : (path + File.separator + METADATA_FILE_NAME);
+        return path.endsWith(File.separator) ?
+                (path + StorageConstants.METADATA_FILE_NAME) :
+                (path + File.separator + StorageConstants.METADATA_FILE_NAME);
     }
 
     public static LocalResource readLocalResource(File file) throws HyracksDataException {
-        try (FileInputStream fis = new FileInputStream(file);
-                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
+        try (FileInputStream fis = new FileInputStream(file); ObjectInputStream oisFromFis = new ObjectInputStream(
+                fis)) {
             LocalResource resource = (LocalResource) oisFromFis.readObject();
             if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
                 return resource;
@@ -425,8 +363,9 @@
      * @return A file reference to the storage metadata file.
      */
     private static FileReference getStorageMetadataFile(IIOManager ioManager, String nodeId, int ioDeviceId) {
-        String storageMetadataFileName = STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice"
-                + ioDeviceId + File.separator + STORAGE_METADATA_FILE_NAME_PREFIX;
+        String storageMetadataFileName =
+                STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId + File.separator
+                        + STORAGE_METADATA_FILE_NAME_PREFIX;
         return new FileReference(ioManager.getIODevices().get(ioDeviceId), storageMetadataFileName);
     }
 
@@ -483,10 +422,6 @@
         return Collections.unmodifiableSet(nodeInactivePartitions);
     }
 
-    public Set<Integer> getNodeOrignalPartitions() {
-        return Collections.unmodifiableSet(nodeOriginalPartitions);
-    }
-
     public synchronized void addActivePartition(int partitonId) {
         nodeActivePartitions.add(partitonId);
         nodeInactivePartitions.remove(partitonId);
@@ -497,27 +432,27 @@
         nodeActivePartitions.remove(partitonId);
     }
 
-    private static String getLocalResourceRelativePath(String absolutePath) {
-        final String[] tokens = absolutePath.split(File.separator);
-        // Format: storage_dir/partition/dataverse/idx
-        return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator
-                + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2];
+    public DatasetResourceReference getLocalResourceReference(String absoluteFilePath) throws HyracksDataException {
+        // TODO: accept the relative path directly instead of deriving it from the absolute file path
+        final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
+        final LocalResource lr = get(localResourcePath);
+        return DatasetResourceReference.of(lr);
     }
 
-    public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException {
-        //TODO pass relative path
-        final String[] tokens = absoluteFilePath.split(File.separator);
-        if (tokens.length < 5) {
-            throw new HyracksDataException("Invalid file format");
+    public Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+        final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
+            DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+            return dsResource.getPartition() == partition;
+        });
+        Set<File> indexes = new HashSet<>();
+        for (LocalResource localResource : partitionResourcesMap.values()) {
+            indexes.add(ioManager.resolve(localResource.getPath()).getFile().getParentFile());
         }
-        String fileName = tokens[tokens.length - 1];
-        String index = tokens[tokens.length - 2];
-        String dataverse = tokens[tokens.length - 3];
-        String partition = tokens[tokens.length - 4];
-        int partitionId = StoragePathUtil.getPartitionNumFromName(partition);
-        String relativePath = getLocalResourceRelativePath(absoluteFilePath);
-        final LocalResource lr = get(relativePath);
-        int datasetId = lr == null ? -1 : ((DatasetLocalResource) lr.getResource()).getDatasetId();
-        return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId);
+        return indexes;
+    }
+
+    public FileReference getIndexPath(Path indexFile) throws HyracksDataException {
+        final ResourceReference ref = ResourceReference.of(indexFile.toString());
+        return ioManager.resolve(ref.getRelativePath().toString());
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
index 6ce543b..9f5b83c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
@@ -119,7 +119,7 @@
         return minFirstLSN;
     }
 
-    private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) {
+    private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws HyracksDataException {
         final IReplicaResourcesManager remoteResourcesManager =
                 txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
         final IApplicationContext propertiesProvider =
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
index b9ad1b1..4cf145b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
@@ -35,7 +35,7 @@
 public class BTreeResource implements IResource {
 
     private static final long serialVersionUID = 1L;
-    private final String path;
+    private String path;
     private final IStorageManager storageManager;
     private final ITypeTraits[] typeTraits;
     private final IBinaryComparatorFactory[] comparatorFactories;
@@ -63,4 +63,9 @@
     public String getPath() {
         return path;
     }
+
+    @Override
+    public void setPath(String path) {
+        this.path = path;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
index 6255c1d..b541750 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
@@ -18,13 +18,10 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.dataflow;
 
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
@@ -43,7 +40,7 @@
 public abstract class LsmResource implements IResource {
 
     private static final long serialVersionUID = 1L;
-    protected final String path;
+    protected String path;
     protected final IStorageManager storageManager;
     protected final ITypeTraits[] typeTraits;
     protected final IBinaryComparatorFactory[] cmpFactories;
@@ -88,14 +85,8 @@
         return path;
     }
 
-    public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) {
-        List<IODeviceHandle> ioDevices = ioManager.getIODevices();
-        for (int i = 0; i < ioDevices.size(); i++) {
-            IODeviceHandle device = ioDevices.get(i);
-            if (device == deviceHandle) {
-                return i;
-            }
-        }
-        return -1;
+    @Override
+    public void setPath(String path) {
+        this.path = path;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
index df4fbf2..f9eb844 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
@@ -35,7 +35,7 @@
 public class RTreeResource implements IResource {
 
     private static final long serialVersionUID = 1L;
-    private final String path;
+    private String path;
     private final IStorageManager storageManager;
     private final ITypeTraits[] typeTraits;
     private final IBinaryComparatorFactory[] comparatorFactories;
@@ -68,4 +68,8 @@
         return path;
     }
 
+    @Override
+    public void setPath(String path) {
+        this.path = path;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
index bb27023..7b9166d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
@@ -28,4 +28,11 @@
     IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException;
 
     String getPath();
+
+    /**
+     * Sets the path of this {@link IResource}.
+     *
+     * @param path the new path to store for this resource
+     */
+    void setPath(String path);
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2181
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I0f968b9f493bf5aa2d49f503afe21f0d438bb7f0
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhubail@apache.org>

Mime
View raw message