Subject: svn commit: r1240440 - in /hadoop/common/branches/branch-1.0: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/server/datanode/
Date: Sat, 04 Feb 2012 02:16:46 -0000
To: common-commits@hadoop.apache.org
From: suresh@apache.org
Message-Id: <20120204021647.3935B23888CD@eris.apache.org>

Author: suresh
Date: Sat Feb  4 02:16:46 2012
New Revision: 1240440

URL: http://svn.apache.org/viewvc?rev=1240440&view=rev
Log:
HDFS-2379. Merging change r1196456 from branch 1

Added:
    hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java
Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Feb  4 02:16:46 2012
@@ -18,6 +18,9 @@ Release 1.0.1 - 2012.01.30
 
     HADOOP-7470. Move up to Jackson 1.8.8. (Enis Soztutar via szetszwo)
 
+    HDFS-2379. Allow block reports to proceed without holding FSDataset lock.
+    (todd via suresh)
+
   BUG FIXES
 
     HADOOP-7960. Port HADOOP-5203 to branch-1, build version comparison is too
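The heart of this change is a two-phase pattern: do the slow disk scan
without the dataset lock, then take the lock only for a short reconcile of
whatever changed during the scan. A minimal illustrative sketch of that
pattern (all names here are invented for illustration; the real code is in
the FSDataset.java hunks below):

    import java.util.HashMap;
    import java.util.Map;

    class TwoPhaseReportSketch {
      private final Map<Long, String> inMemory = new HashMap<Long, String>();

      Map<Long, String> generateReport() {
        // Phase 1: walk the disks WITHOUT the lock. This may take minutes,
        // but readers and writers are no longer blocked while it runs.
        Map<Long, String> rough = scanDisksLockFree();

        // Phase 2: briefly take the lock and fix up concurrent changes
        // (deletes, adds, in-flight creates) against the in-memory map.
        synchronized (this) {
          reconcile(rough, inMemory);  // map operations only, no disk I/O
        }
        return rough;
      }

      private Map<Long, String> scanDisksLockFree() {
        return new HashMap<Long, String>();  // stand-in for the directory walk
      }

      private void reconcile(Map<Long, String> rough, Map<Long, String> mem) {
        // drop entries deleted during the scan; add entries created during it
      }
    }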
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Feb  4 02:16:46 2012
@@ -248,7 +248,16 @@ public class DataNode extends Configured
   public final static String DATA_DIR_PERMISSION_KEY =
     "dfs.datanode.data.dir.perm";
   private static final String DEFAULT_DATA_DIR_PERMISSION = "755";
-
+
+  // Thresholds for when we start to log when a block report is
+  // taking a long time to generate. Under heavy disk load and
+  // memory pressure, it's normal for block reports to take
+  // several minutes, since they cause many disk seeks.
+  private static final long LATE_BLOCK_REPORT_WARN_THRESHOLD =
+      10 * 60 * 1000; // 10m
+  private static final long LATE_BLOCK_REPORT_INFO_THRESHOLD =
+      3 * 60 * 1000; // 3m
+
   // For InterDataNodeProtocol
   public Server ipcServer;
 
@@ -713,6 +722,8 @@ public class DataNode extends Configured
       namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
     }
     // random short delay - helps scatter the BR from all DNs
+    // - but we can start generating the block report immediately
+    data.requestAsyncBlockReport();
     scheduleBlockReport(initialBlockReportDelay);
   }
 
@@ -937,42 +948,60 @@ public class DataNode extends Configured
 
       // Send latest blockinfo report if timer has expired.
       if (startTime - lastBlockReport > blockReportInterval) {
-
-        // Create block report
-        long brCreateStartTime = now();
-        Block[] bReport = data.getBlockReport();
-
-        // Send block report
-        long brSendStartTime = now();
-        DatanodeCommand cmd = namenode.blockReport(dnRegistration,
-                BlockListAsLongs.convertToArrayLongs(bReport));
-
-        // Log the block report processing stats from Datanode perspective
-        long brSendCost = now() - brSendStartTime;
-        long brCreateCost = brSendStartTime - brCreateStartTime;
-        myMetrics.addBlockReport(brSendCost);
-        LOG.info("BlockReport of " + bReport.length
-            + " blocks took " + brCreateCost + " msec to generate and "
-            + brSendCost + " msecs for RPC and NN processing");
-
-        //
-        // If we have sent the first block report, then wait a random
-        // time before we start the periodic block reports.
-        //
-        if (resetBlockReportTime) {
-          lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
-          resetBlockReportTime = false;
+        if (data.isAsyncBlockReportReady()) {
+          // Create block report
+          long brCreateStartTime = now();
+          Block[] bReport = data.retrieveAsyncBlockReport();
+
+          // Send block report
+          long brSendStartTime = now();
+          DatanodeCommand cmd = namenode.blockReport(dnRegistration,
+                  BlockListAsLongs.convertToArrayLongs(bReport));
+
+          // Log the block report processing stats from Datanode perspective
+          long brSendCost = now() - brSendStartTime;
+          long brCreateCost = brSendStartTime - brCreateStartTime;
+          myMetrics.addBlockReport(brSendCost);
+          LOG.info("BlockReport of " + bReport.length
+              + " blocks took " + brCreateCost + " msec to generate and "
+              + brSendCost + " msecs for RPC and NN processing");
+
+          //
+          // If we have sent the first block report, then wait a random
+          // time before we start the periodic block reports.
+          //
+          if (resetBlockReportTime) {
+            lastBlockReport = startTime -
+                R.nextInt((int)(blockReportInterval));
+            resetBlockReportTime = false;
+          } else {
+            /* say the last block report was at 8:20:14. The current report
+             * should have started around 9:20:14 (default 1 hour interval).
+             * If current time is :
+             *   1) normal like 9:20:18, next report should be at 10:20:14
+             *   2) unexpected like 11:35:43, next report should be at
+             *      12:20:14
+             */
+            lastBlockReport += (now() - lastBlockReport) /
+                blockReportInterval * blockReportInterval;
+          }
+          processCommand(cmd);
         } else {
-          /* say the last block report was at 8:20:14. The current report
-           * should have started around 9:20:14 (default 1 hour interval).
-           * If current time is :
-           *   1) normal like 9:20:18, next report should be at 10:20:14
-           *   2) unexpected like 11:35:43, next report should be at 12:20:14
-           */
-          lastBlockReport += (now() - lastBlockReport) /
-              blockReportInterval * blockReportInterval;
+          data.requestAsyncBlockReport();
+          if (lastBlockReport > 0) { // this isn't the first report
+            long waitingFor =
+                startTime - lastBlockReport - blockReportInterval;
+            String msg = "Block report is due, and has been waiting for it for "
+                + (waitingFor/1000) + " seconds...";
+            if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) {
+              LOG.warn(msg);
+            } else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) {
+              LOG.info(msg);
+            } else if (LOG.isDebugEnabled()) {
+              LOG.debug(msg);
+            }
+          }
         }
-        processCommand(cmd);
       }
 
       // start block scanner
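A quick standalone check of the rescheduling arithmetic in the comment above
(the times are the ones from the comment, expressed as millisecond offsets
from the 8:20:14 report; this is an illustrative sketch, not DataNode code):

    public class BlockReportScheduleDemo {
      public static void main(String[] args) {
        long interval = 60 * 60 * 1000L;  // default 1 hour report interval
        long lastBlockReport = 0L;        // "8:20:14" as offset zero

        // 1) report sent slightly late, at "9:20:18" (interval + 4s):
        long now1 = interval + 4000L;
        lastBlockReport += (now1 - lastBlockReport) / interval * interval;
        // integer division floors to the interval grid: now "9:20:14",
        // so the next report is due at "10:20:14"
        System.out.println(lastBlockReport == interval);      // true

        // 2) report sent very late, at "11:35:43" (3h 15m 29s after last):
        lastBlockReport = 0L;
        long now2 = 3 * interval + 15 * 60 * 1000L + 29000L;
        lastBlockReport += (now2 - lastBlockReport) / interval * interval;
        // snapped back to "11:20:14", so the next report is due "12:20:14"
        System.out.println(lastBlockReport == 3 * interval);  // true
      }
    }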
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Feb  4 02:16:46 2012
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 
 import javax.management.NotCompliantMBeanException;
@@ -66,22 +67,23 @@ public class FSDataset implements FSCons
 
   /** Find the metadata file for the specified block file.
    * Return the generation stamp from the name of the metafile.
    */
-  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
-    String blockName = blockFile.getName();
+  static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockNamePrefix = blockFile.getName() + "_";
+    // blockNamePrefix is blk_12345_
+    // path we're looking for looks like = blk_12345_GENSTAMP.meta
+
     for (int j = 0; j < listdir.length; j++) {
       String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
-      }
-      String[] vals = path.split("_");
-      if (vals.length != 3) {     // blk, blkid, genstamp.meta
+      if (!path.startsWith(blockNamePrefix)) {
         continue;
       }
-      String[] str = vals[2].split("\\.");
-      if (str.length != 2) {
+      if (!path.endsWith(".meta")) {
         continue;
       }
-      return Long.parseLong(str[0]);
+
+      String metaPart = path.substring(blockNamePrefix.length(),
+          path.length() - METADATA_EXTENSION_LENGTH);
+      return Long.parseLong(metaPart);
     }
     DataNode.LOG.warn("Block " + blockFile + " does not have a metafile!");
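The rewritten getGenerationStampFromFile() above replaces the old split("_")
parsing with explicit prefix/suffix matching. A standalone sketch of the
substring arithmetic, with made-up file names (the trailing "_" in the
prefix is what keeps a short block ID from matching a longer one -- see
testGetGenerationStampFromFile in the new test at the end of this commit):

    public class GenstampParseDemo {
      public static void main(String[] args) {
        String blockFileName = "blk_12345";           // invented block file
        String metaFileName = "blk_12345_4567.meta";  // its metadata file

        String blockNamePrefix = blockFileName + "_"; // "blk_12345_"
        if (metaFileName.startsWith(blockNamePrefix)
            && metaFileName.endsWith(".meta")) {
          // strip "blk_12345_" from the front and ".meta" from the back
          String metaPart = metaFileName.substring(blockNamePrefix.length(),
              metaFileName.length() - ".meta".length());
          System.out.println(Long.parseLong(metaPart));  // prints 4567
        }
      }
    }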
+ continue; + } + long genStamp = FSDataset.getGenerationStampFromFile(filesInDir, f); + Block b = new Block(f, blockLen, genStamp); + results.put(b, f); + } else if (f.getName().startsWith("subdir")) { + // the startsWith check is much faster than the + // stat() call invoked by isDirectory() + scanBlockFilesInconsistent(f, results); + } + } + } } void getBlocksBeingWrittenInfo(TreeSet blockSet) { @@ -685,13 +697,20 @@ public class FSDataset implements FSCons } return remaining; } + + void scanBlockFilesInconsistent(Map results) { + // Make a local consistent copy of the volume list, since + // it might change due to a disk failure + FSVolume volumesCopy[]; + synchronized (this) { + volumesCopy = Arrays.copyOf(volumes, volumes.length); + } - synchronized void getBlockInfo(TreeSet blockSet) { - for (int idx = 0; idx < volumes.length; idx++) { - volumes[idx].getBlockInfo(blockSet); + for (FSVolume vol : volumesCopy) { + vol.scanBlockFilesInconsistent(results); } } - + synchronized void getVolumeMap(HashMap volumeMap) { for (int idx = 0; idx < volumes.length; idx++) { volumes[idx].getVolumeMap(volumeMap); @@ -772,6 +791,8 @@ public class FSDataset implements FSCons //Find better place? public static final String METADATA_EXTENSION = ".meta"; + public static final int METADATA_EXTENSION_LENGTH = + METADATA_EXTENSION.length(); public static final short METADATA_VERSION = 1; @@ -926,6 +947,9 @@ public class FSDataset implements FSCons private int validVolsRequired; FSDatasetAsyncDiskService asyncDiskService; + private final AsyncBlockReport asyncBlockReport; + + /** * An FSDataset has a directory where it loads its data files. */ @@ -966,6 +990,8 @@ public class FSDataset implements FSCons } volumes = new FSVolumeSet(volArray); volumes.getVolumeMap(volumeMap); + asyncBlockReport = new AsyncBlockReport(this); + asyncBlockReport.start(); File[] roots = new File[storage.getNumStorageDirs()]; for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { roots[idx] = storage.getStorageDir(idx).getCurrentDir(); @@ -1656,18 +1682,125 @@ public class FSDataset implements FSCons return blockTable; } + @Override + public void requestAsyncBlockReport() { + asyncBlockReport.request(); + } + + @Override + public boolean isAsyncBlockReportReady() { + return asyncBlockReport.isReady(); + } + + @Override + public Block[] retrieveAsyncBlockReport() { + HashMap seenOnDisk = asyncBlockReport.getAndReset(); + return reconcileRoughBlockScan(seenOnDisk); + } + /** - * Return a table of block data + * Return a table of block data. This method is synchronous, and is used + * by tests and during block scanner startup. */ public Block[] getBlockReport() { - TreeSet blockSet = new TreeSet(); - volumes.getBlockInfo(blockSet); - Block blockTable[] = new Block[blockSet.size()]; - int i = 0; - for (Iterator it = blockSet.iterator(); it.hasNext(); i++) { - blockTable[i] = it.next(); + long st = System.currentTimeMillis(); + HashMap seenOnDisk = roughBlockScan(); + // the above results are inconsistent since modifications + // happened concurrently. 
@@ -926,6 +947,9 @@ public class FSDataset implements FSCons
   private int validVolsRequired;
 
   FSDatasetAsyncDiskService asyncDiskService;
+  private final AsyncBlockReport asyncBlockReport;
+
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -966,6 +990,8 @@ public class FSDataset implements FSCons
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+    asyncBlockReport = new AsyncBlockReport(this);
+    asyncBlockReport.start();
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
@@ -1656,18 +1682,125 @@ public class FSDataset implements FSCons
     return blockTable;
   }
 
+  @Override
+  public void requestAsyncBlockReport() {
+    asyncBlockReport.request();
+  }
+
+  @Override
+  public boolean isAsyncBlockReportReady() {
+    return asyncBlockReport.isReady();
+  }
+
+  @Override
+  public Block[] retrieveAsyncBlockReport() {
+    HashMap<Block, File> seenOnDisk = asyncBlockReport.getAndReset();
+    return reconcileRoughBlockScan(seenOnDisk);
+  }
+
   /**
-   * Return a table of block data
+   * Return a table of block data. This method is synchronous, and is used
+   * by tests and during block scanner startup.
    */
   public Block[] getBlockReport() {
-    TreeSet<Block> blockSet = new TreeSet<Block>();
-    volumes.getBlockInfo(blockSet);
-    Block blockTable[] = new Block[blockSet.size()];
-    int i = 0;
-    for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
-      blockTable[i] = it.next();
+    long st = System.currentTimeMillis();
+    HashMap<Block, File> seenOnDisk = roughBlockScan();
+    // the above results are inconsistent since modifications
+    // happened concurrently. Now check any diffs
+    DataNode.LOG.info("Generated rough (lockless) block report in "
+        + (System.currentTimeMillis() - st) + " ms");
+    return reconcileRoughBlockScan(seenOnDisk);
+  }
+
+  private Block[] reconcileRoughBlockScan(HashMap<Block, File> seenOnDisk) {
+    Set<Block> blockReport;
+    synchronized (this) {
+      long st = System.currentTimeMillis();
+      // broken out to a static method to simplify testing
+      reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+      DataNode.LOG.info(
+          "Reconciled asynchronous block report against current state in "
+          + (System.currentTimeMillis() - st) + " ms");
+
+      blockReport = seenOnDisk.keySet();
+    }
+
+    return blockReport.toArray(new Block[0]);
+  }
+
+  /**
+   * Scan the blocks in the dataset on disk, without holding any
+   * locks. This generates a "rough" block report, since there
+   * may be concurrent modifications to the disk structure.
+   */
+  HashMap<Block, File> roughBlockScan() {
+    int expectedNumBlocks;
+    synchronized (this) {
+      expectedNumBlocks = volumeMap.size();
+    }
+    HashMap<Block, File> seenOnDisk =
+        new HashMap<Block, File>(expectedNumBlocks, 1.1f);
+    volumes.scanBlockFilesInconsistent(seenOnDisk);
+    return seenOnDisk;
+  }
+
+  static void reconcileRoughBlockScan(
+      Map<Block, File> seenOnDisk,
+      Map<Block, DatanodeBlockInfo> volumeMap,
+      Map<Block, ActiveFile> ongoingCreates) {
+
+    int numDeletedAfterScan = 0;
+    int numAddedAfterScan = 0;
+    int numOngoingIgnored = 0;
+
+    // remove anything seen on disk that's no longer in the memory map,
+    // or got reopened while we were scanning
+    Iterator<Map.Entry<Block, File>> iter = seenOnDisk.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Block, File> entry = iter.next();
+      Block b = entry.getKey();
+
+      if (!volumeMap.containsKey(b) || ongoingCreates.containsKey(b)) {
+        File blockFile = entry.getValue();
+        File metaFile = getMetaFile(blockFile, b);
+        if (!blockFile.exists() || !metaFile.exists()) {
+          // the block was deleted (or had its generation stamp changed)
+          // after it was scanned on disk... If the genstamp changed,
+          // it will be added below when we scan volumeMap
+          iter.remove();
+          numDeletedAfterScan++;
+        }
+      }
+    }
+
+    // add anything from the in-memory map that wasn't seen on disk,
+    // if and only if the file is now verifiably on disk.
+    for (Map.Entry<Block, DatanodeBlockInfo> entry : volumeMap.entrySet()) {
+      Block b = entry.getKey();
+      if (ongoingCreates.containsKey(b)) {
+        // don't add these to block reports
+        numOngoingIgnored++;
+        continue;
+      }
+      DatanodeBlockInfo info = entry.getValue();
+      if (!seenOnDisk.containsKey(b) && info.getFile().exists()) {
+        // add a copy, and use the length from disk instead of from memory
+        Block toAdd = new Block(
+            b.getBlockId(), info.getFile().length(), b.getGenerationStamp());
+        seenOnDisk.put(toAdd, info.getFile());
+        numAddedAfterScan++;
+      }
+      // if the file is in memory but _not_ on disk, this is the situation
+      // in which an administrator accidentally "rm -rf"ed part of a data
+      // directory. We should _not_ report these blocks.
+    }
" + + numDeletedAfterScan + " blocks concurrently deleted during scan, " + + numAddedAfterScan + " blocks concurrently added during scan, " + + numOngoingIgnored + " ongoing creations ignored"); } - return blockTable; } /** @@ -1937,6 +2070,10 @@ public class FSDataset implements FSCons asyncDiskService.shutdown(); } + if (asyncBlockReport != null) { + asyncBlockReport.shutdown(); + } + if(volumes != null) { for (FSVolume volume : volumes.volumes) { if(volume != null) { @@ -2031,4 +2168,91 @@ public class FSDataset implements FSCons return info; } } + + /** + * Thread which handles generating "rough" block reports in the background. + * Callers should call request(), and then poll isReady() while the + * work happens. When isReady() returns true, getAndReset() may be + * called to retrieve the results. + */ + static class AsyncBlockReport implements Runnable { + private final Thread thread; + private final FSDataset fsd; + + boolean requested = false; + boolean shouldRun = true; + private HashMap scan = null; + + AsyncBlockReport(FSDataset fsd) { + this.fsd = fsd; + thread = new Thread(this, "Async Block Report Generator"); + thread.setDaemon(true); + } + + void start() { + thread.start(); + } + + synchronized void shutdown() { + shouldRun = false; + thread.interrupt(); + } + + synchronized boolean isReady() { + return scan != null; + } + + synchronized HashMap getAndReset() { + if (!isReady()) { + throw new IllegalStateException("report not ready!"); + } + HashMap ret = scan; + scan = null; + requested = false; + return ret; + } + + synchronized void request() { + requested = true; + notifyAll(); + } + + @Override + public void run() { + while (shouldRun) { + try { + waitForReportRequest(); + assert requested && scan == null; + + DataNode.LOG.info("Starting asynchronous block report scan"); + long st = System.currentTimeMillis(); + HashMap result = fsd.roughBlockScan(); + DataNode.LOG.info("Finished asynchronous block report scan in " + + (System.currentTimeMillis() - st) + "ms"); + + synchronized (this) { + assert scan == null; + this.scan = result; + } + } catch (InterruptedException ie) { + // interrupted to end scanner + } catch (Throwable t) { + DataNode.LOG.error("Async Block Report thread caught exception", t); + try { + // Avoid busy-looping in the case that we have entered some invalid + // state -- don't want to flood the error log with exceptions. + Thread.sleep(2000); + } catch (InterruptedException e) { + } + } + } + } + + private synchronized void waitForReportRequest() + throws InterruptedException { + while (!(requested && scan == null)) { + wait(5000); + } + } + } } Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1240440&r1=1240439&r2=1240440&view=diff ============================================================================== --- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Feb 4 02:16:46 2012 @@ -238,6 +238,29 @@ public interface FSDatasetInterface exte public Block[] getBlockReport(); /** + * Request that a block report be prepared. 
Modified: hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-1.0/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Feb  4 02:16:46 2012
@@ -238,6 +238,29 @@ public interface FSDatasetInterface exte
   public Block[] getBlockReport();
 
   /**
+   * Request that a block report be prepared.
+   */
+  public void requestAsyncBlockReport();
+
+  /**
+   * @return true if an asynchronous block report is ready
+   */
+  public boolean isAsyncBlockReportReady();
+
+  /**
+   * Retrieve an asynchronously prepared block report. Callers should first
+   * call {@link #requestAsyncBlockReport()}, and then poll
+   * {@link #isAsyncBlockReportReady()} until it returns true.
+   *
+   * Retrieving the asynchronous block report also resets it; a new
+   * one must be prepared before this method may be called again.
+   *
+   * @throws IllegalStateException if an async report is not ready
+   */
+  public Block[] retrieveAsyncBlockReport();
+
+
+  /**
    * Returns the blocks being written report
    * @return - the blocks being written report
   */

Modified: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1240440&r1=1240439&r2=1240440&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Feb  4 02:16:46 2012
@@ -308,6 +308,21 @@ public class SimulatedFSDataset impleme
     }
     return blockTable;
   }
+
+  @Override
+  public void requestAsyncBlockReport() {
+  }
+
+  @Override
+  public boolean isAsyncBlockReportReady() {
+    return true;
+  }
+
+  @Override
+  public Block[] retrieveAsyncBlockReport() {
+    return getBlockReport();
+  }
 
   public long getCapacity() throws IOException {
     return storage.getCapacity();
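The three methods added to FSDatasetInterface above form a small polling
protocol, and SimulatedFSDataset satisfies it trivially by always being
"ready". A hedged sketch of a conforming caller (blockReportDue() and send()
are invented stubs, and the class assumes it sits in the same package as
FSDatasetInterface; in the real patch this logic lives in
DataNode.offerService(), shown earlier):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;

    class BlockReportCallerSketch {
      void maybeSendBlockReport(FSDatasetInterface data) throws IOException {
        if (!blockReportDue()) {
          return;
        }
        if (data.isAsyncBlockReportReady()) {
          // retrieving also resets the report, per the javadoc above
          Block[] report = data.retrieveAsyncBlockReport();
          send(report);
        } else {
          // start (or keep waiting on) background generation and try
          // again on the next heartbeat rather than blocking here
          data.requestAsyncBlockReport();
        }
      }

      private boolean blockReportDue() { return true; }  // invented stub
      private void send(Block[] report) { }              // invented stub
    }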
Added: hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java?rev=1240440&view=auto
==============================================================================
--- hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java (added)
+++ hadoop/common/branches/branch-1.0/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReportGeneration.java Sat Feb  4 02:16:46 2012
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.ActiveFile;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.AsyncBlockReport;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestBlockReportGeneration {
+  private static final long BLKID = 12345L;
+  private static final long GENSTAMP = 1000L;
+  private static final long LEN = 65536L;
+
+  private static final Block FAKE_BLK =
+      new Block(BLKID, LEN, GENSTAMP);
+
+  static final File TEST_DIR = new File(
+      System.getProperty("test.build.data") + File.separatorChar +
+      "TestBlockReportGeneration");
+
+  Map<Block, File> seenOnDisk = new HashMap<Block, File>();
+  Map<Block, DatanodeBlockInfo> volumeMap =
+      new HashMap<Block, DatanodeBlockInfo>();
+  Map<Block, ActiveFile> ongoingCreates = new HashMap<Block, ActiveFile>();
+
+  @Before
+  public void cleanupTestDir() throws IOException {
+    FileUtil.fullyDelete(TEST_DIR);
+    assertTrue(TEST_DIR.mkdirs());
+  }
+
+  @Test
+  public void testEmpty() {
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    assertTrue(seenOnDisk.isEmpty());
+  }
+
+  /**
+   * Test for case where for some reason there's a block on disk
+   * that got lost in volumeMap - we want to report this.
+   */
+  @Test
+  public void testOnDiskButNotMemory() {
+    fakeSeenByScan(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should still be in map, since it was seen to still exist on disk
+    // (exists returns true)
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  /**
+   * Test for case where we lost a block from disk - eg a user rm -Rfed
+   * a data dir accidentally.
+   */
+  @Test
+  public void testInMemoryButNotOnDisk() {
+    fakeInVolumeMap(FAKE_BLK);
+
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should not be added to the map, since it's truly not on disk
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  /**
+   * Test for case where we concurrently removed a block, so it is
+   * seen in scan, but during reconciliation is no longer on disk.
+   */
+  @Test
+  public void testRemovedAfterScan() {
+    fakeSeenByScan(FAKE_BLK);
+
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+    assertFalse(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be removed from the map since .exists() returns false
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  /**
+   * Test for case where we concurrently added a block, so it is
+   * not seen in scan, but is in volumeMap and on disk during
+   * reconciliation.
+   */
+  @Test
+  public void testAddedAfterScan() {
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be added, since it's found on disk when reconciling
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  /**
+   * Test for case where we concurrently changed the generation stamp
+   * of a block. So, we scanned it with one GS, but at reconciliation
+   * time it has a new GS.
+   */
+  @Test
+  public void testGenstampChangedAfterScan() {
+    Block oldGenStamp = FAKE_BLK;
+    Block newGenStamp = new Block(FAKE_BLK);
+    newGenStamp.setGenerationStamp(GENSTAMP + 1);
+
+    fakeSeenByScan(oldGenStamp);
+    fakeInVolumeMap(newGenStamp);
+    fakeBlockOnDisk(newGenStamp);
+
+    assertTrue(seenOnDisk.containsKey(oldGenStamp));
+
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // old genstamp should not be added
+    assertFalse(seenOnDisk.containsKey(oldGenStamp));
+    // new genstamp should be added, since it's found on disk when reconciling
+    assertTrue(seenOnDisk.containsKey(newGenStamp));
+  }
+
+  @Test
+  public void testGetGenerationStampFromFile() {
+    File[] fileList = new File[] {
+        // include some junk files which should be ignored
+        new File("blk_-1362850638739812068_5351.meta.foo"),
+        new File("blk_-1362850638739812068_5351meta"),
+        // the real dir
+        new File("."),
+        new File(".."),
+        new File("blk_-1362850638739812068"),
+        new File("blk_-1362850638739812068_5351.meta"),
+        new File("blk_1453973893701037484"),
+        new File("blk_1453973893701037484_4804.meta"),
+    };
+
+    assertEquals(4804, FSDataset.getGenerationStampFromFile(fileList,
+        new File("blk_1453973893701037484")));
+    // try a prefix of a good block ID
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_145397389370103")));
+
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_99999")));
+
+    // pass nonsense value
+    assertEquals(Block.GRANDFATHER_GENERATION_STAMP,
+        FSDataset.getGenerationStampFromFile(fileList,
+            new File("blk_")));
+  }
+
+  /**
+   * Test case for blocks being created - these are not seen by the
+   * scan since they're in the bbw/ dir, not current/ -- but
+   * they are in volumeMap and ongoingCreates. These should not
+   * be reported.
+   */
+  @Test
+  public void testFileBeingCreated() {
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBlockOnDisk(FAKE_BLK);
+    fakeBeingCreated(FAKE_BLK);
+
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should not be added, since it's in the midst of being created!
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  /**
+   * Test for case where we reopened a block during the scan
+   */
+  @Test
+  public void testReopenedDuringScan() {
+    fakeSeenByScan(FAKE_BLK);
+    fakeInVolumeMap(FAKE_BLK);
+    fakeBeingCreated(FAKE_BLK);
+
+    assertTrue(seenOnDisk.containsKey(FAKE_BLK));
+    assertTrue(volumeMap.containsKey(FAKE_BLK));
+    FSDataset.reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
+    // should be removed from the map since .exists() returns false
+    assertFalse(seenOnDisk.containsKey(FAKE_BLK));
+  }
+
+  @Test(timeout=20000)
+  public void testAsyncReport() throws Exception {
+    FSDataset mock = Mockito.mock(FSDataset.class);
+    AsyncBlockReport abr = new FSDataset.AsyncBlockReport(mock);
+    abr.start();
+    try {
+      for (int i = 0; i < 3; i++) {
+        HashMap<Block, File> mockResult = new HashMap<Block, File>();
+        Mockito.doReturn(mockResult).when(mock).roughBlockScan();
+
+        assertFalse(abr.isReady());
+        abr.request();
+        while (!abr.isReady()) {
+          Thread.sleep(10);
+        }
+        assertSame(mockResult, abr.getAndReset());
+        assertFalse(abr.isReady());
+      }
+    } finally {
+      abr.shutdown();
+    }
+  }
+
+  private void fakeBeingCreated(Block b) {
+    ongoingCreates.put(b,
+        new ActiveFile(blockFile(b), new ArrayList<Thread>()));
+  }
+
+  private void fakeInVolumeMap(Block b) {
+    volumeMap.put(b, new DatanodeBlockInfo(null, blockFile(b)));
+  }
+
+  private void fakeBlockOnDisk(Block b) {
+    File f = blockFile(b);
+    try {
+      f.createNewFile();
+      FSDataset.getMetaFile(f, b).createNewFile();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create: " + f);
+    }
+  }
+
+  private void fakeSeenByScan(Block b) {
+    seenOnDisk.put(b, blockFile(b));
+  }
+
+  private File blockFile(Block b) {
+    return new File(TEST_DIR, b.getBlockName());
+  }
+}