From: mhubail@apache.org
To: commits@asterixdb.incubator.apache.org
Date: Tue, 26 Jan 2016 23:31:24 -0000
Message-Id: <61ceea5a25c84196858c9ee5275228e3@git.apache.org>
Subject: [1/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

Repository: incubator-asterixdb
Updated Branches: refs/heads/master 5b068d2d0 -> 8fc8bf8b5

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java deleted file mode 100644 index 5e0c9b0..0000000 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.storage; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager; -import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; - -public class AsterixLSMIndexFileProperties { - - private String fileName; - private long fileSize; - private String nodeId; - private String dataverse; - private int ioDeviceNum; - private String idxName; - private boolean lsmComponentFile; - private String filePath; - private boolean requiresAck = false; - private long LSNByteOffset; - - public AsterixLSMIndexFileProperties() { - } - - public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, - long LSNByteOffset, boolean requiresAck) { - initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck); - } - - public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) { - initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, - IMetaDataPageManager.INVALID_LSN_OFFSET, false); - } - - public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset, - boolean requiresAck) { - this.filePath = filePath; - this.fileSize = fileSize; - this.nodeId = nodeId; - String[] tokens = filePath.split(File.separator); - int arraySize = tokens.length; - this.fileName = tokens[arraySize - 1]; - this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]); - this.idxName = tokens[arraySize - 3]; - this.dataverse = tokens[arraySize - 4]; - this.lsmComponentFile = lsmComponentFile; - this.LSNByteOffset = LSNByteOffset; - this.requiresAck = requiresAck; - } - - public static int getDeviceIONumFromName(String name) { - return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length())); - } - - public void serialize(OutputStream out) throws IOException { - DataOutputStream dos = new DataOutputStream(out); - dos.writeUTF(nodeId); - dos.writeUTF(filePath); - dos.writeLong(fileSize); - dos.writeBoolean(lsmComponentFile); - dos.writeLong(LSNByteOffset); - dos.writeBoolean(requiresAck); - } - - public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException { - String nodeId = input.readUTF(); - String filePath = input.readUTF(); - long fileSize = input.readLong(); - boolean lsmComponentFile = input.readBoolean(); - long LSNByteOffset = input.readLong(); - boolean requiresAck = input.readBoolean(); - AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties(); - fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck); - return fileProp; - } - - 
public String getFilePath() { - return filePath; - } - - public void setFilePath(String filePath) { - this.filePath = filePath; - } - - public long getFileSize() { - return fileSize; - } - - public String getFileName() { - return fileName; - } - - public void setFileName(String fileName) { - this.fileName = fileName; - } - - public String getNodeId() { - return nodeId; - } - - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - - public String getDataverse() { - return dataverse; - } - - public void setDataverse(String dataverse) { - this.dataverse = dataverse; - } - - public void setFileSize(long fileSize) { - this.fileSize = fileSize; - } - - public int getIoDeviceNum() { - return ioDeviceNum; - } - - public void setIoDeviceNum(int ioDevoceNum) { - this.ioDeviceNum = ioDevoceNum; - } - - public String getIdxName() { - return idxName; - } - - public void setIdxName(String idxName) { - this.idxName = idxName; - } - - public boolean isLSMComponentFile() { - return lsmComponentFile; - } - - public void setLsmComponentFile(boolean lsmComponentFile) { - this.lsmComponentFile = lsmComponentFile; - } - - public boolean requiresAck() { - return requiresAck; - } - - public void setRequiresAck(boolean requiresAck) { - this.requiresAck = requiresAck; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("File Name: " + fileName + " "); - sb.append("File Size: " + fileSize + " "); - sb.append("Node ID: " + nodeId + " "); - sb.append("I/O Device: " + ioDeviceNum + " "); - sb.append("IDX Name: " + idxName + " "); - sb.append("isLSMComponentFile : " + lsmComponentFile + " "); - sb.append("Dataverse: " + dataverse); - sb.append("LSN Byte Offset: " + LSNByteOffset); - return sb.toString(); - } - - public long getLSNByteOffset() { - return LSNByteOffset; - } - - public void setLSNByteOffset(long lSNByteOffset) { - LSNByteOffset = lSNByteOffset; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java index 794a6e1..841a99f 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -47,9 +47,10 @@ public class LSMComponentProperties { public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) { this.nodeId = nodeId; - componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId); + componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]); numberOfFiles = new AtomicInteger(job.getJobFiles().size()); - originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext()); + originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), + job.getLSMIndexOperationContext()); opType = job.getLSMOpType(); } @@ -94,7 +95,7 @@ public class LSMComponentProperties { public String getMaskPath(ReplicaResourcesManager resourceManager) { if (maskPath == null) { - 
AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this); + LSMIndexFileProperties afp = new LSMIndexFileProperties(this); maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; } @@ -103,9 +104,8 @@ public class LSMComponentProperties { public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) { if (replicaPath == null) { - AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this); - replicaPath = resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), - afp.getIdxName()); + LSMIndexFileProperties afp = new LSMIndexFileProperties(this); + replicaPath = resourceManager.getIndexPath(afp); } return replicaPath; } @@ -113,27 +113,24 @@ public class LSMComponentProperties { /*** * @param filePath * any file of the LSM component - * @param nodeId * @return a unique id based on the timestamp of the component */ - public static String getLSMComponentID(String filePath, String nodeId) { + public static String getLSMComponentID(String filePath) { String[] tokens = filePath.split(File.separator); int arraySize = tokens.length; String fileName = tokens[arraySize - 1]; - String ioDevoceName = tokens[arraySize - 2]; - String idxName = tokens[arraySize - 3]; - String dataverse = tokens[arraySize - 4]; + String idxName = tokens[arraySize - 2]; + String dataverse = tokens[arraySize - 3]; + String partitionName = tokens[arraySize - 4]; StringBuilder componentId = new StringBuilder(); - componentId.append(nodeId); + componentId.append(partitionName); componentId.append(File.separator); componentId.append(dataverse); componentId.append(File.separator); componentId.append(idxName); componentId.append(File.separator); - componentId.append(ioDevoceName); - componentId.append(File.separator); componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING))); return componentId.toString(); } @@ -142,18 +139,10 @@ public class LSMComponentProperties { return componentId; } - public void setComponentId(String componentId) { - this.componentId = componentId; - } - public long getOriginalLSN() { return originalLSN; } - public void setOriginalLSN(long lSN) { - originalLSN = lSN; - } - public String getNodeId() { return nodeId; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java new file mode 100644 index 0000000..890d3a2 --- /dev/null +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.storage; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager; + +public class LSMIndexFileProperties { + + private String fileName; + private long fileSize; + private String nodeId; + private String dataverse; + private String idxName; + private boolean lsmComponentFile; + private String filePath; + private boolean requiresAck = false; + private long LSNByteOffset; + private int partition; + + public LSMIndexFileProperties() { + } + + public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, + long LSNByteOffset, boolean requiresAck) { + initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck); + } + + public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) { + initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, + IMetaDataPageManager.INVALID_LSN_OFFSET, false); + } + + public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset, + boolean requiresAck) { + this.filePath = filePath; + this.fileSize = fileSize; + this.nodeId = nodeId; + this.lsmComponentFile = lsmComponentFile; + this.LSNByteOffset = LSNByteOffset; + this.requiresAck = requiresAck; + } + + public void splitFileName() { + String[] tokens = filePath.split(File.separator); + int arraySize = tokens.length; + this.fileName = tokens[arraySize - 1]; + this.idxName = tokens[arraySize - 2]; + this.dataverse = tokens[arraySize - 3]; + this.partition = getPartitonNumFromName(tokens[arraySize - 4]); + } + + private static int getPartitonNumFromName(String name) { + return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length())); + } + + public void serialize(OutputStream out) throws IOException { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(nodeId); + dos.writeUTF(filePath); + dos.writeLong(fileSize); + dos.writeBoolean(lsmComponentFile); + dos.writeLong(LSNByteOffset); + dos.writeBoolean(requiresAck); + } + + public static LSMIndexFileProperties create(DataInput input) throws IOException { + String nodeId = input.readUTF(); + String filePath = input.readUTF(); + long fileSize = input.readLong(); + boolean lsmComponentFile = input.readBoolean(); + long LSNByteOffset = input.readLong(); + boolean requiresAck = input.readBoolean(); + LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, + LSNByteOffset, requiresAck); + return fileProp; + } + + public String getFilePath() { + return filePath; + } + + public long getFileSize() { + return fileSize; + } + + public String getFileName() { + return fileName; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getDataverse() { + return 
dataverse; + } + + public void setDataverse(String dataverse) { + this.dataverse = dataverse; + } + + public String getIdxName() { + return idxName; + } + + public boolean isLSMComponentFile() { + return lsmComponentFile; + } + + public boolean requiresAck() { + return requiresAck; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("File Name: " + fileName + " "); + sb.append("File Size: " + fileSize + " "); + sb.append("Node ID: " + nodeId + " "); + sb.append("Partition: " + partition + " "); + sb.append("IDX Name: " + idxName + " "); + sb.append("isLSMComponentFile : " + lsmComponentFile + " "); + sb.append("Dataverse: " + dataverse); + sb.append("LSN Byte Offset: " + LSNByteOffset); + return sb.toString(); + } + + public long getLSNByteOffset() { + return LSNByteOffset; + } + + public int getPartition() { + return partition; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java ---------------------------------------------------------------------- diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index 3e3043c..b9f7506 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -38,91 +38,65 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; +import org.apache.asterix.om.util.AsterixClusterProperties; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.IODeviceHandle; -import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; +import org.apache.hyracks.storage.common.file.ILocalResourceRepository; import org.apache.hyracks.storage.common.file.LocalResource; public class ReplicaResourcesManager implements IReplicaResourcesManager { private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName()); - private final String[] mountPoints; - private final int numIODevices; - private static final String REPLICA_FOLDER_SUFFIX = "_replica"; - private final String replicationStorageFolder; - public final String localStorageFolder; - private final String localNodeID; public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask"; private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP"; public static final long REPLICA_INDEX_CREATION_LSN = -1; private final AtomicLong lastMinRemoteLSN; + private final PersistentLocalResourceRepository localRepository; + private final Map nodePartitions; - public ReplicaResourcesManager(List devices, String localStorageFolder, String localNodeID, - String replicationStorageFolder) throws HyracksDataException { - numIODevices = 
devices.size(); - this.mountPoints = new String[numIODevices]; - for (int i = 0; i < numIODevices; i++) { - String mountPoint = devices.get(i).getPath().getPath(); - File mountPointDir = new File(mountPoint); - if (!mountPointDir.exists()) { - throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist."); - } - if (!mountPoint.endsWith(System.getProperty("file.separator"))) { - mountPoints[i] = new String(mountPoint + System.getProperty("file.separator")); - } else { - mountPoints[i] = new String(mountPoint); - } - } - this.localStorageFolder = localStorageFolder; - this.localNodeID = localNodeID; - this.replicationStorageFolder = replicationStorageFolder; + public ReplicaResourcesManager(ILocalResourceRepository localRepository, + AsterixMetadataProperties metadataProperties) { + this.localRepository = (PersistentLocalResourceRepository) localRepository; + nodePartitions = metadataProperties.getNodePartitions(); lastMinRemoteLSN = new AtomicLong(-1); } - @Override - public String getLocalStorageFolder() { - return localStorageFolder; - } - - private String getReplicaStorageFolder(String replicaId, int IODeviceNum) { - if (replicaId.equals(localNodeID)) { - return mountPoints[IODeviceNum] + localStorageFolder; - } else { - return mountPoints[IODeviceNum] + replicationStorageFolder + File.separator + replicaId - + REPLICA_FOLDER_SUFFIX; - } - } - - public void deleteRemoteFile(AsterixLSMIndexFileProperties afp) throws IOException { - String indexPath = getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), afp.getIdxName()); + public void deleteIndexFile(LSMIndexFileProperties afp) { + String indexPath = getIndexPath(afp); if (indexPath != null) { if (afp.isLSMComponentFile()) { String backupFilePath = indexPath + File.separator + afp.getFileName(); //delete file File destFile = new File(backupFilePath); - if (destFile.exists()) { - destFile.delete(); - } + FileUtils.deleteQuietly(destFile); } else { //delete index files indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator)); - AsterixFilesUtil.deleteFolder(indexPath); + FileUtils.deleteQuietly(new File(indexPath)); } } } - public String getIndexPath(String replicaId, int IODeviceNum, String dataverse, String dataset) { - //mounting point/backupNodeId_replica/Dataverse/Dataset/device_id_#/ - String remoteIndexFolderPath = getReplicaStorageFolder(replicaId, IODeviceNum) + File.separator + dataverse - + File.separator + dataset + File.separator + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + IODeviceNum; - Path path = Paths.get(remoteIndexFolderPath); + public String getIndexPath(LSMIndexFileProperties fileProperties) { + fileProperties.splitFileName(); + //get partition path in this node + String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition()); + //get index path + String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(), + fileProperties.getDataverse(), fileProperties.getIdxName()); + + Path path = Paths.get(indexPath); if (!Files.exists(path)) { - File indexFolder = new File(remoteIndexFolderPath); + File indexFolder = new File(indexPath); indexFolder.mkdirs(); } - return remoteIndexFolderPath; + return indexPath; } public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException { @@ -144,14 +118,10 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { //remove mask to mark component as valid String maskPath = lsmComponentProperties.getMaskPath(this); Path 
path = Paths.get(maskPath); - - if (Files.exists(path)) { - File maskFile = new File(maskPath); - maskFile.delete(); - } + Files.deleteIfExists(path); //add component LSN to the index LSNs map - HashMap lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this)); + Map lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this)); lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN()); //update map on disk @@ -159,73 +129,11 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } - public List getResourcesForReplica(String nodeId) throws HyracksDataException { - List resourcesList = new ArrayList(); - String rootFolder; - for (int i = 0; i < numIODevices; i++) { - rootFolder = getReplicaStorageFolder(nodeId, i); - File rootDirFile = new File(rootFolder); - if (!rootDirFile.exists()) { - continue; - } - - File[] dataverseFileList = rootDirFile.listFiles(); - for (File dataverseFile : dataverseFileList) { - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - if (indexFile.isDirectory()) { - File[] ioDevicesList = indexFile.listFiles(); - if (ioDevicesList != null) { - for (File ioDeviceFile : ioDevicesList) { - if (ioDeviceFile.isDirectory()) { - File[] metadataFiles = ioDeviceFile.listFiles(LSM_INDEX_FILES_FILTER); - if (metadataFiles != null) { - for (File metadataFile : metadataFiles) { - resourcesList.add(metadataFile.getAbsolutePath()); - } - } - } - } - } - } - } - } - } - } - } - return resourcesList; - } - - public Set getReplicaIndexes(String replicaId) throws HyracksDataException { + public Set getReplicaIndexes(String replicaId) { Set remoteIndexesPaths = new HashSet(); - for (int i = 0; i < numIODevices; i++) { - String rootReplicaFolder = getReplicaStorageFolder(replicaId, i); - File rootDirFile = new File(rootReplicaFolder); - if (!rootDirFile.exists()) { - continue; - } - File[] dataverseFileList = rootDirFile.listFiles(); - for (File dataverseFile : dataverseFileList) { - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - if (indexFile.isDirectory()) { - File[] ioDevicesList = indexFile.listFiles(); - if (ioDevicesList != null) { - for (File ioDeviceFile : ioDevicesList) { - if (ioDeviceFile.isDirectory()) { - remoteIndexesPaths.add(ioDeviceFile); - } - } - } - } - } - } - } - } + ClusterPartition[] partitions = nodePartitions.get(replicaId); + for (ClusterPartition partition : partitions) { + remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId())); } return remoteIndexesPaths; } @@ -237,41 +145,60 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } long minRemoteLSN = Long.MAX_VALUE; for (String replica : replicaIds) { - try { - //for every index in replica - Set remoteIndexes = getReplicaIndexes(replica); - for (File indexFolder : remoteIndexes) { - //read LSN map - try { - //get max LSN per index - long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder); - - //get min of all maximums - minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); - } catch (IOException e) { - LOGGER.log(Level.INFO, indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " - + indexFolder); - continue; - } + //for every index in replica + Set remoteIndexes = getReplicaIndexes(replica); + for (File indexFolder : 
remoteIndexes) { + //read LSN map + try { + //get max LSN per index + long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder); + + //get min of all maximums + minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); + } catch (IOException e) { + LOGGER.log(Level.INFO, + indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder); + continue; } - } catch (HyracksDataException e) { - e.printStackTrace(); } } lastMinRemoteLSN.set(minRemoteLSN); return minRemoteLSN; } - public HashMap getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) - throws IOException { - HashMap laggingReplicaIndexes = new HashMap(); + @Override + public long getPartitionsMinLSN(Integer[] partitions) { + long minRemoteLSN = Long.MAX_VALUE; + for (Integer partition : partitions) { + //for every index in replica + Set remoteIndexes = getPartitionIndexes(partition); + for (File indexFolder : remoteIndexes) { + //read LSN map + try { + //get max LSN per index + long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder); + + //get min of all maximums + minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN); + } catch (IOException e) { + LOGGER.log(Level.INFO, + indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder); + continue; + } + } + } + return minRemoteLSN; + } + + public Map getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException { + Map laggingReplicaIndexes = new HashMap(); try { //for every index in replica Set remoteIndexes = getReplicaIndexes(replicaId); for (File indexFolder : remoteIndexes) { if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) { - File localResource = new File(indexFolder + File.separator - + PersistentLocalResourceRepository.METADATA_FILE_NAME); + File localResource = new File( + indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME); LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource); laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath()); } @@ -286,7 +213,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { private long getReplicaIndexMaxLSN(File indexFolder) throws IOException { long remoteIndexMaxLSN = 0; //get max LSN per index - HashMap lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath()); + Map lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath()); if (lsnMap != null) { for (Long lsn : lsnMap.values()) { remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn); @@ -296,7 +223,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { return remoteIndexMaxLSN; } - public void cleanInvalidLSMComponents(String replicaId) throws HyracksDataException { + public void cleanInvalidLSMComponents(String replicaId) { //for every index in replica Set remoteIndexes = getReplicaIndexes(replicaId); for (File remoteIndexFile : remoteIndexes) { @@ -312,7 +239,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } } - private void deleteLSMComponentFilesForMask(File maskFile) { + private static void deleteLSMComponentFilesForMask(File maskFile) { String lsmComponentTimeStamp = maskFile.getName().substring(0, maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length()); File indexFolder = maskFile.getParentFile(); @@ -325,78 +252,92 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager { } } - @SuppressWarnings("unchecked") - public synchronized HashMap getReplicaIndexLSNMap(String 
indexPath) throws IOException { - FileInputStream fis = null; - ObjectInputStream oisFromFis = null; - try { - fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); - oisFromFis = new ObjectInputStream(fis); + @SuppressWarnings({ "unchecked" }) + public synchronized Map getReplicaIndexLSNMap(String indexPath) throws IOException { + try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); + ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { Map lsnMap = null; try { lsnMap = (Map) oisFromFis.readObject(); } catch (ClassNotFoundException e) { e.printStackTrace(); } - return (HashMap) lsnMap; - } finally { - if (oisFromFis != null) { - oisFromFis.close(); - } - if (oisFromFis == null && fis != null) { - fis.close(); - } + return lsnMap; } } - public synchronized void updateReplicaIndexLSNMap(String indexPath, HashMap lsnMap) throws IOException { - FileOutputStream fos = null; - ObjectOutputStream oosToFos = null; - try { - fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); - oosToFos = new ObjectOutputStream(fos); + public synchronized void updateReplicaIndexLSNMap(String indexPath, Map lsnMap) throws IOException { + try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME); + ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { oosToFos.writeObject(lsnMap); oosToFos.flush(); lastMinRemoteLSN.set(-1); - } finally { - if (oosToFos != null) { - oosToFos.close(); + } + } + + /** + * @param partition + * @return Set of file references to each index in the partition + */ + public Set getPartitionIndexes(int partition) { + Set partitionIndexes = new HashSet(); + String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); + String partitionStoragePath = localRepository.getPartitionPath(partition) + + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition); + File partitionRoot = new File(partitionStoragePath); + if (partitionRoot.exists() && partitionRoot.isDirectory()) { + File[] dataverseFileList = partitionRoot.listFiles(); + if (dataverseFileList != null) { + for (File dataverseFile : dataverseFileList) { + if (dataverseFile.isDirectory()) { + File[] indexFileList = dataverseFile.listFiles(); + if (indexFileList != null) { + for (File indexFile : indexFileList) { + partitionIndexes.add(indexFile); + } + } + } + } } - if (oosToFos == null && fos != null) { - fos.close(); + } + return partitionIndexes; + } + + /** + * @param partition + * @return Absolute paths to all partition files + */ + public List getPartitionIndexesFiles(int partition) { + List partitionFiles = new ArrayList(); + Set partitionIndexes = getPartitionIndexes(partition); + for (File indexDir : partitionIndexes) { + if (indexDir.isDirectory()) { + File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); + if (indexFiles != null) { + for (File file : indexFiles) { + partitionFiles.add(file.getAbsolutePath()); + } + } } } + return partitionFiles; } private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() { public boolean accept(File dir, String name) { - if (name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) { - return true; - } else { - return false; - } + return name.endsWith(LSM_COMPONENT_MASK_SUFFIX); } }; private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() { public boolean accept(File dir, String name) { - if 
(!name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) { - return true; - } else { - return false; - } + return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX); } }; private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() { public boolean accept(File dir, String name) { - if (name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME)) { - return true; - } else if (!name.startsWith(".")) { - return true; - } else { - return false; - } + return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith("."); } }; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/data/fbu.adm ---------------------------------------------------------------------- diff --git a/asterix-replication/src/test/resources/data/fbu.adm b/asterix-replication/src/test/resources/data/fbu.adm new file mode 100644 index 0000000..7e99ea4 --- /dev/null +++ b/asterix-replication/src/test/resources/data/fbu.adm @@ -0,0 +1,10 @@ +{"id":1,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]} +{"id":2,"alias":"Isbel","name":"IsbelDull","user-since":datetime("2011-01-22T10:10:00"),"friend-ids":{{1,4}},"employment":[{"organization-name":"Hexviafind","start-date":date("2010-04-27")}]} +{"id":3,"alias":"Emory","name":"EmoryUnk","user-since":datetime("2012-07-10T10:10:00"),"friend-ids":{{1,5,8,9}},"employment":[{"organization-name":"geomedia","start-date":date("2010-06-17"),"end-date":date("2010-01-26")}]} +{"id":4,"alias":"Nicholas","name":"NicholasStroh","user-since":datetime("2010-12-27T10:10:00"),"friend-ids":{{2}},"employment":[{"organization-name":"Zamcorporation","start-date":date("2010-06-08")}]} +{"id":5,"alias":"Von","name":"VonKemble","user-since":datetime("2010-01-05T10:10:00"),"friend-ids":{{3,6,10}},"employment":[{"organization-name":"Kongreen","start-date":date("2010-11-27")}]} +{"id":6,"alias":"Willis","name":"WillisWynne","user-since":datetime("2005-01-17T10:10:00"),"friend-ids":{{1,3,7}},"employment":[{"organization-name":"jaydax","start-date":date("2009-05-15")}]} +{"id":7,"alias":"Suzanna","name":"SuzannaTillson","user-since":datetime("2012-08-07T10:10:00"),"friend-ids":{{6}},"employment":[{"organization-name":"Labzatron","start-date":date("2011-04-19")}]} +{"id":8,"alias":"Nila","name":"NilaMilliron","user-since":datetime("2008-01-01T10:10:00"),"friend-ids":{{3}},"employment":[{"organization-name":"Plexlane","start-date":date("2010-02-28")}]} +{"id":9,"alias":"Woodrow","name":"WoodrowNehling","user-since":datetime("2005-09-20T10:10:00"),"friend-ids":{{3,10}},"employment":[{"organization-name":"Zuncan","start-date":date("2003-04-22"),"end-date":date("2009-12-13")}]} +{"id":10,"alias":"Bram","name":"BramHatch","user-since":datetime("2010-10-16T10:10:00"),"friend-ids":{{1,5,9}},"employment":[{"organization-name":"physcane","start-date":date("2007-06-05"),"end-date":date("2011-11-05")}]} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/delete_storage.sh ---------------------------------------------------------------------- diff --git a/asterix-replication/src/test/resources/scripts/delete_storage.sh b/asterix-replication/src/test/resources/scripts/delete_storage.sh new file mode 100755 index 0000000..129030b --- /dev/null +++ 
b/asterix-replication/src/test/resources/scripts/delete_storage.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +rm -rf ~/asterix/* http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh ---------------------------------------------------------------------- diff --git a/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh new file mode 100755 index 0000000..2582713 --- /dev/null +++ b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+jps | awk '{if ($2 == "NCDriver" || $2 == "CCDriver") print $1;}' | xargs -n 1 kill -9 http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 52fd806..9f59148 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -32,9 +32,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.SortedMap; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.asterix.common.replication.AsterixReplicationJob; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.commons.io.FileUtils; @@ -63,10 +66,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito private IReplicationManager replicationManager; private boolean isReplicationEnabled = false; private Set filesToBeReplicated; + private final SortedMap clusterPartitions; - public PersistentLocalResourceRepository(List devices, String nodeId) throws HyracksDataException { + public PersistentLocalResourceRepository(List devices, String nodeId, + AsterixMetadataProperties metadataProperties) throws HyracksDataException { mountPoints = new String[devices.size()]; this.nodeId = nodeId; + this.clusterPartitions = metadataProperties.getClusterPartitions(); for (int i = 0; i < mountPoints.length; i++) { String mountPoint = devices.get(i).getPath().getPath(); File mountPointDir = new File(mountPoint); @@ -156,37 +162,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito resourceCache.put(resource.getResourcePath(), resource); } - FileOutputStream fos = null; - ObjectOutputStream oosToFos = null; - - try { - fos = new FileOutputStream(resourceFile); - oosToFos = new ObjectOutputStream(fos); + try (FileOutputStream fos = new FileOutputStream(resourceFile); + ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { oosToFos.writeObject(resource); oosToFos.flush(); } catch (IOException e) { throw new HyracksDataException(e); - } finally { - if (oosToFos != null) { - try { - oosToFos.close(); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - if (oosToFos == null && fos != null) { - try { - fos.close(); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } + } - //if replication enabled, send resource metadata info to remote nodes - if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) { - String filePath = getFileName(resource.getResourcePath(), resource.getResourceId()); - createReplicationJob(ReplicationOperation.REPLICATE, filePath); - } + //if replication enabled, send resource metadata info to remote nodes + if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) { 
+ String filePath = getFileName(resource.getResourcePath(), resource.getResourceId()); + createReplicationJob(ReplicationOperation.REPLICATE, filePath); } } @@ -304,31 +291,12 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } public static LocalResource readLocalResource(File file) throws HyracksDataException { - FileInputStream fis = null; - ObjectInputStream oisFromFis = null; - - try { - fis = new FileInputStream(file); - oisFromFis = new ObjectInputStream(fis); + try (FileInputStream fis = new FileInputStream(file); + ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { LocalResource resource = (LocalResource) oisFromFis.readObject(); return resource; } catch (Exception e) { throw new HyracksDataException(e); - } finally { - if (oisFromFis != null) { - try { - oisFromFis.close(); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } - if (oisFromFis == null && fis != null) { - try { - fis.close(); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } } } @@ -427,4 +395,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } return storageRootDir; } + + /** + * @param partition + * @return The partition local path on this NC. + */ + public String getPartitionPath(int partition) { + //currently each partition is replicated on the same IO device number on all NCs. + return mountPoints[clusterPartitions.get(partition).getIODeviceNum()]; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java index b6bb7dc..e79a0d3 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.transaction.management.resource; +import org.apache.asterix.common.config.AsterixMetadataProperties; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.common.file.ILocalResourceRepository; @@ -26,14 +27,17 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory; public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory { private final IIOManager ioManager; private final String nodeId; + private final AsterixMetadataProperties metadataProperties; - public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) { + public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId, + AsterixMetadataProperties metadataProperties) { this.ioManager = ioManager; this.nodeId = nodeId; + this.metadataProperties = metadataProperties; } @Override public ILocalResourceRepository createRepository() throws HyracksDataException { - return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId); + return new 
PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId, metadataProperties); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java index 701e529..8fe75f2 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java @@ -44,13 +44,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.SortedMap; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.ILocalResourceMetadata; -import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.IAsterixPropertiesProvider; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; @@ -72,14 +70,11 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.control.nc.io.IOManager; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndex; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; @@ -300,7 +295,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //get datasetLifeCycleManager IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() .getDatasetLifecycleManager(); - IIOManager ioManager = appRuntimeContext.getIOManager(); ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository(); Map resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository) .loadAndGetAllResources(); @@ -507,13 +501,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //#. 
get indexLifeCycleManager IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); - IIOManager ioManager = appRuntimeContext.getIOManager(); - SortedMap clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext - .getAppContext()).getMetadataProperties().getClusterPartitions(); IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager(); - ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository(); - Map resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository) - .loadAndGetAllResources(); + PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext + .getLocalResourceRepository(); + Map resourcesMap = localResourceRepository.loadAndGetAllResources(); //#. set log reader to the lowWaterMarkLsn again. for (int i = 0; i < remoteLogs.size(); i++) { logRecord = remoteLogs.get(i); @@ -561,16 +552,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //get index instance from IndexLifeCycleManager //if index is not registered into IndexLifeCycleManager, //create the index using LocalMetadata stored in LocalResourceRepository - //get the resource path relative to this node - int resourcePartition = localResource.getPartition(); - //get partition io device id - //NOTE: - //currently we store all partition in the same IO device in all nodes. If this changes, - //this needs to be updated to find the IO device in which the partition is stored in this local node. - int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum(); - String resourceAbsolutePath = ioManager - .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile() - .getAbsolutePath(); + //get partition path in this node + String partitionIODevicePath = localResourceRepository + .getPartitionPath(localResource.getPartition()); + String resourceAbsolutePath = partitionIODevicePath + File.separator + + localResource.getResourceName(); localResource.setResourcePath(resourceAbsolutePath); index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath); if (index == null) { @@ -578,8 +564,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject(); index = localResourceMetadata.createIndexInstance(appRuntimeContext, resourceAbsolutePath, localResource.getPartition()); - datasetLifecycleManager.register(localResource.getResourceName(), index); - datasetLifecycleManager.open(localResource.getResourceName()); + datasetLifecycleManager.register(resourceAbsolutePath, index); + datasetLifecycleManager.open(resourceAbsolutePath); //#. get maxDiskLastLSN ILSMIndex lsmIndex = index; @@ -1099,6 +1085,242 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } + //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters. + //They need to be refactored to use partitions only once the log format includes partition id. 
+ @Override + public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) + throws IOException, ACIDException { + //delete any recovery files from previous failed recovery attempts + deleteRecoveryTemporaryFiles(); + + int updateLogCount = 0; + int entityCommitLogCount = 0; + int jobCommitLogCount = 0; + int redoCount = 0; + int abortLogCount = 0; + int jobId = -1; + + state = SystemState.RECOVERING; + LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ..."); + + Set winnerJobSet = new HashSet(); + jobId2WinnerEntitiesMap = new HashMap<>(); + + TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + JobEntityCommits jobEntityWinners = null; + //#. read checkpoint file and set lowWaterMark where anaylsis and redo start + long readableSmallestLSN = logMgr.getReadableSmallestLSN(); + if (lowWaterMarkLSN < readableSmallestLSN) { + lowWaterMarkLSN = readableSmallestLSN; + } + //------------------------------------------------------------------------- + // [ analysis phase ] + // - collect all committed Lsn + //------------------------------------------------------------------------- + LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase"); + IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); + //get datasetLifeCycleManager + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); + PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext + .getLocalResourceRepository(); + Map resourcesMap = localResourceRepository.loadAndGetAllResources(); + Map resourceId2MaxLSNMap = new HashMap(); + + //#. set log reader to the lowWaterMarkLsn + ILogReader logReader = logMgr.getLogReader(true); + ILogRecord logRecord = null; + try { + logReader.initializeScan(lowWaterMarkLSN); + logRecord = logReader.next(); + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + //TODO update this partitions once the log format is updated to include partitons + if (logRecord.getNodeId().equals(failedNode)) { + switch (logRecord.getLogType()) { + case LogType.UPDATE: + updateLogCount++; + break; + case LogType.JOB_COMMIT: + jobId = logRecord.getJobId(); + winnerJobSet.add(jobId); + if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + //to delete any spilled files as well + jobEntityWinners.clear(); + jobId2WinnerEntitiesMap.remove(jobId); + } + jobCommitLogCount++; + break; + case LogType.ENTITY_COMMIT: + jobId = logRecord.getJobId(); + if (!jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = new JobEntityCommits(jobId); + if (needToFreeMemory()) { + //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk. + //This could happen only when we have many jobs with small number of records and none of them have job commit. 
+ freeJobsCachedEntities(jobId); + } + jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners); + } else { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + } + jobEntityWinners.add(logRecord); + entityCommitLogCount++; + break; + case LogType.ABORT: + abortLogCount++; + break; + case LogType.FLUSH: + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + } + logRecord = logReader.next(); + } + + //prepare winners for search after analysis is done to flush anything remaining in memory to disk. + for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) { + winners.prepareForSearch(); + } + //------------------------------------------------------------------------- + // [ redo phase ] + // - redo if + // 1) The TxnId is committed && --> guarantee durability + // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence + //------------------------------------------------------------------------- + LOGGER.info("[RecoveryMgr] in redo phase"); + + long resourceId; + long maxDiskLastLsn; + long LSN = -1; + ILSMIndex index = null; + LocalResource localResource = null; + ILocalResourceMetadata localResourceMetadata = null; + boolean foundWinner = false; + //set log reader to the lowWaterMarkLsn again. + logReader.initializeScan(lowWaterMarkLSN); + logRecord = logReader.next(); + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + //TODO update this to check for partitions instead of node id once the log format is updated to include partitions + if (logRecord.getNodeId().equals(failedNode)) { + LSN = logRecord.getLSN(); + jobId = logRecord.getJobId(); + foundWinner = false; + switch (logRecord.getLogType()) { + case LogType.UPDATE: + if (winnerJobSet.contains(jobId)) { + foundWinner = true; + } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize()); + if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) { + foundWinner = true; + } + } + if (foundWinner) { + resourceId = logRecord.getResourceId(); + localResource = resourcesMap.get(resourceId); + /******************************************************************* + * [Notice] + * -> Issue + * Delete index may cause a problem during redo. + * The index operation to be redone couldn't be redone because the corresponding index + * may not exist in NC due to the possible index drop DDL operation. + * -> Approach + * Avoid the problem during redo. + * More specifically, the problem will be detected when the localResource of + * the corresponding index is retrieved, which will end up with 'null'. + * If null is returned, then just go and process the next + * log record. 
+ *******************************************************************/ + if (localResource == null) { + logRecord = logReader.next(); + continue; + } + /*******************************************************************/ + + //get index instance from IndexLifeCycleManager + //if index is not registered into IndexLifeCycleManager, + //create the index using LocalMetadata stored in LocalResourceRepository + //get partition path in this node + String partitionIODevicePath = localResourceRepository + .getPartitionPath(localResource.getPartition()); + String resourceAbsolutePath = partitionIODevicePath + File.separator + + localResource.getResourceName(); + localResource.setResourcePath(resourceAbsolutePath); + index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath); + if (index == null) { + //#. create index instance and register to indexLifeCycleManager + localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject(); + index = localResourceMetadata.createIndexInstance(appRuntimeContext, + resourceAbsolutePath, localResource.getPartition()); + datasetLifecycleManager.register(resourceAbsolutePath, index); + datasetLifecycleManager.open(resourceAbsolutePath); + + //#. get maxDiskLastLSN + ILSMIndex lsmIndex = index; + try { + maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex + .getIOOperationCallback()) + .getComponentLSN(lsmIndex.getImmutableComponents()); + } catch (HyracksDataException e) { + datasetLifecycleManager.close(resourceAbsolutePath); + throw e; + } + + //#. set resourceId and maxDiskLastLSN to the map + resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); + } else { + maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + } + + if (LSN > maxDiskLastLsn) { + redo(logRecord, datasetLifecycleManager); + redoCount++; + } + } + break; + case LogType.JOB_COMMIT: + case LogType.ENTITY_COMMIT: + case LogType.ABORT: + case LogType.FLUSH: + //do nothing + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + } + logRecord = logReader.next(); + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("[RecoveryMgr] recovery is completed."); + LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " + + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + + "/" + redoCount); + } + } finally { + logReader.close(); + + //close all indexes + Set resourceIdList = resourceId2MaxLSNMap.keySet(); + for (long r : resourceIdList) { + datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath()); + } + + //delete any recovery files after completing recovery + deleteRecoveryTemporaryFiles(); + } + } + private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; private final int jobId; @@ -1145,7 +1367,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { /** * Call this method when no more entity commits will be added to this job. - * + * * @throws IOException */ public void prepareForSearch() throws IOException { @@ -1182,7 +1404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { */ public ArrayList getCandidiatePartitions(long logLSN) { ArrayList candidiatePartitions = new ArrayList(); - for (File partition : jobEntitCommitOnDiskPartitionsFiles) { String partitionName = partition.getName(); //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN