hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r834377 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/
Date Tue, 10 Nov 2009 07:39:14 GMT
Author: dhruba
Date: Tue Nov 10 07:39:14 2009
New Revision: 834377

URL: http://svn.apache.org/viewvc?rev=834377&view=rev
Log:
HDFS-611. Prevent DataNode heartbeat times from increasing even when
the DataNode has many blocks to delete. (Zheng Shao via dhruba)


Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 10 07:39:14 2009
@@ -46,6 +46,9 @@
 
     HDFS-757. Enable Unit test for HDFS Raid. (dhruba)
 
+    HDFS-611. Prevent DataNode heartbeat times from increasing even when
+    the DataNode has many blocks to delete. (Zheng Shao via dhruba)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Tue Nov 10 07:39:14 2009
@@ -28,7 +28,7 @@
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
   // Chunk the block Invalidate message
-  public static final int BLOCK_INVALIDATE_CHUNK = 100;
+  public static final int BLOCK_INVALIDATE_CHUNK = 1000;
 
   // Long that indicates "leave current quota unchanged"
   public static final long QUOTA_DONT_SET = Long.MAX_VALUE;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=834377&r1=834376&r2=834377&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Nov 10 07:39:14 2009
@@ -294,6 +294,7 @@
   }
 
   class FSVolume {
+    private File currentDir;
     private FSDir dataDir;      // directory store Finalized replica
     private File rbwDir;        // directory store RBW replica
     private File tmpDir;        // directory store Temporary replica
@@ -304,6 +305,7 @@
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      this.currentDir = currentDir; 
       File parent = currentDir.getParentFile();
       final File finalizedDir = new File(
           currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -337,8 +339,16 @@
       this.dfsUsage.start();
     }
 
+    File getCurrentDir() {
+      return currentDir;
+    }
+    
     void decDfsUsed(long value) {
-      dfsUsage.decDfsUsed(value);
+      // The caller of this method (ReplicaFileDeleteTask.run()) does
+      // not hold the lock on FSDataset.this, so acquire it here.
+      synchronized(FSDataset.this) {
+        dfsUsage.decDfsUsed(value);
+      }
     }
     
     long getDfsUsed() throws IOException {
@@ -822,6 +832,7 @@
   private int maxBlocksPerDir = 0;
   ReplicasMap volumeMap = new ReplicasMap();
   static  Random random = new Random();
+  FSDatasetAsyncDiskService asyncDiskService;
 
   // Used for synchronizing access to usage stats
   private Object statsLock = new Object();
@@ -840,6 +851,11 @@
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+    File[] roots = new File[storage.getNumStorageDirs()];
+    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+    }
+    asyncDiskService = new FSDatasetAsyncDiskService(roots);
     registerMBean(storage.getStorageID());
   }
 
@@ -1603,22 +1619,10 @@
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f, invalidBlks[i] );
-      long blockSize = f.length()+metaFile.length();
-      if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
-        DataNode.LOG.warn("Unexpected error trying to delete block "
-                          + invalidBlks[i] + " at file " + f);
-        error = true;
-        continue;
-      }
-      v.decDfsUsed(blockSize);
-      DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
-      if (f.exists()) {
-        //
-        // This is a temporary check especially for hadoop-1220. 
-        // This will go away in the future.
-        //
-        DataNode.LOG.info("File " + f + " was deleted but still exists!");
-      }
+      long dfsBytes = f.length() + metaFile.length();
+      
+      // Delete the block asynchronously to make sure we can do it fast enough
+      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
@@ -1730,6 +1734,10 @@
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
     
+    if (asyncDiskService != null) {
+      asyncDiskService.shutdown();
+    }
+    
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {
         if(volume != null) {

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=834377&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Tue Nov 10 07:39:14 2009
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * This class is a container of multiple thread pools, each for a volume,
+ * so that we can schedule async disk operations easily.
+ * 
+ * An example of an async disk operation is the deletion of block files in
+ * FSDataset. We don't want to create a new thread for each deletion request,
+ * we don't want to do all deletions in the heartbeat thread since deletion
+ * can be slow, and we don't want to use a single thread pool because that
+ * is inefficient when we have more than one volume.  AsyncDiskService is the
+ * solution to these problems.
+ * 
+ * This class is used inside FSDataset.
+ * 
+ * In the future, we should extract AsyncDiskService and put it into common.
+ * The FSDataset-specific logic should reside here. 
+ */
+class FSDatasetAsyncDiskService {
+  
+  public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
+  
+  // ThreadPool core pool size
+  private static final int CORE_THREADS_PER_VOLUME = 1;
+  // ThreadPool maximum pool size
+  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  // ThreadPool keep-alive time for threads over core pool size
+  private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
+  
+  private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
+  
+  private ThreadFactory threadFactory;
+  
+  private HashMap<File, ThreadPoolExecutor> executors
+      = new HashMap<File, ThreadPoolExecutor>();
+  
+  /**
+   * Create an AsyncDiskService with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskService uses one ThreadPool per volume to do the async
+   * disk operations.
+   * 
+   * @param volumes The roots of the data volumes.
+   */
+  FSDatasetAsyncDiskService(File[] volumes) {
+    
+    threadFactory = new ThreadFactory() {
+      public Thread newThread(Runnable r) {
+        return new Thread(threadGroup, r);
+      }
+    };
+    
+    // Create one ThreadPool per volume
+    for (int v = 0 ; v < volumes.length; v++) {
+      ThreadPoolExecutor executor = new ThreadPoolExecutor(
+          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, 
+          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
+          new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+      // This can reduce the number of running threads
+      executor.allowCoreThreadTimeOut(true);
+      executors.put(volumes[v], executor);
+    }
+    
+  }
+  
+  /**
+   * Execute the task sometime in the future, using ThreadPools.
+   */
+  synchronized void execute(File root, Runnable task) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
+    }
+    ThreadPoolExecutor executor = executors.get(root);
+    if (executor == null) {
+      throw new RuntimeException("Cannot find root " + root
+          + " for execution of task " + task);
+    } else {
+      executor.execute(task);
+    }
+  }
+  
+  /**
+   * Gracefully shut down all ThreadPools. Already-queued deletion tasks
+   * still run, but this method does not block until they finish.
+   */
+  synchronized void shutdown() {
+    
+    if (executors == null) {
+      
+      LOG.warn("AsyncDiskService has already shut down.");
+      
+    } else {
+      LOG.info("Shutting down all async disk service threads...");
+      
+      for (Map.Entry<File, ThreadPoolExecutor> e
+          : executors.entrySet()) {
+        e.getValue().shutdown();
+      }
+      // clear the executor map so that calling execute again will fail.
+      executors = null;
+      
+      LOG.info("All async disk service threads have been shut down.");
+    }
+  }
+
+  /**
+   * Delete the block file and meta file from the disk asynchronously, adjust
+   * dfsUsed statistics accordingly.
+   */
+  void deleteAsync(FSDataset.FSVolume volume, File blockFile,
+      File metaFile, long dfsBytes, String blockName) {
+    DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
+        + " for deletion");
+    ReplicaFileDeleteTask deletionTask = 
+        new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
+            blockName);
+    execute(volume.getCurrentDir(), deletionTask);
+  }
+  
+  /** A task for deleting a block file and its associated meta file, as well
+   *  as decrementing the dfs usage of the volume. 
+   */
+  static class ReplicaFileDeleteTask implements Runnable {
+
+    FSDataset.FSVolume volume;
+    File blockFile;
+    File metaFile;
+    long dfsBytes;
+    String blockName;
+    
+    ReplicaFileDeleteTask(FSDataset.FSVolume volume, File blockFile,
+        File metaFile, long dfsBytes, String blockName) {
+      this.volume = volume;
+      this.blockFile = blockFile;
+      this.metaFile = metaFile;
+      this.dfsBytes = dfsBytes;
+      this.blockName = blockName;
+    }
+    
+    FSDataset.FSVolume getVolume() {
+      return volume;
+    }
+
+    @Override
+    public String toString() {
+      // Called in AsyncDiskService.execute for displaying error messages.
+      return "deletion of block " + blockName + " with block file " + blockFile
+          + " and meta file " + metaFile + " from volume " + volume;
+    }
+
+    @Override
+    public void run() {
+      if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
+        DataNode.LOG.warn("Unexpected error trying to delete block "
+            + blockName + " at file " + blockFile + ". Ignored.");
+      } else {
+        volume.decDfsUsed(dfsBytes);
+        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+      }
+    }
+  };
+  
+  
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java?rev=834377&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRemove.java Tue Nov 10 07:39:14 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+
+public class TestDFSRemove extends junit.framework.TestCase {
+  static int countLease(MiniDFSCluster cluster) {
+    return cluster.getNamesystem().leaseManager.countLease();
+  }
+  
+  final Path dir = new Path("/test/remove/");
+
+  void list(FileSystem fs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for(FileStatus s : fs.listStatus(dir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream a_out = fs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+  
+  static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException {
+    long total = 0;
+    for(DataNode node : cluster.getDataNodes()) {
+      total += node.getFSDataset().getDfsUsed();
+    }
+    return total;
+  }
+  
+  public void testRemove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      assertTrue(fs.mkdirs(dir));
+      
+      long dfsUsedStart = getTotalDfsUsed(cluster);
+      {
+        // Create 100 files
+        final int fileCount = 100;
+        for (int i = 0; i < fileCount; i++) {
+          Path a = new Path(dir, "a" + i);
+          createFile(fs, a);
+        }
+        long dfsUsedMax = getTotalDfsUsed(cluster);
+        // Remove 100 files
+        for (int i = 0; i < fileCount; i++) {
+          Path a = new Path(dir, "a" + i);
+          fs.delete(a, false);
+        }
+        // wait 3 heartbeat intervals, so that all blocks are deleted.
+        Thread.sleep(3 * FSConstants.HEARTBEAT_INTERVAL * 1000);
+        // all blocks should be gone now.
+        long dfsUsedFinal = getTotalDfsUsed(cluster);
+        assertEquals("All blocks should be gone. start=" + dfsUsedStart
+            + " max=" + dfsUsedMax + " final=" + dfsUsedFinal, dfsUsedStart, dfsUsedFinal);
+      }
+
+      fs.delete(dir, true);
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+}



Mime
View raw message