asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [1/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance
Date Tue, 26 Jan 2016 23:31:24 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 5b068d2d0 -> 8fc8bf8b5


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
deleted file mode 100644
index 5e0c9b0..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-
-public class AsterixLSMIndexFileProperties {
-
-    private String fileName;
-    private long fileSize;
-    private String nodeId;
-    private String dataverse;
-    private int ioDeviceNum;
-    private String idxName;
-    private boolean lsmComponentFile;
-    private String filePath;
-    private boolean requiresAck = false;
-    private long LSNByteOffset;
-
-    public AsterixLSMIndexFileProperties() {
-    }
-
-    public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
-            long LSNByteOffset, boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
-    }
-
-    public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
-                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-    }
-
-    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
-            boolean requiresAck) {
-        this.filePath = filePath;
-        this.fileSize = fileSize;
-        this.nodeId = nodeId;
-        String[] tokens = filePath.split(File.separator);
-        int arraySize = tokens.length;
-        this.fileName = tokens[arraySize - 1];
-        this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
-        this.idxName = tokens[arraySize - 3];
-        this.dataverse = tokens[arraySize - 4];
-        this.lsmComponentFile = lsmComponentFile;
-        this.LSNByteOffset = LSNByteOffset;
-        this.requiresAck = requiresAck;
-    }
-
-    public static int getDeviceIONumFromName(String name) {
-        return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length()));
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeUTF(nodeId);
-        dos.writeUTF(filePath);
-        dos.writeLong(fileSize);
-        dos.writeBoolean(lsmComponentFile);
-        dos.writeLong(LSNByteOffset);
-        dos.writeBoolean(requiresAck);
-    }
-
-    public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException {
-        String nodeId = input.readUTF();
-        String filePath = input.readUTF();
-        long fileSize = input.readLong();
-        boolean lsmComponentFile = input.readBoolean();
-        long LSNByteOffset = input.readLong();
-        boolean requiresAck = input.readBoolean();
-        AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
-        fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
-        return fileProp;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
-    public long getFileSize() {
-        return fileSize;
-    }
-
-    public String getFileName() {
-        return fileName;
-    }
-
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    public void setDataverse(String dataverse) {
-        this.dataverse = dataverse;
-    }
-
-    public void setFileSize(long fileSize) {
-        this.fileSize = fileSize;
-    }
-
-    public int getIoDeviceNum() {
-        return ioDeviceNum;
-    }
-
-    public void setIoDeviceNum(int ioDevoceNum) {
-        this.ioDeviceNum = ioDevoceNum;
-    }
-
-    public String getIdxName() {
-        return idxName;
-    }
-
-    public void setIdxName(String idxName) {
-        this.idxName = idxName;
-    }
-
-    public boolean isLSMComponentFile() {
-        return lsmComponentFile;
-    }
-
-    public void setLsmComponentFile(boolean lsmComponentFile) {
-        this.lsmComponentFile = lsmComponentFile;
-    }
-
-    public boolean requiresAck() {
-        return requiresAck;
-    }
-
-    public void setRequiresAck(boolean requiresAck) {
-        this.requiresAck = requiresAck;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("File Name: " + fileName + "  ");
-        sb.append("File Size: " + fileSize + "  ");
-        sb.append("Node ID: " + nodeId + "  ");
-        sb.append("I/O Device: " + ioDeviceNum + "  ");
-        sb.append("IDX Name: " + idxName + "  ");
-        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        sb.append("Dataverse: " + dataverse);
-        sb.append("LSN Byte Offset: " + LSNByteOffset);
-        return sb.toString();
-    }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
-
-    public void setLSNByteOffset(long lSNByteOffset) {
-        LSNByteOffset = lSNByteOffset;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 794a6e1..841a99f 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -47,9 +47,10 @@ public class LSMComponentProperties {
 
     public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) {
         this.nodeId = nodeId;
-        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
+        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
+        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+                job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
     }
 
@@ -94,7 +95,7 @@ public class LSMComponentProperties {
 
     public String getMaskPath(ReplicaResourcesManager resourceManager) {
         if (maskPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
             maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
                     + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
         }
@@ -103,9 +104,8 @@ public class LSMComponentProperties {
 
     public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
         if (replicaPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
-            replicaPath = resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(),
-                    afp.getIdxName());
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+            replicaPath = resourceManager.getIndexPath(afp);
         }
         return replicaPath;
     }
@@ -113,27 +113,24 @@ public class LSMComponentProperties {
     /***
      * @param filePath
      *            any file of the LSM component
-     * @param nodeId
      * @return a unique id based on the timestamp of the component
      */
-    public static String getLSMComponentID(String filePath, String nodeId) {
+    public static String getLSMComponentID(String filePath) {
         String[] tokens = filePath.split(File.separator);
 
         int arraySize = tokens.length;
         String fileName = tokens[arraySize - 1];
-        String ioDevoceName = tokens[arraySize - 2];
-        String idxName = tokens[arraySize - 3];
-        String dataverse = tokens[arraySize - 4];
+        String idxName = tokens[arraySize - 2];
+        String dataverse = tokens[arraySize - 3];
+        String partitionName = tokens[arraySize - 4];
 
         StringBuilder componentId = new StringBuilder();
-        componentId.append(nodeId);
+        componentId.append(partitionName);
         componentId.append(File.separator);
         componentId.append(dataverse);
         componentId.append(File.separator);
         componentId.append(idxName);
         componentId.append(File.separator);
-        componentId.append(ioDevoceName);
-        componentId.append(File.separator);
         componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING)));
         return componentId.toString();
     }
@@ -142,18 +139,10 @@ public class LSMComponentProperties {
         return componentId;
     }
 
-    public void setComponentId(String componentId) {
-        this.componentId = componentId;
-    }
-
     public long getOriginalLSN() {
         return originalLSN;
     }
 
-    public void setOriginalLSN(long lSN) {
-        originalLSN = lSN;
-    }
-
     public String getNodeId() {
         return nodeId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
new file mode 100644
index 0000000..890d3a2
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+
+public class LSMIndexFileProperties {
+
+    private String fileName;
+    private long fileSize;
+    private String nodeId;
+    private String dataverse;
+    private String idxName;
+    private boolean lsmComponentFile;
+    private String filePath;
+    private boolean requiresAck = false;
+    private long LSNByteOffset;
+    private int partition;
+
+    public LSMIndexFileProperties() {
+    }
+
+    public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
+            long LSNByteOffset, boolean requiresAck) {
+        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+    }
+
+    public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
+        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
+                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+    }
+
+    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+            boolean requiresAck) {
+        this.filePath = filePath;
+        this.fileSize = fileSize;
+        this.nodeId = nodeId;
+        this.lsmComponentFile = lsmComponentFile;
+        this.LSNByteOffset = LSNByteOffset;
+        this.requiresAck = requiresAck;
+    }
+
+    public void splitFileName() {
+        String[] tokens = filePath.split(File.separator);
+        int arraySize = tokens.length;
+        this.fileName = tokens[arraySize - 1];
+        this.idxName = tokens[arraySize - 2];
+        this.dataverse = tokens[arraySize - 3];
+        this.partition = getPartitonNumFromName(tokens[arraySize - 4]);
+    }
+
+    private static int getPartitonNumFromName(String name) {
+        return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length()));
+    }
+
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dos = new DataOutputStream(out);
+        dos.writeUTF(nodeId);
+        dos.writeUTF(filePath);
+        dos.writeLong(fileSize);
+        dos.writeBoolean(lsmComponentFile);
+        dos.writeLong(LSNByteOffset);
+        dos.writeBoolean(requiresAck);
+    }
+
+    public static LSMIndexFileProperties create(DataInput input) throws IOException {
+        String nodeId = input.readUTF();
+        String filePath = input.readUTF();
+        long fileSize = input.readLong();
+        boolean lsmComponentFile = input.readBoolean();
+        long LSNByteOffset = input.readLong();
+        boolean requiresAck = input.readBoolean();
+        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
+                LSNByteOffset, requiresAck);
+        return fileProp;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public long getFileSize() {
+        return fileSize;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public void setDataverse(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    public String getIdxName() {
+        return idxName;
+    }
+
+    public boolean isLSMComponentFile() {
+        return lsmComponentFile;
+    }
+
+    public boolean requiresAck() {
+        return requiresAck;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("File Name: " + fileName + "  ");
+        sb.append("File Size: " + fileSize + "  ");
+        sb.append("Node ID: " + nodeId + "  ");
+        sb.append("Partition: " + partition + "  ");
+        sb.append("IDX Name: " + idxName + "  ");
+        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
+        sb.append("Dataverse: " + dataverse);
+        sb.append("LSN Byte Offset: " + LSNByteOffset);
+        return sb.toString();
+    }
+
+    public long getLSNByteOffset() {
+        return LSNByteOffset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 3e3043c..b9f7506 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,91 +38,65 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class ReplicaResourcesManager implements IReplicaResourcesManager {
     private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
-    private final String[] mountPoints;
-    private final int numIODevices;
-    private static final String REPLICA_FOLDER_SUFFIX = "_replica";
-    private final String replicationStorageFolder;
-    public final String localStorageFolder;
-    private final String localNodeID;
     public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
     public static final long REPLICA_INDEX_CREATION_LSN = -1;
     private final AtomicLong lastMinRemoteLSN;
+    private final PersistentLocalResourceRepository localRepository;
+    private final Map<String, ClusterPartition[]> nodePartitions;
 
-    public ReplicaResourcesManager(List<IODeviceHandle> devices, String localStorageFolder, String localNodeID,
-            String replicationStorageFolder) throws HyracksDataException {
-        numIODevices = devices.size();
-        this.mountPoints = new String[numIODevices];
-        for (int i = 0; i < numIODevices; i++) {
-            String mountPoint = devices.get(i).getPath().getPath();
-            File mountPointDir = new File(mountPoint);
-            if (!mountPointDir.exists()) {
-                throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
-            }
-            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
-                mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
-            } else {
-                mountPoints[i] = new String(mountPoint);
-            }
-        }
-        this.localStorageFolder = localStorageFolder;
-        this.localNodeID = localNodeID;
-        this.replicationStorageFolder = replicationStorageFolder;
+    public ReplicaResourcesManager(ILocalResourceRepository localRepository,
+            AsterixMetadataProperties metadataProperties) {
+        this.localRepository = (PersistentLocalResourceRepository) localRepository;
+        nodePartitions = metadataProperties.getNodePartitions();
         lastMinRemoteLSN = new AtomicLong(-1);
     }
 
-    @Override
-    public String getLocalStorageFolder() {
-        return localStorageFolder;
-    }
-
-    private String getReplicaStorageFolder(String replicaId, int IODeviceNum) {
-        if (replicaId.equals(localNodeID)) {
-            return mountPoints[IODeviceNum] + localStorageFolder;
-        } else {
-            return mountPoints[IODeviceNum] + replicationStorageFolder + File.separator + replicaId
-                    + REPLICA_FOLDER_SUFFIX;
-        }
-    }
-
-    public void deleteRemoteFile(AsterixLSMIndexFileProperties afp) throws IOException {
-        String indexPath = getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), afp.getIdxName());
+    public void deleteIndexFile(LSMIndexFileProperties afp) {
+        String indexPath = getIndexPath(afp);
         if (indexPath != null) {
             if (afp.isLSMComponentFile()) {
                 String backupFilePath = indexPath + File.separator + afp.getFileName();
 
                 //delete file
                 File destFile = new File(backupFilePath);
-                if (destFile.exists()) {
-                    destFile.delete();
-                }
+                FileUtils.deleteQuietly(destFile);
             } else {
                 //delete index files
                 indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
-                AsterixFilesUtil.deleteFolder(indexPath);
+                FileUtils.deleteQuietly(new File(indexPath));
             }
         }
     }
 
-    public String getIndexPath(String replicaId, int IODeviceNum, String dataverse, String dataset) {
-        //mounting point/backupNodeId_replica/Dataverse/Dataset/device_id_#/
-        String remoteIndexFolderPath = getReplicaStorageFolder(replicaId, IODeviceNum) + File.separator + dataverse
-                + File.separator + dataset + File.separator + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + IODeviceNum;
-        Path path = Paths.get(remoteIndexFolderPath);
+    public String getIndexPath(LSMIndexFileProperties fileProperties) {
+        fileProperties.splitFileName();
+        //get partition path in this node
+        String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
+        //get index path
+        String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
+                fileProperties.getDataverse(), fileProperties.getIdxName());
+
+        Path path = Paths.get(indexPath);
         if (!Files.exists(path)) {
-            File indexFolder = new File(remoteIndexFolderPath);
+            File indexFolder = new File(indexPath);
             indexFolder.mkdirs();
         }
-        return remoteIndexFolderPath;
+        return indexPath;
     }
 
     public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -144,14 +118,10 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         //remove mask to mark component as valid
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
-
-        if (Files.exists(path)) {
-            File maskFile = new File(maskPath);
-            maskFile.delete();
-        }
+        Files.deleteIfExists(path);
 
         //add component LSN to the index LSNs map
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
         lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
 
         //update map on disk
@@ -159,73 +129,11 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
 
     }
 
-    public List<String> getResourcesForReplica(String nodeId) throws HyracksDataException {
-        List<String> resourcesList = new ArrayList<String>();
-        String rootFolder;
-        for (int i = 0; i < numIODevices; i++) {
-            rootFolder = getReplicaStorageFolder(nodeId, i);
-            File rootDirFile = new File(rootFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            File[] metadataFiles = ioDeviceFile.listFiles(LSM_INDEX_FILES_FILTER);
-                                            if (metadataFiles != null) {
-                                                for (File metadataFile : metadataFiles) {
-                                                    resourcesList.add(metadataFile.getAbsolutePath());
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return resourcesList;
-    }
-
-    public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
+    public Set<File> getReplicaIndexes(String replicaId) {
         Set<File> remoteIndexesPaths = new HashSet<File>();
-        for (int i = 0; i < numIODevices; i++) {
-            String rootReplicaFolder = getReplicaStorageFolder(replicaId, i);
-            File rootDirFile = new File(rootReplicaFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            remoteIndexesPaths.add(ioDeviceFile);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+        ClusterPartition[] partitions = nodePartitions.get(replicaId);
+        for (ClusterPartition partition : partitions) {
+            remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
         }
         return remoteIndexesPaths;
     }
@@ -237,41 +145,60 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
         long minRemoteLSN = Long.MAX_VALUE;
         for (String replica : replicaIds) {
-            try {
-                //for every index in replica
-                Set<File> remoteIndexes = getReplicaIndexes(replica);
-                for (File indexFolder : remoteIndexes) {
-                    //read LSN map
-                    try {
-                        //get max LSN per index
-                        long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
-
-                        //get min of all maximums
-                        minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                    } catch (IOException e) {
-                        LOGGER.log(Level.INFO, indexFolder.getAbsolutePath() + " Couldn't read LSN map for index "
-                                + indexFolder);
-                        continue;
-                    }
+            //for every index in replica
+            Set<File> remoteIndexes = getReplicaIndexes(replica);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
                 }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
             }
         }
         lastMinRemoteLSN.set(minRemoteLSN);
         return minRemoteLSN;
     }
 
-    public HashMap<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
-            throws IOException {
-        HashMap<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
+    @Override
+    public long getPartitionsMinLSN(Integer[] partitions) {
+        long minRemoteLSN = Long.MAX_VALUE;
+        for (Integer partition : partitions) {
+            //for every index in replica
+            Set<File> remoteIndexes = getPartitionIndexes(partition);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
+                }
+            }
+        }
+        return minRemoteLSN;
+    }
+
+    public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
+        Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
         try {
             //for every index in replica
             Set<File> remoteIndexes = getReplicaIndexes(replicaId);
             for (File indexFolder : remoteIndexes) {
                 if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
-                    File localResource = new File(indexFolder + File.separator
-                            + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+                    File localResource = new File(
+                            indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
                     LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
                     laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath());
                 }
@@ -286,7 +213,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
     private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
         long remoteIndexMaxLSN = 0;
         //get max LSN per index
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
         if (lsnMap != null) {
             for (Long lsn : lsnMap.values()) {
                 remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
@@ -296,7 +223,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         return remoteIndexMaxLSN;
     }
 
-    public void cleanInvalidLSMComponents(String replicaId) throws HyracksDataException {
+    public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
         Set<File> remoteIndexes = getReplicaIndexes(replicaId);
         for (File remoteIndexFile : remoteIndexes) {
@@ -312,7 +239,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
     }
 
-    private void deleteLSMComponentFilesForMask(File maskFile) {
+    private static void deleteLSMComponentFilesForMask(File maskFile) {
         String lsmComponentTimeStamp = maskFile.getName().substring(0,
                 maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length());
         File indexFolder = maskFile.getParentFile();
@@ -325,78 +252,92 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public synchronized HashMap<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-        try {
-            fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oisFromFis = new ObjectInputStream(fis);
+    @SuppressWarnings({ "unchecked" })
+    public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
+        try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             Map<Long, Long> lsnMap = null;
             try {
                 lsnMap = (Map<Long, Long>) oisFromFis.readObject();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
-            return (HashMap<Long, Long>) lsnMap;
-        } finally {
-            if (oisFromFis != null) {
-                oisFromFis.close();
-            }
-            if (oisFromFis == null && fis != null) {
-                fis.close();
-            }
+            return lsnMap;
         }
     }
 
-    public synchronized void updateReplicaIndexLSNMap(String indexPath, HashMap<Long, Long> lsnMap) throws IOException {
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-        try {
-            fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oosToFos = new ObjectOutputStream(fos);
+    public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
+        try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(lsnMap);
             oosToFos.flush();
             lastMinRemoteLSN.set(-1);
-        } finally {
-            if (oosToFos != null) {
-                oosToFos.close();
+        }
+    }
+
+    /**
+     * @param partition
+     * @return Set of file references to each index in the partition
+     */
+    public Set<File> getPartitionIndexes(int partition) {
+        Set<File> partitionIndexes = new HashSet<File>();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        String partitionStoragePath = localRepository.getPartitionPath(partition)
+                + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
+        File partitionRoot = new File(partitionStoragePath);
+        if (partitionRoot.exists() && partitionRoot.isDirectory()) {
+            File[] dataverseFileList = partitionRoot.listFiles();
+            if (dataverseFileList != null) {
+                for (File dataverseFile : dataverseFileList) {
+                    if (dataverseFile.isDirectory()) {
+                        File[] indexFileList = dataverseFile.listFiles();
+                        if (indexFileList != null) {
+                            for (File indexFile : indexFileList) {
+                                partitionIndexes.add(indexFile);
+                            }
+                        }
+                    }
+                }
             }
-            if (oosToFos == null && fos != null) {
-                fos.close();
+        }
+        return partitionIndexes;
+    }
+
+    /**
+     * @param partition
+     * @return Absolute paths to all partition files
+     */
+    public List<String> getPartitionIndexesFiles(int partition) {
+        List<String> partitionFiles = new ArrayList<String>();
+        Set<File> partitionIndexes = getPartitionIndexes(partition);
+        for (File indexDir : partitionIndexes) {
+            if (indexDir.isDirectory()) {
+                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+                if (indexFiles != null) {
+                    for (File file : indexFiles) {
+                        partitionFiles.add(file.getAbsolutePath());
+                    }
+                }
             }
         }
+        return partitionFiles;
     }
 
     private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (!name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME)) {
-                return true;
-            } else if (!name.startsWith(".")) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
         }
     };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/data/fbu.adm
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/data/fbu.adm b/asterix-replication/src/test/resources/data/fbu.adm
new file mode 100644
index 0000000..7e99ea4
--- /dev/null
+++ b/asterix-replication/src/test/resources/data/fbu.adm
@@ -0,0 +1,10 @@
+{"id":1,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]}
+{"id":2,"alias":"Isbel","name":"IsbelDull","user-since":datetime("2011-01-22T10:10:00"),"friend-ids":{{1,4}},"employment":[{"organization-name":"Hexviafind","start-date":date("2010-04-27")}]}
+{"id":3,"alias":"Emory","name":"EmoryUnk","user-since":datetime("2012-07-10T10:10:00"),"friend-ids":{{1,5,8,9}},"employment":[{"organization-name":"geomedia","start-date":date("2010-06-17"),"end-date":date("2010-01-26")}]}
+{"id":4,"alias":"Nicholas","name":"NicholasStroh","user-since":datetime("2010-12-27T10:10:00"),"friend-ids":{{2}},"employment":[{"organization-name":"Zamcorporation","start-date":date("2010-06-08")}]}
+{"id":5,"alias":"Von","name":"VonKemble","user-since":datetime("2010-01-05T10:10:00"),"friend-ids":{{3,6,10}},"employment":[{"organization-name":"Kongreen","start-date":date("2010-11-27")}]}
+{"id":6,"alias":"Willis","name":"WillisWynne","user-since":datetime("2005-01-17T10:10:00"),"friend-ids":{{1,3,7}},"employment":[{"organization-name":"jaydax","start-date":date("2009-05-15")}]}
+{"id":7,"alias":"Suzanna","name":"SuzannaTillson","user-since":datetime("2012-08-07T10:10:00"),"friend-ids":{{6}},"employment":[{"organization-name":"Labzatron","start-date":date("2011-04-19")}]}
+{"id":8,"alias":"Nila","name":"NilaMilliron","user-since":datetime("2008-01-01T10:10:00"),"friend-ids":{{3}},"employment":[{"organization-name":"Plexlane","start-date":date("2010-02-28")}]}
+{"id":9,"alias":"Woodrow","name":"WoodrowNehling","user-since":datetime("2005-09-20T10:10:00"),"friend-ids":{{3,10}},"employment":[{"organization-name":"Zuncan","start-date":date("2003-04-22"),"end-date":date("2009-12-13")}]}
+{"id":10,"alias":"Bram","name":"BramHatch","user-since":datetime("2010-10-16T10:10:00"),"friend-ids":{{1,5,9}},"employment":[{"organization-name":"physcane","start-date":date("2007-06-05"),"end-date":date("2011-11-05")}]}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/delete_storage.sh
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/scripts/delete_storage.sh b/asterix-replication/src/test/resources/scripts/delete_storage.sh
new file mode 100755
index 0000000..129030b
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/delete_storage.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+rm -rf ~/asterix/*

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
new file mode 100755
index 0000000..2582713
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+jps | awk '{if ($2 == "NCDriver" || $2 == "CCDriver") print $1;}' | xargs -n 1 kill -9

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 52fd806..9f59148 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,9 +32,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.commons.io.FileUtils;
@@ -63,10 +66,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private IReplicationManager replicationManager;
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
+            AsterixMetadataProperties metadataProperties) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
+        this.clusterPartitions = metadataProperties.getClusterPartitions();
         for (int i = 0; i < mountPoints.length; i++) {
             String mountPoint = devices.get(i).getPath().getPath();
             File mountPointDir = new File(mountPoint);
@@ -156,37 +162,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             resourceCache.put(resource.getResourcePath(), resource);
         }
 
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-
-        try {
-            fos = new FileOutputStream(resourceFile);
-            oosToFos = new ObjectOutputStream(fos);
+        try (FileOutputStream fos = new FileOutputStream(resourceFile);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(resource);
             oosToFos.flush();
         } catch (IOException e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oosToFos != null) {
-                try {
-                    oosToFos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oosToFos == null && fos != null) {
-                try {
-                    fos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+        }
 
-            //if replication enabled, send resource metadata info to remote nodes
-            if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
-                createReplicationJob(ReplicationOperation.REPLICATE, filePath);
-            }
+        //if replication enabled, send resource metadata info to remote nodes
+        if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+            String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
+            createReplicationJob(ReplicationOperation.REPLICATE, filePath);
         }
     }
 
@@ -304,31 +291,12 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     public static LocalResource readLocalResource(File file) throws HyracksDataException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-
-        try {
-            fis = new FileInputStream(file);
-            oisFromFis = new ObjectInputStream(fis);
+        try (FileInputStream fis = new FileInputStream(file);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             LocalResource resource = (LocalResource) oisFromFis.readObject();
             return resource;
         } catch (Exception e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oisFromFis != null) {
-                try {
-                    oisFromFis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oisFromFis == null && fis != null) {
-                try {
-                    fis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
         }
     }
 
@@ -427,4 +395,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         }
         return storageRootDir;
     }
+
+    /**
+     * @param partition
+     * @return The partition local path on this NC.
+     */
+    public String getPartitionPath(int partition) {
+        //currently each partition is replicated on the same IO device number on all NCs.
+        return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index b6bb7dc..e79a0d3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -26,14 +27,17 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
     private final IIOManager ioManager;
     private final String nodeId;
+    private final AsterixMetadataProperties metadataProperties;
 
-    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
+    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
+            AsterixMetadataProperties metadataProperties) {
         this.ioManager = ioManager;
         this.nodeId = nodeId;
+        this.metadataProperties = metadataProperties;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
+        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId, metadataProperties);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 701e529..8fe75f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,13 +44,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -72,14 +70,11 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -300,7 +295,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //get datasetLifeCycleManager
             IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                     .getDatasetLifecycleManager();
-            IIOManager ioManager = appRuntimeContext.getIOManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
@@ -507,13 +501,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-        IIOManager ioManager = appRuntimeContext.getIOManager();
-        SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
-                .getAppContext()).getMetadataProperties().getClusterPartitions();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
-        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
-                .loadAndGetAllResources();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         //#. set log reader to the lowWaterMarkLsn again.
         for (int i = 0; i < remoteLogs.size(); i++) {
             logRecord = remoteLogs.get(i);
@@ -561,16 +552,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            //get the resource path relative to this node
-                            int resourcePartition = localResource.getPartition();
-                            //get partition io device id
-                            //NOTE:
-                            //currently we store all partition in the same IO device in all nodes. If this changes,
-                            //this needs to be updated to find the IO device in which the partition is stored in this local node.
-                            int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
-                            String resourceAbsolutePath = ioManager
-                                    .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
-                                    .getAbsolutePath();
+                            //get partition path in this node
+                            String partitionIODevicePath = localResourceRepository
+                                    .getPartitionPath(localResource.getPartition());
+                            String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                    + localResource.getResourceName();
                             localResource.setResourcePath(resourceAbsolutePath);
                             index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
@@ -578,8 +564,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                         resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
                                 //#. get maxDiskLastLSN
                                 ILSMIndex lsmIndex = index;
@@ -1099,6 +1085,242 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
+    //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters.
+    //They need to be refactored to use partitions only once the log format includes partition id.
+    @Override
+    public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode)
+            throws IOException, ACIDException {
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
+
+        int updateLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobCommitLogCount = 0;
+        int redoCount = 0;
+        int abortLogCount = 0;
+        int jobId = -1;
+
+        state = SystemState.RECOVERING;
+        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
+
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        jobId2WinnerEntitiesMap = new HashMap<>();
+
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        JobEntityCommits jobEntityWinners = null;
+        //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
+        }
+        //-------------------------------------------------------------------------
+        //  [ analysis phase ]
+        //  - collect all committed Lsn
+        //-------------------------------------------------------------------------
+        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
+        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        //get datasetLifeCycleManager
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+
+        //#. set log reader to the lowWaterMarkLsn
+        ILogReader logReader = logMgr.getLogReader(true);
+        ILogRecord logRecord = null;
+        try {
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO update this partitions once the log format is updated to include partitons
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            updateLogCount++;
+                            break;
+                        case LogType.JOB_COMMIT:
+                            jobId = logRecord.getJobId();
+                            winnerJobSet.add(jobId);
+                            if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                //to delete any spilled files as well
+                                jobEntityWinners.clear();
+                                jobId2WinnerEntitiesMap.remove(jobId);
+                            }
+                            jobCommitLogCount++;
+                            break;
+                        case LogType.ENTITY_COMMIT:
+                            jobId = logRecord.getJobId();
+                            if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = new JobEntityCommits(jobId);
+                                if (needToFreeMemory()) {
+                                    //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+                                    //This could happen only when we have many jobs with small number of records and none of them have job commit.
+                                    freeJobsCachedEntities(jobId);
+                                }
+                                jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
+                            } else {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            }
+                            jobEntityWinners.add(logRecord);
+                            entityCommitLogCount++;
+                            break;
+                        case LogType.ABORT:
+                            abortLogCount++;
+                            break;
+                        case LogType.FLUSH:
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+                winners.prepareForSearch();
+            }
+            //-------------------------------------------------------------------------
+            //  [ redo phase ]
+            //  - redo if
+            //    1) The TxnId is committed && --> guarantee durability
+            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+            //-------------------------------------------------------------------------
+            LOGGER.info("[RecoveryMgr] in redo phase");
+
+            long resourceId;
+            long maxDiskLastLsn;
+            long LSN = -1;
+            ILSMIndex index = null;
+            LocalResource localResource = null;
+            ILocalResourceMetadata localResourceMetadata = null;
+            boolean foundWinner = false;
+            //set log reader to the lowWaterMarkLsn again.
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO update this to check for partitions instead of node id once the log format is updated to include partitions
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    LSN = logRecord.getLSN();
+                    jobId = logRecord.getJobId();
+                    foundWinner = false;
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            if (winnerJobSet.contains(jobId)) {
+                                foundWinner = true;
+                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+                                    foundWinner = true;
+                                }
+                            }
+                            if (foundWinner) {
+                                resourceId = logRecord.getResourceId();
+                                localResource = resourcesMap.get(resourceId);
+                                /*******************************************************************
+                                 * [Notice]
+                                 * -> Issue
+                                 * Delete index may cause a problem during redo.
+                                 * The index operation to be redone couldn't be redone because the corresponding index
+                                 * may not exist in NC due to the possible index drop DDL operation.
+                                 * -> Approach
+                                 * Avoid the problem during redo.
+                                 * More specifically, the problem will be detected when the localResource of
+                                 * the corresponding index is retrieved, which will end up with 'null'.
+                                 * If null is returned, then just go and process the next
+                                 * log record.
+                                 *******************************************************************/
+                                if (localResource == null) {
+                                    logRecord = logReader.next();
+                                    continue;
+                                }
+                                /*******************************************************************/
+
+                                //get index instance from IndexLifeCycleManager
+                                //if index is not registered into IndexLifeCycleManager,
+                                //create the index using LocalMetadata stored in LocalResourceRepository
+                                //get partition path in this node
+                                String partitionIODevicePath = localResourceRepository
+                                        .getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                        + localResource.getResourceName();
+                                localResource.setResourcePath(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                if (index == null) {
+                                    //#. create index instance and register to indexLifeCycleManager
+                                    localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                            resourceAbsolutePath, localResource.getPartition());
+                                    datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                    datasetLifecycleManager.open(resourceAbsolutePath);
+
+                                    //#. get maxDiskLastLSN
+                                    ILSMIndex lsmIndex = index;
+                                    try {
+                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+                                                .getIOOperationCallback())
+                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+                                    } catch (HyracksDataException e) {
+                                        datasetLifecycleManager.close(resourceAbsolutePath);
+                                        throw e;
+                                    }
+
+                                    //#. set resourceId and maxDiskLastLSN to the map
+                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
+
+                                if (LSN > maxDiskLastLsn) {
+                                    redo(logRecord, datasetLifecycleManager);
+                                    redoCount++;
+                                }
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            //do nothing
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("[RecoveryMgr] recovery is completed.");
+                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+                        + "/" + redoCount);
+            }
+        } finally {
+            logReader.close();
+
+            //close all indexes
+            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+            for (long r : resourceIdList) {
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
+            }
+
+            //delete any recovery files after completing recovery
+            deleteRecoveryTemporaryFiles();
+        }
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final int jobId;
@@ -1145,7 +1367,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         /**
          * Call this method when no more entity commits will be added to this job.
-         * 
+         *
          * @throws IOException
          */
         public void prepareForSearch() throws IOException {
@@ -1182,7 +1404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
          */
         public ArrayList<File> getCandidiatePartitions(long logLSN) {
             ArrayList<File> candidiatePartitions = new ArrayList<File>();
-
             for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
                 String partitionName = partition.getName();
                 //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN 


Mime
View raw message