Return-Path:
Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org
Received: (qmail 78024 invoked from network); 10 Nov 2009 07:39:46 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3)
  by minotaur.apache.org with SMTP; 10 Nov 2009 07:39:46 -0000
Received: (qmail 37499 invoked by uid 500); 10 Nov 2009 07:39:46 -0000
Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org
Received: (qmail 37448 invoked by uid 500); 10 Nov 2009 07:39:46 -0000
Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: hdfs-dev@hadoop.apache.org
Delivered-To: mailing list hdfs-commits@hadoop.apache.org
Received: (qmail 37431 invoked by uid 99); 10 Nov 2009 07:39:45 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
  by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2009 07:39:45 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
  by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2009 07:39:36 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
  id 0101F2388904; Tue, 10 Nov 2009 07:39:15 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r834377 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date: Tue, 10 Nov 2009 07:39:14 -0000
To: hdfs-commits@hadoop.apache.org
From: dhruba@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20091110073915.0101F2388904@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: dhruba
Date: Tue Nov 10 07:39:14 2009
New Revision: 834377

URL: http://svn.apache.org/viewvc?rev=834377&view=rev
Log:
HDFS-611. Prevent DataNode heartbeat times from increasing even when the
DataNode has many blocks to delete. (Zheng Shao via dhruba)

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 10 07:39:14 2009
@@ -46,6 +46,9 @@
 
     HDFS-757. Enable Unit test for HDFS Raid. (dhruba)
 
+    HDFS-611. Prevent DataNode heartbeat times from increasing even when
+    the DataNode has many blocks to delete.
+    (Zheng Shao via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Tue Nov 10 07:39:14 2009
@@ -28,7 +28,7 @@
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
   // Chunk the block Invalidate message
-  public static final int BLOCK_INVALIDATE_CHUNK = 100;
+  public static final int BLOCK_INVALIDATE_CHUNK = 1000;
 
   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Nov 10 07:39:14 2009
@@ -294,6 +294,7 @@
   }
 
   class FSVolume {
+    private File currentDir;
     private FSDir dataDir;   // directory store Finalized replica
     private File rbwDir;     // directory store RBW replica
     private File tmpDir;     // directory store Temporary replica
@@ -304,6 +305,7 @@
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      this.currentDir = currentDir;
       File parent = currentDir.getParentFile();
       final File finalizedDir = new File(
           currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -337,8 +339,16 @@
       this.dfsUsage.start();
     }
 
+    File getCurrentDir() {
+      return currentDir;
+    }
+
     void decDfsUsed(long value) {
-      dfsUsage.decDfsUsed(value);
+      // The caller to this method (BlockFileDeleteTask.run()) does
+      // not have locked FSDataset.this yet.
+      synchronized(FSDataset.this) {
+        dfsUsage.decDfsUsed(value);
+      }
     }
     
     long getDfsUsed() throws IOException {
@@ -822,6 +832,7 @@
   private int maxBlocksPerDir = 0;
   ReplicasMap volumeMap = new ReplicasMap();
   static Random random = new Random();
+  FSDatasetAsyncDiskService asyncDiskService;
 
   // Used for synchronizing access to usage stats
   private Object statsLock = new Object();
@@ -840,6 +851,11 @@
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+    File[] roots = new File[storage.getNumStorageDirs()];
+    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+    }
+    asyncDiskService = new FSDatasetAsyncDiskService(roots);
     registerMBean(storage.getStorageID());
   }
 
@@ -1603,22 +1619,10 @@
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f, invalidBlks[i] );
-      long blockSize = f.length()+metaFile.length();
-      if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
-        DataNode.LOG.warn("Unexpected error trying to delete block "
-                          + invalidBlks[i] + " at file " + f);
-        error = true;
-        continue;
-      }
-      v.decDfsUsed(blockSize);
-      DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
-      if (f.exists()) {
-        //
-        // This is a temporary check especially for hadoop-1220.
-        // This will go away in the future.
-        //
-        DataNode.LOG.info("File " + f + " was deleted but still exists!");
-      }
+      long dfsBytes = f.length() + metaFile.length();
+
+      // Delete the block asynchronously to make sure we can do it fast enough
+      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
@@ -1730,6 +1734,10 @@
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
     
+    if (asyncDiskService != null) {
+      asyncDiskService.shutdown();
+    }
+
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {
         if(volume != null) {

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=834377&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Tue Nov 10 07:39:14 2009
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * This class is a container of multiple thread pools, each for a volume,
+ * so that we can schedule async disk operations easily.
+ * 
+ * Examples of async disk operations are deletion of block files in FSDataset.
+ * We don't want to create a new thread for each of the deletion request, and
+ * we don't want to do all deletions in the heartbeat thread since deletion
+ * can be slow, and we don't want to use a single thread pool because that
+ * is inefficient when we have more than 1 volume.  AsyncDiskService is the
+ * solution for these.
+ * 
+ * This class is used inside FSDataset.
+ * 
+ * In the future, we should extract AsyncDiskService and put it into common.
+ * The FSDataset-specific logic should reside here.
+ */
+class FSDatasetAsyncDiskService {
+  
+  public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
+  
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
+  
+  private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
+  
+  private ThreadFactory threadFactory;
+  
+  private HashMap<File, ThreadPoolExecutor> executors
+      = new HashMap<File, ThreadPoolExecutor>();
+  
+  /**
+   * Create a AsyncDiskServices with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskServices uses one ThreadPool per volume to do the async
+   * disk operations.
+   * 
+   * @param volumes The roots of the data volumes.
+   */
+  FSDatasetAsyncDiskService(File[] volumes) {
+    
+    threadFactory = new ThreadFactory() {
+      public Thread newThread(Runnable r) {
+        return new Thread(threadGroup, r);
+      }
+    };
+    
+    // Create one ThreadPool per volume
+    for (int v = 0 ; v < volumes.length; v++) {
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(
+          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>(), threadFactory);
+      
+      // This can reduce the number of running threads
+      executor.allowCoreThreadTimeOut(true);
+      executors.put(volumes[v], executor);
+    }
+    
+  }
+  
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  synchronized void execute(File root, Runnable task) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(root);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find root " + root
+          + " for execution of task " + task);
+    } else {
+      executor.execute(task);
+    }
+  }
+  
+  /**
+   * Gracefully shut down all ThreadPool. Will wait for all deletion
+   * tasks to finish.
+   */
+  synchronized void shutdown() {
+    
+    if (executors == null) {
+      
+      LOG.warn("AsyncDiskService has already shut down.");
+      
+    } else {
+      LOG.info("Shutting down all async disk service threads...");
+      
+      for (Map.Entry<File, ThreadPoolExecutor> e
+          : executors.entrySet()) {
+        e.getValue().shutdown();
+      }
+      // clear the executor map so that calling execute again will fail.
+      executors = null;
+      
+      LOG.info("All async disk service threads have been shut down.");
+    }
+  }
+  
+  /**
+   * Delete the block file and meta file from the disk asynchronously, adjust
+   * dfsUsed statistics accordingly.
+   */
+  void deleteAsync(FSDataset.FSVolume volume, File blockFile,
+      File metaFile, long dfsBytes, String blockName) {
+    DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
+        + " for deletion");
+    ReplicaFileDeleteTask deletionTask =
+        new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
+            blockName);
+    execute(volume.getCurrentDir(), deletionTask);
+  }
+  
+  /** A task for deleting a block file and its associated meta file, as well
+   *  as decrement the dfs usage of the volume.
+   */
+  static class ReplicaFileDeleteTask implements Runnable {
+
+    FSDataset.FSVolume volume;
+    File blockFile;
+    File metaFile;
+    long dfsBytes;
+    String blockName;
+
+    ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile,
+        File metaFile, long dfsBytes, String blockName) {
+      this.volume = volume;
+      this.blockFile = blockFile;
+      this.metaFile = metaFile;
+      this.dfsBytes = dfsBytes;
+      this.blockName = blockName;
+    }
+    
+    FSDataset.FSVolume getVolume() {
+      return volume;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncDiskService.execute for displaying error messages.
+      return "deletion of block " + blockName + " with block file " + blockFile
+          + " and meta file " + metaFile + " from volume " + volume;
+    }
+
+    @Override
+    public void run() {
+      if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
+        DataNode.LOG.warn("Unexpected error trying to delete block "
+            + blockName + " at file " + blockFile + ". Ignored.");
+      } else {
+        volume.decDfsUsed(dfsBytes);
+        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+      }
+    }
+  };
+  
+  
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java?rev=834377&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java Tue Nov 10 07:39:14 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+public class TestDFSRemove extends junit.framework.TestCase {
+  static int countLease(MiniDFSCluster cluster) {
+    return cluster.getNamesystem().leaseManager.countLease();
+  }
+  
+  final Path dir = new Path("/test/remove/");
+
+  void list(FileSystem fs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for(FileStatus s : fs.listStatus(dir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream a_out = fs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+  
+  static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
+    long total = 0;
+    for(DataNode node : cluster.getDataNodes()) {
+      total += node.getFSDataset().getDfsUsed();
+    }
+    return total;
+  }
+  
+  public void testRemove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      assertTrue(fs.mkdirs(dir));
+      
+      long dfsUsedStart = getTotalDfsUsed(cluster);
+      {
+        // Create 100 files
+        final int fileCount = 100;
+        for (int i = 0; i < fileCount; i++) {
+          Path a = new Path(dir, "a" + i);
+          createFile(fs, a);
+        }
+        long dfsUsedMax = getTotalDfsUsed(cluster);
+        // Remove 100 files
+        for (int i = 0; i < fileCount; i++) {
+          Path a = new Path(dir, "a" + i);
+          fs.delete(a, false);
+        }
+        // wait 3 heartbeat intervals, so that all blocks are deleted.
+        Thread.sleep(3 * FSConstants.HEARTBEAT_INTERVAL * 1000);
+        // all blocks should be gone now.
+        long dfsUsedFinal = getTotalDfsUsed(cluster);
+        assertEquals("All blocks should be gone. start=" + dfsUsedStart
+            + " max=" + dfsUsedMax + " final=" + dfsUsedFinal,
+            dfsUsedStart, dfsUsedFinal);
+      }
+
+      fs.delete(dir, true);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+}
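
The FSDatasetAsyncDiskService added above keeps block deletions off the heartbeat path by giving each data volume its own small thread pool. For readers who want to try that pattern in isolation, the following is a minimal, self-contained sketch of the same idea; PerVolumeDeleter and every other name in it are invented for this illustration and are not part of the commit.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Stand-alone illustration (hypothetical names) of a per-volume async deleter.
public class PerVolumeDeleter {

  private static final int CORE_THREADS_PER_VOLUME = 1;
  private static final int MAX_THREADS_PER_VOLUME = 4;
  private static final long KEEP_ALIVE_SECONDS = 60;

  // One executor per volume root, so a slow disk only delays its own queue.
  private final Map<File, ThreadPoolExecutor> executors =
      new HashMap<File, ThreadPoolExecutor>();

  PerVolumeDeleter(File[] volumeRoots) {
    for (File root : volumeRoots) {
      ThreadPoolExecutor executor = new ThreadPoolExecutor(
          CORE_THREADS_PER_VOLUME, MAX_THREADS_PER_VOLUME,
          KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>());
      executor.allowCoreThreadTimeOut(true);  // idle pools shrink to zero threads
      executors.put(root, executor);
    }
  }

  /** Schedule a file deletion on the pool that owns the given volume root. */
  synchronized void deleteAsync(File volumeRoot, final File file) {
    ThreadPoolExecutor executor = executors.get(volumeRoot);
    if (executor == null) {
      throw new IllegalArgumentException("Unknown volume root: " + volumeRoot);
    }
    executor.execute(new Runnable() {
      public void run() {
        if (!file.delete() && file.exists()) {
          System.err.println("Could not delete " + file);
        }
      }
    });
  }

  /** Stop accepting new work and let queued deletions drain. */
  synchronized void shutdown() throws InterruptedException {
    for (ThreadPoolExecutor executor : executors.values()) {
      executor.shutdown();
    }
    for (ThreadPoolExecutor executor : executors.values()) {
      executor.awaitTermination(60, TimeUnit.SECONDS);
    }
  }

  // Small demonstration: schedule a few deletions on one "volume" and wait.
  public static void main(String[] args) throws IOException, InterruptedException {
    File volume = Files.createTempDirectory("volume0").toFile();
    PerVolumeDeleter deleter = new PerVolumeDeleter(new File[] { volume });
    for (int i = 0; i < 5; i++) {
      File f = new File(volume, "blk_" + i);
      Files.write(f.toPath(), "data".getBytes());
      deleter.deleteAsync(volume, f);  // returns immediately; delete runs off-thread
    }
    deleter.shutdown();
  }
}

The per-volume map mirrors the design choice in the commit: a slow or failing disk only backs up its own queue, and allowCoreThreadTimeOut(true) lets idle pools drop back to zero threads between bursts of deletions.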
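
The decDfsUsed() change in FSDataset.java is a small but easy-to-miss part of the fix: once deletions run on FSDatasetAsyncDiskService threads, the dfsUsed accounting is no longer performed under the dataset lock held by the caller, so the method now takes synchronized (FSDataset.this) itself. A minimal sketch of that locking pattern, with invented class names:

// Hypothetical names; only the synchronization pattern is taken from the commit.
class Dataset {
  private long dfsUsed;

  class Volume {
    void decDfsUsed(long bytes) {
      // Deletion tasks on pool threads do not hold the Dataset lock yet,
      // so take it here to keep the counter consistent with synchronized readers.
      synchronized (Dataset.this) {
        dfsUsed -= bytes;
      }
    }
  }

  synchronized long getDfsUsed() {
    return dfsUsed;
  }
}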
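
Separately, FSConstants.BLOCK_INVALIDATE_CHUNK goes from 100 to 1000. Going by its existing comment ("Chunk the block Invalidate message"), the constant caps how many block invalidations are batched into a single message to a DataNode, so the larger value lets a deletion backlog drain in far fewer rounds now that the DataNode can absorb bigger batches without stalling its heartbeat. A rough back-of-envelope illustration with assumed numbers (not taken from the commit):

// Assumed backlog size; only the 100 -> 1000 constant comes from FSConstants.
public class InvalidateChunkMath {
  public static void main(String[] args) {
    long pendingBlocks = 100000L;         // assumed number of blocks awaiting deletion
    int oldChunk = 100, newChunk = 1000;  // BLOCK_INVALIDATE_CHUNK before and after
    System.out.println("messages at chunk=100 : " + (pendingBlocks + oldChunk - 1) / oldChunk);
    System.out.println("messages at chunk=1000: " + (pendingBlocks + newChunk - 1) / newChunk);
  }
}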