hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r512944 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/dfs/FSNamesystem.java src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java src/test/org/apache/hadoop/dfs/TestPendingReplication.java
Date Wed, 28 Feb 2007 20:17:05 GMT
Author: cutting
Date: Wed Feb 28 12:17:05 2007
New Revision: 512944

URL: http://svn.apache.org/viewvc?view=rev&rev=512944
Log:
HADOOP-940.  Improve HDFS's replication scheduling.  Contributed by Dhruba.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512944&r1=512943&r2=512944
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 28 12:17:05 2007
@@ -162,6 +162,9 @@
 48. HADOOP-1043.  Optimize shuffle, increasing parallelism.
     (Devaraj Das via cutting)
 
+49. HADOOP-940.  Improve HDFS's replication scheduling.
+    (Dhruba Borthakur via cutting) 
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=512944&r1=512943&r2=512944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 28 12:17:05
2007
@@ -156,7 +156,7 @@
     // Set of: Block
     //
     private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks();
-    private Collection<Block> pendingReplications = new TreeSet<Block>();
+    private PendingReplicationBlocks pendingReplications;
 
     //
     // Used for handling lock-leases
@@ -248,6 +248,7 @@
         this.dir.loadFSImage( conf );
         this.safeMode = new SafeModeInfo( conf );
         setBlockTotal();
+        pendingReplications = new PendingReplicationBlocks(LOG);
         this.hbthread = new Daemon(new HeartbeatMonitor());
         this.lmthread = new Daemon(new LeaseMonitor());
         this.replthread = new Daemon(new ReplicationMonitor());
@@ -298,6 +299,7 @@
         fsRunning = false;
       }
         try {
+            pendingReplications.stop();
             infoServer.stop();
             hbthread.join(3000);
             replthread.join(3000);
@@ -1706,6 +1708,7 @@
         while (fsRunning) {
           try {
             computeDatanodeWork();
+            processPendingReplications();
             Thread.sleep(replicationRecheckInterval);
           } catch (InterruptedException ie) {
           } catch (IOException ie) {
@@ -1793,6 +1796,21 @@
     }
 
     /**
+     * If there were any replication requests that timed out, reap them
+     * and put them back into the neededReplication queue
+     */
+    void processPendingReplications() {
+      Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
+      if (timedOutItems != null) {
+        synchronized (this) {
+          for (int i = 0; i < timedOutItems.length; i++) {
+            neededReplications.add(timedOutItems[i]);
+          }
+        }
+      }
+    }
+
+    /**
      * Add more replication work for this datanode.
      */
     synchronized void addBlocksToBeReplicated(DatanodeDescriptor node, 
@@ -2094,9 +2112,6 @@
         if(neededReplications.contains(block)) {
             neededReplications.update(block, curReplicaDelta, 0);
         }
-        if (numCurrentReplica >= fileReplication ) {
-            pendingReplications.remove(block);
-        }        
         proccessOverReplicatedBlock( block, fileReplication );
         return block;
     }
@@ -2256,6 +2271,7 @@
         // Modify the blocks->datanode map and node's map.
         // 
         node.addBlock( addStoredBlock(block, node) );
+        pendingReplications.remove(block);
     }
 
     /**
@@ -2606,7 +2622,8 @@
               // filter out containingNodes that are marked for decommission.
               List<DatanodeDescriptor> nodes = 
                   filterDecommissionedNodes(containingNodes);
-              int numCurrentReplica = nodes.size();
+              int numCurrentReplica = nodes.size() +
+                                      pendingReplications.getNumReplicas(block);
               DatanodeDescriptor targets[] = replicator.chooseTarget(
                   Math.min( fileINode.getReplication() - numCurrentReplica,
                             needed),
@@ -2640,7 +2657,7 @@
             if (numCurrentReplica + targets.length >= numExpectedReplica) {
               neededReplications.remove(
                       block, numCurrentReplica, numExpectedReplica);
-              pendingReplications.add(block);
+              pendingReplications.add(block, targets.length);
               NameNode.stateChangeLog.debug(
                 "BLOCK* NameSystem.pendingTransfer: "
                 + block.getBlockName()

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=auto&rev=512944
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Wed Feb
28 12:17:05 2007
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.util.*;
+import java.util.*;
+
+/***************************************************
+ * PendingReplicationBlocks does the bookkeeping of all
+ * blocks that are getting replicated.
+ *
+ * It does the following:
+ * 1)  record blocks that are getting replicated at this instant.
+ * 2)  a coarse grain timer to track age of replication request
+ * 3)  a thread that periodically identifies replication-requests
+ *     that never made it.
+ *
+ * @author Dhruba Borthakur
+ ***************************************************/
+class PendingReplicationBlocks {
+  private Log LOG = null;
+  private Map<Block, PendingBlockInfo> pendingReplications;
+  private ArrayList<Block> timedOutItems;
+  Daemon timerThread = null;
+  private boolean fsRunning = true;
+
+  //
+  // It might take anywhere between 5 to 10 minutes before
+  // a request is timed out.
+  //
+  private long timeout = 5 * 60 * 1000;
+  private long defaultRecheckInterval = 5 * 60 * 1000;
+
+  PendingReplicationBlocks(long timeoutPeriod) {
+    this.timeout = timeoutPeriod;
+    init();
+  }
+
+  PendingReplicationBlocks(Log log) {
+    this.LOG = log;
+    init();
+  }
+
+  void init() {
+    pendingReplications = new HashMap<Block, PendingBlockInfo>();
+    timedOutItems = new ArrayList<Block>();
+    this.timerThread = new Daemon(new PendingReplicationMonitor());
+    timerThread.start();
+  }
+
+  /**
+   * Add a block to the list of pending Replications
+   */
+  void add(Block block, int numReplicas) {
+    synchronized (pendingReplications) {
+      PendingBlockInfo found = pendingReplications.get(block);
+      if (found == null) {
+        pendingReplications.put(block, new PendingBlockInfo(numReplicas));
+      } else {
+        found.incrementReplicas(numReplicas);
+        found.setTimeStamp();
+      }
+    }
+  }
+
+  /**
+   * One replication request for this block has finished.
+   * Decrement the number of pending replication requests
+   * for this block.
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      PendingBlockInfo found = pendingReplications.get(block);
+      if (found != null) {
+        found.decrementReplicas();
+        if (found.getNumReplicas() <= 0) {
+          pendingReplications.remove(block);
+        }
+      }
+    }
+  }
+
+  /**
+   * The total number of blocks that are undergoing replication
+   */
+  long size() {
+    return pendingReplications.size();
+  } 
+
+  /**
+   * How many copies of this block is pending replication?
+   */
+  int getNumReplicas(Block block) {
+    synchronized (pendingReplications) {
+      PendingBlockInfo found = pendingReplications.get(block);
+      if (found != null) {
+        return found.getNumReplicas();
+      }
+    }
+    return 0;
+  }
+
+  /**
+   * Returns a list of blocks that have timed out their 
+   * replication requests. Returns null if no blocks have
+   * timed out.
+   */
+  Block[] getTimedOutBlocks() {
+    synchronized (timedOutItems) {
+      if (timedOutItems.size() <= 0) {
+        return null;
+      }
+      Block[] blockList = timedOutItems.toArray(
+                            new Block[timedOutItems.size()]);
+      timedOutItems.clear();
+      return blockList;
+    }
+  }
+
+  /**
+   * An object that contains information about a block that 
+   * is being replicated. It records the timestamp when the 
+   * system started replicating the most recent copy of this
+   * block. It also records the number of replication
+   * requests that are in progress.
+   */
+  class PendingBlockInfo {
+    private long timeStamp;
+    private int numReplicasInProgress;
+
+    PendingBlockInfo(int numReplicas) {
+      this.timeStamp = FSNamesystem.now();
+      this.numReplicasInProgress = numReplicas;
+    }
+
+    long getTimeStamp() {
+      return timeStamp;
+    }
+
+    void setTimeStamp() {
+      timeStamp = FSNamesystem.now();
+    }
+
+    void incrementReplicas(int increment) {
+      numReplicasInProgress += increment;
+    }
+
+    void decrementReplicas() {
+      numReplicasInProgress--;
+      assert(numReplicasInProgress >= 0);
+    }
+
+    int getNumReplicas() {
+      return numReplicasInProgress;
+    }
+  }
+
+  /*
+   * A periodic thread that scans for blocks that never finished
+   * their replication request.
+   */
+  class PendingReplicationMonitor implements Runnable {
+    public void run() {
+      while (fsRunning) {
+        long period = Math.min(defaultRecheckInterval, timeout);
+        try {
+          pendingReplicationCheck();
+          Thread.sleep(period);
+        } catch (InterruptedException ie) {
+          if (LOG != null) {
+            LOG.warn("PendingReplicationMonitor thread received exception. " 
+                     + ie);
+          }
+        }
+      }
+    }
+
+    /**
+     * Iterate through all items and detect timed-out items
+     */
+    void pendingReplicationCheck() {
+      synchronized (pendingReplications) {
+        Iterator iter = pendingReplications.entrySet().iterator();
+        long now = FSNamesystem.now();
+        while (iter.hasNext()) {
+          Map.Entry entry = (Map.Entry) iter.next();
+          PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
+          if (now > pendingBlock.getTimeStamp() + timeout) {
+            Block block = (Block) entry.getKey();
+            synchronized (timedOutItems) {
+              timedOutItems.add(block);
+            }
+            if (LOG != null) {
+              LOG.warn("PendingReplicationMonitor timed out block " + block);
+            }
+            iter.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Shuts down the pending replication monitor thread.
+   * Waits for the thread to exit.
+   */
+  void stop() {
+    fsRunning = false;
+    timerThread.interrupt();
+    try {
+      timerThread.join(3000);
+    } catch (InterruptedException ie) {
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java?view=auto&rev=512944
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java Wed Feb
28 12:17:05 2007
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.lang.System;
+
+/**
+ * This class tests the internals of PendingReplicationBlocks.java
+ * @author Dhruba Borthakur
+ */
+public class TestPendingReplication extends TestCase {
+  public void testPendingReplication() {
+    int timeout = 10;		// 10 seconds
+    PendingReplicationBlocks pendingReplications;
+    pendingReplications = new PendingReplicationBlocks(timeout * 1000);
+
+    //
+    // Add 10 blocks to pendingReplciations.
+    //
+    for (int i = 0; i < 10; i++) {
+      Block block = new Block(i, i);
+      pendingReplications.add(block, i);
+    }
+    assertEquals("Size of pendingReplications ",
+                 10, pendingReplications.size());
+
+
+    //
+    // remove one item and reinsert it
+    //
+    Block blk = new Block(8, 8);
+    pendingReplications.remove(blk);             // removes one replica
+    assertEquals("pendingReplications.getNumReplicas ",
+                 7, pendingReplications.getNumReplicas(blk));
+
+    for (int i = 0; i < 7; i++) {
+      pendingReplications.remove(blk);           // removes all replicas
+    }
+    assertTrue(pendingReplications.size() == 9);
+    pendingReplications.add(blk, 8);
+    assertTrue(pendingReplications.size() == 10);
+
+    //
+    // verify that the number of replicas returned
+    // are sane.
+    //
+    for (int i = 0; i < 10; i++) {
+      Block block = new Block(i, i);
+      int numReplicas = pendingReplications.getNumReplicas(block);
+      assertTrue(numReplicas == i);
+    }
+
+    //
+    // verify that nothing has timed out so far
+    //
+    assertTrue(pendingReplications.getTimedOutBlocks() == null);
+
+    //
+    // Wait for one second and then insert some more items.
+    //
+    try {
+      Thread.sleep(1000);
+    } catch (Exception e) {
+    }
+
+    for (int i = 10; i < 15; i++) {
+      Block block = new Block(i, i);
+      pendingReplications.add(block, i);
+    }
+    assertTrue(pendingReplications.size() == 15);
+
+    //
+    // Wait for everything to timeout.
+    //
+    int loop = 0;
+    while (pendingReplications.size() > 0) {
+      try {
+        Thread.sleep(1000);
+      } catch (Exception e) {
+      }
+      loop++;
+    }
+    System.out.println("Had to wait for " + loop +
+                      " seconds for the lot to timeout");
+
+    //
+    // Verify that everything has timed out.
+    //
+    assertEquals("Size of pendingReplications ",
+                 0, pendingReplications.size());
+    Block[] timedOut = pendingReplications.getTimedOutBlocks();
+    assertTrue(timedOut != null && timedOut.length == 15);
+    for (int i = 0; i < timedOut.length; i++) {
+      assertTrue(timedOut[i].getBlockId() < 15);
+    }
+  }
+}



Mime
View raw message