Subject: svn commit: r1299139 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/server/datanode/
Date: Sat, 10 Mar 2012 01:52:18 -0000
To: hdfs-commits@hadoop.apache.org
From: szetszwo@apache.org
Message-Id: <20120310015218.D693E2388865@eris.apache.org>

Author: szetszwo
Date: Sat Mar 10 01:52:17 2012
New Revision: 1299139

URL: http://svn.apache.org/viewvc?rev=1299139&view=rev
Log:
HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
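[The new RollingLogs source file itself is not included in this message. As a reading aid, here is a minimal sketch of the shape the interface presumably has, inferred only from its call sites in the diff below (appender(), iterator(boolean), roll(), and the LineIterator methods); the javadoc wording and exact modifiers are assumptions, not the committed code.]

package org.apache.hadoop.hdfs.server.datanode;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

interface RollingLogs {
  /** An appender for writing log text; must be closed when done. */
  interface Appender extends Appendable, Closeable {
  }

  /** An iterator over log lines that knows which file it is reading. */
  interface LineIterator extends Iterator<String>, Closeable {
    /** @return true if the iterator is currently on the previous file. */
    boolean isPrevious();
  }

  /**
   * @param skipPrevFile if true, start from the current file only.
   * @return an iterator over the log lines, previous file first.
   */
  LineIterator iterator(boolean skipPrevFile) throws IOException;

  /** @return the single appender for these logs. */
  Appender appender();

  /**
   * Roll the current file over to the previous file.
   * @return true if the roll happened (e.g. no readers are active).
   */
  boolean roll() throws IOException;
}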
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Mar 10 01:52:17 2012
@@ -455,8 +455,6 @@ Release 0.23.3 - UNRELEASED
     HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple
     storages. (suresh)
 
-    HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
-
     HDFS-2430. The number of failed or low-resource volumes the NN can
     tolerate should be configurable. (atm)
 
@@ -533,6 +531,11 @@ Release 0.23.3 - UNRELEASED
     HDFS-3060. Bump TestDistributedUpgrade#testDistributedUpgrade timeout
     (eli)
 
+    HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo)
+
+    HDFS-3056. Add a new interface RollingLogs for DataBlockScanner logging.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3024. Improve performance of stringification in addStoredBlock (todd)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Sat Mar 10 01:52:17 2012
@@ -18,15 +18,9 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.BufferedReader;
-import java.io.Closeable;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Collections;
@@ -34,7 +28,11 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -61,41 +59,43 @@ class BlockPoolSliceScanner {
   
   public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
   
+  private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+
   private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
   private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
-  
-  static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
+  private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
+
+  private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
+
   private final String blockPoolId;
-  
-  private static final String dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS";
-  
-  static final String verificationLogFile = "dncp_block_verification.log";
-  static final int verficationLogLimit = 5; // * numBlocks.
+  private final long scanPeriod;
+  private final AtomicLong lastScanTime = new AtomicLong();
 
-  private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000;
-  private DataNode datanode;
+  private final DataNode datanode;
   private final FSDatasetInterface dataset;
   
-  // sorted set
-  private TreeSet<BlockScanInfo> blockInfoSet;
-  private HashMap<Block, BlockScanInfo> blockMap;
+  private final SortedSet<BlockScanInfo> blockInfoSet
+      = new TreeSet<BlockScanInfo>();
+  private final Map<Block, BlockScanInfo> blockMap
+      = new HashMap<Block, BlockScanInfo>();
   
   // processedBlocks keeps track of which blocks are scanned
   // since the last run.
-  private HashMap<Long, Integer> processedBlocks;
+  private volatile HashMap<Long, Integer> processedBlocks;
   
   private long totalScans = 0;
   private long totalScanErrors = 0;
   private long totalTransientErrors = 0;
-  private long totalBlocksScannedInLastRun = 0; // Used for test only
+  private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
   
   private long currentPeriodStart = System.currentTimeMillis();
   private long bytesLeft = 0; // Bytes to scan in this period
   private long totalBytesToScan = 0;
   
-  private LogFileHandler verificationLog;
+  private final LogFileHandler verificationLog;
   
-  private DataTransferThrottler throttler = null;
+  private final DataTransferThrottler throttler = new DataTransferThrottler(
+      200, MAX_SCAN_RATE);
   
   private static enum ScanType {
     VERIFICATION_SCAN,     // scanned as part of periodic verification
@@ -133,29 +133,48 @@ class BlockPoolSliceScanner {
     }
   }
   
-  BlockPoolSliceScanner(DataNode datanode,
+  BlockPoolSliceScanner(String bpid, DataNode datanode,
       FSDatasetInterface dataset,
-      Configuration conf, String bpid) {
+      Configuration conf) {
     this.datanode = datanode;
     this.dataset = dataset;
     this.blockPoolId = bpid;
-    scanPeriod = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+
+    long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
                              DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
-    if ( scanPeriod <= 0 ) {
-      scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
+    if (hours <= 0) {
+      hours = DEFAULT_SCAN_PERIOD_HOURS;
     }
-    scanPeriod *= 3600 * 1000;
-    LOG.info("Periodic Block Verification scan initialized with interval " + scanPeriod + ".");
+    this.scanPeriod = hours * 3600 * 1000;
+    LOG.info("Periodic Block Verification Scanner initialized with interval "
+        + hours + " hours for block pool " + bpid + ".");
+
+    // get the list of blocks and arrange them in random order
+    List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+    Collections.shuffle(arr);
+
+    long scanTime = -1;
+    for (Block block : arr) {
+      BlockScanInfo info = new BlockScanInfo( block );
+      info.lastScanTime = scanTime--;
+      //still keep 'info.lastScanType' to NONE.
+      addBlockInfo(info);
+    }
+
+    RollingLogs rollingLogs = null;
+    try {
+      rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
+    } catch (IOException e) {
+      LOG.warn("Could not open verification log. "
+          + "Verification times are not stored.");
+    }
+    verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
   }
   
   String getBlockPoolId() {
     return blockPoolId;
   }
   
-  synchronized boolean isInitialized() {
-    return throttler != null;
-  }
-  
   private void updateBytesToScan(long len, long lastScanTime) {
     // len could be negative when a block is deleted.
     totalBytesToScan += len;
@@ -197,51 +216,6 @@ class BlockPoolSliceScanner {
     }
   }
   
-  void init() throws IOException {
-    // get the list of blocks and arrange them in random order
-    List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
-    Collections.shuffle(arr);
-    
-    blockInfoSet = new TreeSet<BlockScanInfo>();
-    blockMap = new HashMap<Block, BlockScanInfo>();
-    
-    long scanTime = -1;
-    for (Block block : arr) {
-      BlockScanInfo info = new BlockScanInfo( block );
-      info.lastScanTime = scanTime--;
-      //still keep 'info.lastScanType' to NONE.
-      addBlockInfo(info);
-    }
-
-    /* Pick the first directory that has any existing scanner log.
-     * otherwise, pick the first directory.
-     */
-    File dir = null;
-    final List<FSVolumeInterface> volumes = dataset.getVolumes();
-    for (FSVolumeInterface vol : volumes) {
-      File bpDir = vol.getDirectory(blockPoolId);
-      if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) {
-        dir = bpDir;
-        break;
-      }
-    }
-    if (dir == null) {
-      dir = volumes.get(0).getDirectory(blockPoolId);
-    }
-
-    try {
-      // max lines will be updated later during initialization.
-      verificationLog = new LogFileHandler(dir, verificationLogFile, 100);
-    } catch (IOException e) {
-      LOG.warn("Could not open verfication log. " +
-               "Verification times are not stored.");
-    }
-
-    synchronized (this) {
-      throttler = new DataTransferThrottler(200, MAX_SCAN_RATE);
-    }
-  }
-
   private synchronized long getNewBlockScanTime() {
     /* If there are a lot of blocks, this returns a random time within
      * the scan period. Otherwise something sooner.
@@ -255,10 +229,6 @@ class BlockPoolSliceScanner {
   
   /** Adds block to list of blocks */
   synchronized void addBlock(ExtendedBlock block) {
-    if (!isInitialized()) {
-      return;
-    }
-    
    BlockScanInfo info = blockMap.get(block.getLocalBlock());
    if ( info != null ) {
      LOG.warn("Adding an already existing block " + block);
@@ -274,20 +244,19 @@ class BlockPoolSliceScanner {
   
   /** Deletes the block from internal structures */
   synchronized void deleteBlock(Block block) {
-    if (!isInitialized()) {
-      return;
-    }
     BlockScanInfo info = blockMap.get(block);
     if ( info != null ) {
       delBlockInfo(info);
     }
   }

-  /** @return the last scan time */
+  /** @return the last scan time for the block pool. */
+  long getLastScanTime() {
+    return lastScanTime.get();
+  }
+
+  /** @return the last scan time of the given block. */
   synchronized long getLastScanTime(Block block) {
-    if (!isInitialized()) {
-      return 0;
-    }
     BlockScanInfo info = blockMap.get(block);
     return info == null? 0: info.lastScanTime;
   }
@@ -302,9 +271,6 @@ class BlockPoolSliceScanner {
   
   private synchronized void updateScanStatus(Block block, 
                                              ScanType type,
                                              boolean scanOk) {
-    if (!isInitialized()) {
-      return;
-    }
     BlockScanInfo info = blockMap.get(block);
     
     if ( info != null ) {
@@ -325,9 +291,9 @@ class BlockPoolSliceScanner {
       return;
     }
     
-    LogFileHandler log = verificationLog;
-    if (log != null) {
-      log.appendLine(now, block.getGenerationStamp(), block.getBlockId());
+    if (verificationLog != null) {
+      verificationLog.append(now, block.getGenerationStamp(),
+          block.getBlockId());
     }
   }
  
@@ -342,6 +308,7 @@ class BlockPoolSliceScanner {
   }
   
   static private class LogEntry {
+
     long blockId = -1;
     long verificationTime = -1;
     long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
@@ -355,6 +322,14 @@ class BlockPoolSliceScanner {
     private static Pattern entryPattern =
       Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
     
+    static String toString(long verificationTime, long genStamp, long blockId,
+        DateFormat dateFormat) {
+      return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
+          + "\"\t time=\"" + verificationTime
+          + "\"\t genstamp=\"" + genStamp
+          + "\"\t id=\"" + blockId + "\"";
+    }
+
     static LogEntry parseEntry(String line) {
       LogEntry entry = new LogEntry();
@@ -491,8 +466,8 @@ class BlockPoolSliceScanner {
   }
   
   // Used for tests only
-  long getBlocksScannedInLastRun() {
-    return totalBlocksScannedInLastRun;
+  int getBlocksScannedInLastRun() {
+    return totalBlocksScannedInLastRun.get();
   }

   /**
@@ -503,33 +478,19 @@ class BlockPoolSliceScanner {
    * to exit.
    */
   private boolean assignInitialVerificationTimes() {
-    int numBlocks = 1;
-    LogFileHandler log = null;
-    synchronized (this) {
-      log = verificationLog;
-      numBlocks = Math.max(blockMap.size(), 1);
-    }
-    
-    long now = System.currentTimeMillis();
-    LogFileHandler.Reader logReader[] = new LogFileHandler.Reader[2];
-    try {
-      if (log != null) {
-        logReader[0] = log.getCurrentFileReader();
-        logReader[1] = log.getPreviousFileReader();
-      }
-    } catch (IOException e) {
-      LOG.warn("Could not read previous verification times", e);
-    }
-    
-    try {
-      for (LogFileHandler.Reader reader : logReader) {
-        // update verification times from the verificationLog.
-        while (logReader != null && reader.hasNext()) {
+    //First updates the last verification times from the log file.
+    if (verificationLog != null) {
+      long now = System.currentTimeMillis();
+      RollingLogs.LineIterator logIterator = null;
+      try {
+        logIterator = verificationLog.logs.iterator(false);
+        // update verification times from the verificationLog.
+        while (logIterator.hasNext()) {
           if (!datanode.shouldRun
               || datanode.blockScanner.blockScannerThread.isInterrupted()) {
             return false;
           }
-          LogEntry entry = LogEntry.parseEntry(reader.next());
+          LogEntry entry = LogEntry.parseEntry(logIterator.next());
           if (entry != null) {
             updateBlockInfo(entry);
             if (now - entry.verificationTime < scanPeriod) {
@@ -540,35 +501,35 @@ class BlockPoolSliceScanner {
               updateBytesLeft(-info.block.getNumBytes());
               processedBlocks.put(entry.blockId, 1);
             }
-            if (reader.file == log.prevFile) {
+            if (logIterator.isPrevious()) {
               // write the log entry to current file
               // so that the entry is preserved for later runs.
-              log.appendLine(entry.verificationTime, entry.genStamp,
+              verificationLog.append(entry.verificationTime, entry.genStamp,
                   entry.blockId);
             }
           }
         }
       }
+      } catch (IOException e) {
+        LOG.warn("Failed to read previous verification times.", e);
+      } finally {
+        IOUtils.closeStream(logIterator);
       }
-    } finally {
-      IOUtils.closeStream(logReader[0]);
-      IOUtils.closeStream(logReader[1]);
     }
-    
-    /* Initially spread the block reads over half of 
-     * MIN_SCAN_PERIOD so that we don't keep scanning the 
-     * blocks too quickly when restarted.
-     */
-    long verifyInterval = (long) (Math.min( scanPeriod/2.0/numBlocks,
-                                            10*60*1000 ));
-    long lastScanTime = System.currentTimeMillis() - scanPeriod;
-    
+
     /* Before this loop, entries in blockInfoSet that are not
      * updated above have lastScanTime of <= 0 . Loop until first entry has
     * lastModificationTime > 0.
     */
    synchronized (this) {
+      final int numBlocks = Math.max(blockMap.size(), 1);
+      // Initially spread the block reads over half of scan period
+      // so that we don't keep scanning the blocks too quickly when restarted.
+      long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
+      long lastScanTime = System.currentTimeMillis() - scanPeriod;
+
      if (!blockInfoSet.isEmpty()) {
        BlockScanInfo info;
        while ((info = blockInfoSet.first()).lastScanTime < 0) {
@@ -586,11 +547,6 @@ class BlockPoolSliceScanner {
   private synchronized void updateBytesLeft(long len) {
     bytesLeft += len;
   }
-  
-  static File getCurrentFile(FSVolumeInterface vol, String bpid) throws IOException {
-    return LogFileHandler.getCurrentFile(vol.getDirectory(bpid),
-        BlockPoolSliceScanner.verificationLogFile);
-  }
   
   private synchronized void startNewPeriod() {
     LOG.info("Starting a new period : work left in prev period : "
@@ -604,26 +560,21 @@ class BlockPoolSliceScanner {
   
   void scanBlockPoolSlice() {
     startNewPeriod();
-    if (processedBlocks != null) {
-      totalBlocksScannedInLastRun = processedBlocks.size();
-    }
     // Create a new processedBlocks structure
     processedBlocks = new HashMap<Long, Integer>();
-    if (verificationLog != null) {
-      try {
-        verificationLog.openCurFile();
-      } catch (FileNotFoundException ex) {
-        LOG.warn("Could not open current file");
-      }
-    }
     if (!assignInitialVerificationTimes()) {
       return;
     }
     // Start scanning
-    scan();
+    try {
+      scan();
+    } finally {
+      totalBlocksScannedInLastRun.set(processedBlocks.size());
+      lastScanTime.set(System.currentTimeMillis());
+    }
   }
   
-  public void scan() {
+  private void scan() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Starting to scan blockpool: " + blockPoolId);
     }
@@ -663,7 +614,7 @@ class BlockPoolSliceScanner {
   private synchronized void cleanUp() {
     if (verificationLog != null) {
       try {
-        verificationLog.roll();
+        verificationLog.logs.roll();
       } catch (IOException ex) {
         LOG.warn("Received exception: ", ex);
         verificationLog.close();
@@ -686,7 +637,7 @@ class BlockPoolSliceScanner {
     int inScanPeriod = 0;
     int neverScanned = 0;
     
-    DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
+    DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
     
     int total = blockInfoSet.size();
    
@@ -751,191 +702,33 @@ class BlockPoolSliceScanner {
   
   /**
    * This class takes care of log file used to store the last verification
-   * times of the blocks. It rolls the current file when it is too big etc.
-   * If there is an error while writing, it stops updating with an error
-   * message.
+   * times of the blocks.
    */
   private static class LogFileHandler {
-    
-    private static final String curFileSuffix = ".curr";
-    private static final String prevFileSuffix = ".prev";
-    private final DateFormat dateFormat = new SimpleDateFormat(dateFormatString);
-    
-    static File getCurrentFile(File dir, String filePrefix) {
-      return new File(dir, filePrefix + curFileSuffix);
-    }
-    
-    public Reader getPreviousFileReader() throws IOException {
-      return new Reader(prevFile);
-    }
-    
-    public Reader getCurrentFileReader() throws IOException {
-      return new Reader(curFile);
-    }
-    
-    static boolean isFilePresent(File dir, String filePrefix) {
-      return new File(dir, filePrefix + curFileSuffix).exists() ||
-             new File(dir, filePrefix + prevFileSuffix).exists();
-    }
-    private File curFile;
-    private File prevFile;
-    
-    private PrintStream out;
-    
-    /**
-     * Opens the log file for appending.
-     * Note that rolling will happen only after "updateLineCount()" is
-     * called. This is so that line count could be updated in a separate
-     * thread without delaying start up.
-     *
-     * @param dir where the logs files are located.
-     * @param filePrefix prefix of the file.
-     * @param maxNumLines max lines in a file (its a soft limit).
-     * @throws IOException
-     */
-    LogFileHandler(File dir, String filePrefix, int maxNumLines)
-        throws IOException {
-      curFile = new File(dir, filePrefix + curFileSuffix);
-      prevFile = new File(dir, filePrefix + prevFileSuffix);
-    }
-    
-    /**
-     * Append "\n" + line.
-     * If the log file need to be rolled, it will done after
-     * appending the text.
-     * This does not throw IOException when there is an error while
-     * appending. Currently does not throw an error even if rolling
-     * fails (may be it should?).
-     * return true if append was successful.
-     */
-    synchronized boolean appendLine(String line) {
-      if (out == null) {
-        return false;
-      }
-      out.println();
-      out.print(line);
-      return true;
-    }
-    
-    boolean appendLine(long verificationTime, long genStamp, long blockId) {
-      return appendLine("date=\""
-          + dateFormat.format(new Date(verificationTime)) + "\"\t " + "time=\""
-          + verificationTime + "\"\t " + "genstamp=\"" + genStamp + "\"\t "
-          + "id=\"" + blockId + "\"");
-    }
-    
-    private synchronized void openCurFile() throws FileNotFoundException {
-      close();
-      out = new PrintStream(new FileOutputStream(curFile, true));
-    }
-    
-    private void roll() throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Rolling current file: " + curFile.getAbsolutePath()
-            + " to previous file: " + prevFile.getAbsolutePath());
-      }
+    private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
 
-      if (!prevFile.delete() && prevFile.exists()) {
-        throw new IOException("Could not delete " + prevFile);
-      }
-      
-      close();
+    private final RollingLogs logs;
 
-      if (!curFile.renameTo(prevFile)) {
-        throw new IOException("Could not rename " + curFile +
-                              " to " + prevFile);
-      }
-    }
-    
-    synchronized void close() {
-      if (out != null) {
-        out.close();
-        out = null;
-      }
+    private LogFileHandler(RollingLogs logs) {
+      this.logs = logs;
     }
-    
-    /**
-     * This is used to read the lines in order.
-     * If the data is not read completely (i.e, untill hasNext() returns
-     * false), it needs to be explicitly
-     */
-    private static class Reader implements Iterator<String>, Closeable {
-      
-      BufferedReader reader;
-      File file;
-      String line;
-      boolean closed = false;
-      
-      private Reader(File file) throws IOException {
-        reader = null;
-        this.file = file;
-        readNext();
-      }
-      
-      private boolean openFile() throws IOException {
-        if (file == null) {
-          return false;
-        }
-        if (reader != null ) {
-          reader.close();
-          reader = null;
-        }
-        if (file.exists()) {
-          reader = new BufferedReader(new FileReader(file));
-          return true;
-        } else {
-          return false;
-        }
-      }
-      
-      // read next line if possible.
-      private void readNext() throws IOException {
-        line = null;
-        if (reader == null) {
-          openFile();
-        }
-        try {
-          if (reader != null && (line = reader.readLine()) != null) {
-            return;
-          }
-        } finally {
-          if (!hasNext()) {
-            close();
-          }
-        }
-      }
-      
-      public boolean hasNext() {
-        return line != null;
-      }
-      
-      public String next() {
-        String curLine = line;
-        try {
-          readNext();
-        } catch (IOException e) {
-          LOG.info("Could not read next line in LogHandler", e);
-        }
-        return curLine;
-      }
-      
-      public void remove() {
-        throw new RuntimeException("remove() is not supported.");
+    void append(long verificationTime, long genStamp, long blockId) {
+      final String m = LogEntry.toString(verificationTime, genStamp, blockId,
+          dateFormat);
+      try {
+        logs.appender().append(m);
+      } catch (IOException e) {
+        LOG.warn("Failed to append to " + logs + ", m=" + m, e);
      }
+    }
 
-      public void close() throws IOException {
-        if (!closed) {
-          try {
-            if (reader != null) {
-              reader.close();
-            }
-          } finally {
-            file = null;
-            reader = null;
-            closed = true;
-          }
-        }
+    void close() {
+      try {
+        logs.appender().close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the appender of " + logs, e);
      }
-    }
+    }
  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Sat Mar 10 01:52:17 2012
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.TreeMap;
 
 import javax.servlet.http.HttpServlet;
@@ -132,24 +130,14 @@ public class DataBlockScanner implements
     waitForInit(currentBpId);
     synchronized (this) {
       if (getBlockPoolSetSize() > 0) {
-        // Find nextBpId by finding the last modified current log file, if any
-        long lastScanTime = -1;
-        Iterator<String> bpidIterator = blockPoolScannerMap.keySet()
-            .iterator();
-        while (bpidIterator.hasNext()) {
-          String bpid = bpidIterator.next();
-          for (FSDatasetInterface.FSVolumeInterface vol : dataset.getVolumes()) {
-            try {
-              File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
-              if (currFile.exists()) {
-                long lastModified = currFile.lastModified();
-                if (lastScanTime < lastModified) {
-                  lastScanTime = lastModified;
-                  nextBpId = bpid;
-                }
-              }
-            } catch (IOException e) {
-              LOG.warn("Received exception: ", e);
+        // Find nextBpId by the minimum of the last scan time
+        long lastScanTime = 0;
+        for (String bpid : blockPoolScannerMap.keySet()) {
+          final long t = getBPScanner(bpid).getLastScanTime();
+          if (t != 0L) {
+            if (nextBpId == null || t < lastScanTime) {
+              lastScanTime = t;
+              nextBpId = bpid;
            }
          }
        }
@@ -157,13 +145,9 @@ public class DataBlockScanner implements
         // nextBpId can still be null if no current log is found,
         // find nextBpId sequentially.
         if (nextBpId == null) {
-          if ("".equals(currentBpId)) {
+          nextBpId = blockPoolScannerMap.higherKey(currentBpId);
+          if (nextBpId == null) {
             nextBpId = blockPoolScannerMap.firstKey();
-          } else {
-            nextBpId = blockPoolScannerMap.higherKey(currentBpId);
-            if (nextBpId == null) {
-              nextBpId = blockPoolScannerMap.firstKey();
-            }
           }
         }
         if (nextBpId != null) {
@@ -206,12 +190,8 @@ public class DataBlockScanner implements
     }
   }
   
-  public synchronized boolean isInitialized(String bpid) {
-    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
-    if (bpScanner != null) {
-      return bpScanner.isInitialized();
-    }
-    return false;
+  boolean isInitialized(String bpid) {
+    return getBPScanner(bpid) != null;
   }
   
   public synchronized void printBlockReport(StringBuilder buffer,
@@ -260,14 +240,8 @@ public class DataBlockScanner implements
     if (blockPoolScannerMap.get(blockPoolId) != null) {
       return;
     }
-    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(datanode, dataset,
-        conf, blockPoolId);
-    try {
-      bpScanner.init();
-    } catch (IOException ex) {
-      LOG.warn("Failed to initialized block scanner for pool id="+blockPoolId);
-      return;
-    }
+    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
+        datanode, dataset, conf);
     blockPoolScannerMap.put(blockPoolId, bpScanner);
     LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
         + blockPoolScannerMap.size());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Mar 10 01:52:17 2012
@@ -85,7 +85,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -96,7 +95,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
@@ -165,7 +163,6 @@ import org.apache.hadoop.util.VersionInf
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import com.google.protobuf.BlockingService;
@@ -253,7 +250,7 @@ public class DataNode extends Configured
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   
-  public DataBlockScanner blockScanner = null;
+  volatile DataBlockScanner blockScanner = null;
   private DirectoryScanner directoryScanner = null;
   
   /** Activated plug-ins. */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Mar 10 01:52:17 2012
@@ -18,13 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.BufferedInputStream;
+import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
@@ -37,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -612,8 +616,8 @@ class FSDataset implements FSDatasetInte
     }
     
     @Override
-    public File getDirectory(String bpid) throws IOException {
-      return getBlockPoolSlice(bpid).getDirectory();
+    public String getPath(String bpid) throws IOException {
+      return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
     }
     
     @Override
@@ -2301,7 +2305,7 @@ class FSDataset implements FSDatasetInte
           DataNode.LOG.warn("Metadata file in memory "
              + memMetaFile.getAbsolutePath()
              + " does not match file found by scan "
-              + diskMetaFile.getAbsolutePath());
+              + (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
         }
       } else {
         // Metadata file corresponding to block in memory is missing
@@ -2612,4 +2616,220 @@ class FSDataset implements FSDatasetInte
         datafile.getAbsolutePath(), metafile.getAbsolutePath());
     return info;
   }
+
+  @Override
+  public RollingLogs createRollingLogs(String bpid, String prefix
+      ) throws IOException {
+    String dir = null;
+    final List<FSVolume> volumes = getVolumes();
+    for (FSVolume vol : volumes) {
+      String bpDir = vol.getPath(bpid);
+      if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
+        dir = bpDir;
+        break;
+      }
+    }
+    if (dir == null) {
+      dir = volumes.get(0).getPath(bpid);
+    }
+    return new RollingLogsImpl(dir, prefix);
+  }
+
+  static class RollingLogsImpl implements RollingLogs {
+    private static final String CURR_SUFFIX = ".curr";
+    private static final String PREV_SUFFIX = ".prev";
+
+    static boolean isFilePresent(String dir, String filePrefix) {
+      return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
+             new File(dir, filePrefix + PREV_SUFFIX).exists();
+    }
+
+    private final File curr;
+    private final File prev;
+    private PrintStream out; //require synchronized access
+
+    private Appender appender = new Appender() {
+      @Override
+      public Appendable append(CharSequence csq) {
+        synchronized(RollingLogsImpl.this) {
+          if (out == null) {
+            throw new IllegalStateException(RollingLogsImpl.this
+                + " is not yet opened.");
+          }
+          out.print(csq);
+        }
+        return this;
+      }
+
+      @Override
+      public Appendable append(char c) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Appendable append(CharSequence csq, int start, int end) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close() {
+        synchronized(RollingLogsImpl.this) {
+          if (out != null) {
+            out.close();
+            out = null;
+          }
+        }
+      }
+    };
+
+
+    private final AtomicInteger numReaders = new AtomicInteger();
+
+    private RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
+      curr = new File(dir, filePrefix + CURR_SUFFIX);
+      prev = new File(dir, filePrefix + PREV_SUFFIX);
+      out = new PrintStream(new FileOutputStream(curr, true));
+    }
+
+    @Override
+    public Reader iterator(boolean skipPrevFile) throws IOException {
+      numReaders.incrementAndGet();
+      return new Reader(skipPrevFile);
+    }
+
+    @Override
+    public Appender appender() {
+      return appender;
+    }
+
+    @Override
+    public boolean roll() throws IOException {
+      if (numReaders.get() > 0) {
+        return false;
+      }
+      if (!prev.delete() && prev.exists()) {
+        throw new IOException("Failed to delete " + prev);
+      }
+
+      synchronized(this) {
+        appender.close();
+        final boolean renamed = curr.renameTo(prev);
+        out = new PrintStream(new FileOutputStream(curr, true));
+        if (!renamed) {
+          throw new IOException("Failed to rename " + curr + " to " + prev);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return curr.toString();
+    }
+
+    /**
+     * This is used to read the lines in order.
+     * If the data is not read completely (i.e., until hasNext() returns
+     * false), it needs to be explicitly closed.
+     */
+    private class Reader implements RollingLogs.LineIterator {
+      private File file;
+      private BufferedReader reader;
+      private String line;
+      private boolean closed = false;
+
+      private Reader(boolean skipPrevFile) throws IOException {
+        reader = null;
+        file = skipPrevFile? curr : prev;
+        readNext();
+      }
+
+      @Override
+      public boolean isPrevious() {
+        return file == prev;
+      }
+
+      private boolean openFile() throws IOException {
+
+        for(int i=0; i<2; i++) {
+          if (reader != null || i > 0) {
+            // move to next file
+            file = isPrevious()? curr : null;
+          }
+          if (file == null) {
+            return false;
+          }
+          if (file.exists()) {
+            break;
+          }
+        }
+
+        if (reader != null ) {
+          reader.close();
+          reader = null;
+        }
+
+        reader = new BufferedReader(new FileReader(file));
+        return true;
+      }
+
+      // read next line if possible.
+      private void readNext() throws IOException {
+        line = null;
+        try {
+          if (reader != null && (line = reader.readLine()) != null) {
+            return;
+          }
+          if (line == null) {
+            // move to the next file.
+            if (openFile()) {
+              readNext();
+            }
+          }
+        } finally {
+          if (!hasNext()) {
+            close();
+          }
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return line != null;
+      }
+
+      @Override
+      public String next() {
+        String curLine = line;
+        try {
+          readNext();
+        } catch (IOException e) {
+          DataBlockScanner.LOG.warn("Failed to read next line.", e);
+        }
+        return curLine;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (!closed) {
+          try {
+            if (reader != null) {
+              reader.close();
+            }
+          } finally {
+            file = null;
+            reader = null;
+            closed = true;
+            final int n = numReaders.decrementAndGet();
+            assert(n >= 0);
+          }
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Mar 10 01:52:17 2012
@@ -88,13 +88,21 @@ public interface FSDatasetInterface<V extends FSVolumeInterface>
     public List<V> getVolumes();
 
-    /** @return the directory of the block pool slice. */
-    public File getDirectory(String bpid) throws IOException;
+    /** @return the path of the block pool slice directory. */
+    public String getPath(String bpid) throws IOException;
 
+  /**
+   * Create rolling logs.
+   * @param prefix the prefix of the log names.
+   * @return rolling logs
+   */
+  public RollingLogs createRollingLogs(String bpid, String prefix
+      ) throws IOException;
+

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Sat Mar 10 01:52:17 2012
@@ -18,27 +18,28 @@
 package org.apache.hadoop.hdfs;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.Random;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.io.*;
-import java.util.Random;
+
+import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
-import junit.framework.TestCase;
-
 /**
  * This test verifies that block verification occurs on the datanode
  */
@@ -392,7 +393,7 @@ public class TestDatanodeBlockScanner ex
   }
   
   private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
-      long timeout) throws IOException, TimeoutException, InterruptedException {
+      long timeout) throws TimeoutException, InterruptedException {
     File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
     long failtime = System.currentTimeMillis()
                     + ((timeout > 0) ? timeout : Long.MAX_VALUE);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1299139&r1=1299138&r2=1299139&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Mar 10 01:52:17 2012
@@ -435,7 +435,7 @@ public class SimulatedFSDataset
   }
 
   @Override // FSDatasetInterface
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
+  public synchronized void unfinalizeBlock(ExtendedBlock b) {
     if (isValidRbw(b)) {
       blockMap.remove(b.getLocalBlock());
     }
@@ -456,12 +456,12 @@ public class SimulatedFSDataset
   }
 
   @Override // FSDatasetMBean
-  public long getCapacity() throws IOException {
+  public long getCapacity() {
     return storage.getCapacity();
   }
   
   @Override // FSDatasetMBean
-  public long getDfsUsed() throws IOException {
+  public long getDfsUsed() {
     return storage.getUsed();
   }
   
@@ -471,7 +471,7 @@ public class SimulatedFSDataset
   }
 
   @Override // FSDatasetMBean
-  public long getRemaining() throws IOException {
+  public long getRemaining() {
     return storage.getFree();
   }

@@ -938,13 +938,13 @@ public class SimulatedFSDataset
   @Override // FSDatasetInterface
   public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                                 long recoveryId,
-                                                long newlength) throws IOException {
+                                                long newlength) {
     return new FinalizedReplica(
         oldBlock.getBlockId(), newlength, recoveryId, null, null);
   }

   @Override // FSDatasetInterface
-  public long getReplicaVisibleLength(ExtendedBlock block) throws IOException {
+  public long getReplicaVisibleLength(ExtendedBlock block) {
     return block.getNumBytes();
   }

@@ -1013,4 +1013,9 @@ public class SimulatedFSDataset
   public Map<String, Object> getVolumeInfoMap() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public RollingLogs createRollingLogs(String bpid, String prefix) {
+    throw new UnsupportedOperationException();
+  }
 }
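[Background on the mechanism introduced above: RollingLogsImpl keeps one current file (prefix + ".curr") and at most one previous generation (prefix + ".prev"); roll() promotes .curr to .prev unless a reader is active, and a LineIterator created with iterator(false) walks .prev first and then .curr. The following standalone sketch, which is not part of the commit and uses purely illustrative names, demonstrates that same two-file scheme end to end.]

import java.io.*;
import java.nio.file.*;
import java.util.*;

public class RollingLogDemo {
  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("dncp");
    Path curr = dir.resolve("verification.log.curr");
    Path prev = dir.resolve("verification.log.prev");

    // Append a log entry to the current file, as LogFileHandler.append does.
    try (PrintStream out = new PrintStream(
        new FileOutputStream(curr.toFile(), true))) {
      out.println("date=\"...\"\t time=\"1\"\t genstamp=\"0\"\t id=\"42\"");
    }

    // Roll: delete the old .prev, rename .curr to .prev, start a fresh .curr.
    Files.deleteIfExists(prev);
    Files.move(curr, prev);
    Files.createFile(curr);

    // Read previous then current, the order LineIterator uses by default.
    for (Path p : Arrays.asList(prev, curr)) {
      try (BufferedReader r = Files.newBufferedReader(p)) {
        for (String line; (line = r.readLine()) != null; ) {
          System.out.println(p.getFileName() + ": " + line);
        }
      }
    }
  }
}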