hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r582029 - in /lucene/hadoop/branches/branch-0.14: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Thu, 04 Oct 2007 23:59:02 GMT
Author: dhruba
Date: Thu Oct  4 16:59:00 2007
New Revision: 582029

URL: http://svn.apache.org/viewvc?rev=582029&view=rev
Log:
HADOOP-1955.  The Namenode tries to not pick the same source Datanode for
a replication request if the earlier replication request for the same
block and that source Datanode had failed.
(Raghu Angadi via dhruba)
This corresponds to changelist 582028 on trunk.


Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Thu Oct  4 16:59:00 2007
@@ -26,6 +26,11 @@
     HADOOP-1978.  Name-node removes edits.new after a successful startup.
     (Konstantin Shvachko via dhruba)
 
+    HADOOP-1955.  The Namenode tries to not pick the same source Datanode for
+    a replication request if the earlier replication request for the same
+    block and that source Datanode had failed.
+    (Raghu Angadi via dhruba)
+
 Release 0.14.1 - 2007-09-04
 
   BUG FIXES

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Oct  4 16:59:00 2007
@@ -442,7 +442,9 @@
   private HashMap<Block,FSVolume> volumeMap = null;
   private HashMap<Block,File> blockMap = null;
   static  Random random = new Random();
-
+  
+  long blockWriteTimeout = 3600 * 1000;
+  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -457,6 +459,8 @@
     volumes.getVolumeMap(volumeMap);
     blockMap = new HashMap<Block,File>();
     volumes.getBlockMap(blockMap);
+    blockWriteTimeout = Math.max(
+         conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
   }
 
   /**
@@ -526,8 +530,9 @@
       //
       if (ongoingCreates.containsKey(b)) {
         // check how old is the temp file - wait 1 hour
-        File tmp = (File)ongoingCreates.get(b);
-        if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+        File tmp = ongoingCreates.get(b);
+        if ((System.currentTimeMillis() - tmp.lastModified()) < 
+            blockWriteTimeout) {
           throw new IOException("Block " + b +
                                 " has already been started (though not completed), and thus cannot be created.");
         } else {

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct  4 16:59:00 2007
@@ -183,7 +183,7 @@
   private long replicationRecheckInterval;
   //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
   private long decommissionRecheckInterval;
-  static int replIndex = 0; // last datanode used for replication work
+  private int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
   public static FSNamesystem fsNamesystemObject;
@@ -217,7 +217,9 @@
     this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
     this.safeMode = new SafeModeInfo(conf);
     setBlockTotal();
-    pendingReplications = new PendingReplicationBlocks(LOG);
+    pendingReplications = new PendingReplicationBlocks(LOG,
+                          conf.getInt("dfs.replication.pending.timeout.sec",
+                                      -1) * 1000);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(new LeaseMonitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -1886,6 +1888,7 @@
     int numiter = 0;
     int foundwork = 0;
     int hsize = 0;
+    int lastReplIndex = -1;
 
     while (true) {
       DatanodeDescriptor node = null;
@@ -1897,6 +1900,11 @@
       synchronized (heartbeats) {
         hsize = heartbeats.size();
         if (numiter++ >= hsize) {
+          // no change in replIndex.
+          if (lastReplIndex >= 0) {
+            //next time, start after where the last replication was scheduled
+            replIndex = lastReplIndex;
+          }
           break;
         }
         if (replIndex >= hsize) {
@@ -1922,6 +1930,7 @@
           doReplication = true;
           addBlocksToBeReplicated(node, (Block[])replsets[0], 
                                   (DatanodeDescriptor[][])replsets[1]);
+          lastReplIndex = replIndex;
         }
       }
       if (!doReplication) {

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Thu Oct  4 16:59:00 2007
@@ -49,12 +49,14 @@
   private long defaultRecheckInterval = 5 * 60 * 1000;
 
   PendingReplicationBlocks(long timeoutPeriod) {
-    this.timeout = timeoutPeriod;
-    init();
+    this(null, timeoutPeriod);
   }
 
-  PendingReplicationBlocks(Log log) {
+  PendingReplicationBlocks(Log log, long timeoutPeriod) {
     this.LOG = log;
+    if ( timeoutPeriod > 0 ) {
+      this.timeout = timeoutPeriod;
+    }
     init();
   }
 

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Oct  4 16:59:00 2007
@@ -113,7 +113,9 @@
       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
                new File(base_dir, "name2").getPath());
     }
-    conf.setInt("dfs.replication", Math.min(3, numDataNodes));
+    
+    int replication = conf.getInt("dfs.replication", 3);
+    conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
     conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
     

Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java Thu Oct  4 16:59:00 2007
@@ -19,6 +19,7 @@
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.Iterator;
 import java.util.Random;
 import java.net.*;
 
@@ -166,4 +167,152 @@
       cluster.shutdown();
     }
   }
+  
+  // Waits for all of the blocks to have expected replication
+  private void waitForBlockReplication(String filename, 
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec) 
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+    
+    //wait for all the blocks to be replicated;
+    System.out.println("Checking for block replication for " + filename);
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+      LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
+                                                        Long.MAX_VALUE);
+      
+      for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+           iter.hasNext();) {
+        LocatedBlock block = iter.next();
+        int actual = block.getLocations().length;
+        if ( actual < expected ) {
+          if (true || iters > 0) {
+            System.out.println("Not enough replicas for " + block.getBlock() +
+                               " yet. Expecting " + expected + ", got " + 
+                               actual + ".");
+          }
+          replOk = false;
+          break;
+        }
+      }
+      
+      if (replOk) {
+        return;
+      }
+      
+      iters++;
+      
+      if (maxWaitSec > 0 && 
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+        throw new IOException("Timedout while waiting for all blocks to " +
+                              " be replicated for " + filename);
+      }
+      
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+  
+  /* This test makes sure that NameNode retries all the available blocks 
+   * for under replicated blocks. 
+   * 
+   * It creates a file with one block and replication of 4. It corrupts 
+   * two of the blocks and removes one of the replicas. Expected behaviour is
+   * that missing replica will be copied from one valid source.
+   */
+  public void testPendingReplicationRetry() throws IOException {
+    
+    MiniDFSCluster cluster = null;
+    int numDataNodes = 4;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+    
+    byte buffer[] = new byte[1024];
+    for (int i=0; i<buffer.length; i++) {
+      buffer[i] = '1';
+    }
+    
+    try {
+      Configuration conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      //first time format
+      cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+                                   true, null, null);
+      cluster.waitActive();
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                            cluster.getNameNodePort()),
+                                            conf);
+      
+      OutputStream out = cluster.getFileSystem().create(testPath);
+      out.write(buffer);
+      out.close();
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+      // get first block of the file.
+      String block = dfsClient.namenode.
+                       getBlockLocations(testFile, 0, Long.MAX_VALUE).
+                       get(0).getBlock().toString();
+      
+      cluster.shutdown();
+      cluster = null;
+      
+      //Now mess up some of the replicas.
+      //Delete the first and corrupt the next two.
+      File baseDir = new File(System.getProperty("test.build.data"), 
+                                                 "dfs/data");
+      for (int i=0; i<25; i++) {
+        buffer[i] = '0';
+      }
+      
+      int fileCount = 0;
+      for (int i=0; i<6; i++) {
+        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+        System.out.println("Checking for file " + blockFile);
+        
+        if (blockFile.exists()) {
+          if (fileCount == 0) {
+            assertTrue(blockFile.delete());
+          } else {
+            // corrupt it.
+            long len = blockFile.length();
+            assertTrue(len > 50);
+            RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+            blockOut.seek(len/3);
+            blockOut.write(buffer, 0, 25);
+          }
+          fileCount++;
+        }
+      }
+      assertEquals(3, fileCount);
+      
+      /* Start the MiniDFSCluster with more datanodes since once a writeBlock
+       * to a datanode node fails, same block can not be written to it
+       * immediately. In our case some replication attempts will fail.
+       */
+      conf = new Configuration();
+      conf.set("dfs.replication", Integer.toString(numDataNodes));
+      conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+      conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+      conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+      
+      cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+                                   true, null, null);
+      cluster.waitActive();
+      
+      dfsClient = new DFSClient(new InetSocketAddress("localhost",
+                                  cluster.getNameNodePort()),
+                                  conf);
+      
+      waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+      
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }  
 }



Mime
View raw message