hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r720552 - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/util/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/util/
Date Tue, 25 Nov 2008 17:59:48 GMT
Author: szetszwo
Date: Tue Nov 25 09:59:48 2008
New Revision: 720552

URL: http://svn.apache.org/viewvc?rev=720552&view=rev
Log:
HADOOP-4061. Throttle Datanode decommission monitoring in Namenode. (szetszwo)

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/util/CyclicIteration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestCyclicIteration.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDecommission.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720552&r1=720551&r2=720552&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 09:59:48 2008
@@ -1237,6 +1237,9 @@
     HADOOP-4616. Fuse-dfs can handle bad values from FileSystem.read call.
     (Pete Wyckoff via dhruba)
 
+    HADOOP-4061. Throttle Datanode decommission monitoring in Namenode.
+    (szetszwo)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=720552&r1=720551&r2=720552&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Tue Nov 25 09:59:48 2008
@@ -575,12 +575,19 @@
 
 <property>
   <name>dfs.namenode.decommission.interval</name>
-  <value>300</value>
+  <value>30</value>
   <description>Namenode periodicity in seconds to check if decommission is 
   complete.</description>
 </property>
 
 <property>
+  <name>dfs.namenode.decommission.nodes.per.interval</name>
+  <value>5</value>
+  <description>The number of decommissioning nodes the namenode checks for
+  completion in each dfs.namenode.decommission.interval.</description>
+</property>
+
+<property>
   <name>dfs.replication.interval</name>
   <value>3</value>
   <description>The periodicity in seconds with which the namenode computes 

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/CyclicIteration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/CyclicIteration.java?rev=720552&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/CyclicIteration.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/CyclicIteration.java Tue Nov 25 09:59:48 2008
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+
+/** Provide a cyclic {@link Iterator} for a {@link NavigableMap}.
+ * The {@link Iterator} navigates the entries of the map
+ * according to the map's ordering.
+ * If the {@link Iterator} hits the last entry of the map,
+ * it will then continue from the first entry.
+ * An iterator stops when it would return its first entry again,
+ * so each entry is returned at most once per iterator.
+ */
+public class CyclicIteration<K, V> implements Iterable<Map.Entry<K, V>> {
+  /** The map to iterate, or null if it was null or empty at construction. */
+  private final NavigableMap<K, V> navigablemap;
+  /** The entries after the starting key; null iff navigablemap is null. */
+  private final NavigableMap<K, V> tailmap;
+
+  /** Construct an {@link Iterable} object,
+   * so that an {@link Iterator} can be created
+   * for iterating the given {@link NavigableMap}.
+   * The iteration begins from the starting key exclusively.
+   * @param navigablemap the map to iterate; null or empty gives
+   *        an empty iteration.
+   * @param startingkey the iteration starts right after this key,
+   *        which need not be present in the map.
+   */
+  public CyclicIteration(NavigableMap<K, V> navigablemap, K startingkey) {
+    if (navigablemap == null || navigablemap.isEmpty()) {
+      this.navigablemap = null;
+      this.tailmap = null;
+    }
+    else {
+      this.navigablemap = navigablemap;
+      this.tailmap = navigablemap.tailMap(startingkey, false);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public Iterator<Map.Entry<K, V>> iterator() {
+    return new CyclicIterator();
+  }
+
+  /** Are the two keys equal?  Null-safe, like Map.Entry.equals. */
+  private static boolean keyEquals(Object a, Object b) {
+    return a == null? b == null: a.equals(b);
+  }
+
+  /** An {@link Iterator} for {@link CyclicIteration}. */
+  private class CyclicIterator implements Iterator<Map.Entry<K, V>> {
+    private boolean hasnext;
+    private Iterator<Map.Entry<K, V>> i;
+    /** The first entry to begin. */
+    private final Map.Entry<K, V> first;
+    /** The next entry. */
+    private Map.Entry<K, V> next;
+
+    private CyclicIterator() {
+      // Check emptiness again: the map may have been emptied after the
+      // CyclicIteration was constructed, and nextEntry() on an empty map
+      // would throw NoSuchElementException.
+      hasnext = navigablemap != null && !navigablemap.isEmpty();
+      if (hasnext) {
+        i = tailmap.entrySet().iterator();
+        first = nextEntry();
+        next = first;
+      }
+      else {
+        i = null;
+        first = null;
+        next = null;
+      }
+    }
+
+    /** @return the entry after the current one, wrapping around
+     *  to the first entry of the map after the last one. */
+    private Map.Entry<K, V> nextEntry() {
+      if (!i.hasNext()) {
+        i = navigablemap.entrySet().iterator();
+      }
+      return i.next();
+    }
+
+    /** {@inheritDoc} */
+    public boolean hasNext() {
+      return hasnext;
+    }
+
+    /** {@inheritDoc} */
+    public Map.Entry<K, V> next() {
+      if (!hasnext) {
+        throw new NoSuchElementException();
+      }
+
+      final Map.Entry<K, V> curr = next;
+      next = nextEntry();
+      // Compare keys, not whole entries: Map.Entry.equals also compares
+      // values, so if the value of the first entry changed (e.g. with maps
+      // returning snapshot entries) the cycle would never terminate.
+      hasnext = !keyEquals(first.getKey(), next.getKey());
+      return curr;
+    }
+
+    /** Not supported */
+    public void remove() {
+      throw new UnsupportedOperationException("Not supported");
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java?rev=720552&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java Tue Nov 25 09:59:48 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.CyclicIteration;
+
+/**
+ * Manage node decommissioning.
+ */
+class DecommissionManager {
+  static final Log LOG = LogFactory.getLog(DecommissionManager.class);
+
+  private final FSNamesystem fsnamesystem;
+
+  DecommissionManager(FSNamesystem namesystem) {
+    this.fsnamesystem = namesystem;
+  }
+
+  /** Periodically check decommission status. */
+  class Monitor implements Runnable {
+    /** recheckInterval is how often, in milliseconds, namenode checks
+     *  if a node has finished decommission
+     */
+    private final long recheckInterval;
+    /**
+     * The maximum number of decommission-in-progress nodes to check
+     * in each interval; a non-positive value means no limit.
+     */
+    private final int numNodesPerCheck;
+    /**
+     * The key of the last node examined; the next check resumes right
+     * after it.  Its initial value can be anything since the key need
+     * not be present in the datanode map.
+     */
+    private String firstkey = "";
+
+    Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
+      this.recheckInterval = recheckIntervalInSecond * 1000L;
+      this.numNodesPerCheck = numNodesPerCheck;
+    }
+
+    /**
+     * Check decommission status of numNodesPerCheck nodes
+     * for every recheckInterval milliseconds, until the
+     * namesystem stops running.
+     */
+    public void run() {
+      while (fsnamesystem.isRunning()) {
+        synchronized(fsnamesystem) {
+          try {
+            check();
+          } catch(Exception e) {
+            // Keep the monitor alive: an uncaught exception would end
+            // this thread and silently stop all decommission checks.
+            LOG.warn("Failed to check decommission status", e);
+          }
+        }
+
+        try {
+          Thread.sleep(recheckInterval);
+        } catch (InterruptedException ie) {
+          // Only log; the loop condition decides whether to stop.
+          LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
+        }
+      }
+    }
+
+    /**
+     * Check up to numNodesPerCheck decommission-in-progress nodes,
+     * scanning the datanode map cyclically from just after firstkey.
+     * Nodes that are not being decommissioned are skipped and do not
+     * count toward the limit.
+     */
+    private void check() {
+      int count = 0;
+      for(Map.Entry<String, DatanodeDescriptor> entry
+          : new CyclicIteration<String, DatanodeDescriptor>(
+              fsnamesystem.datanodeMap, firstkey)) {
+        final DatanodeDescriptor d = entry.getValue();
+        firstkey = entry.getKey();
+
+        if (d.isDecommissionInProgress()) {
+          try {
+            fsnamesystem.checkDecommissionStateInternal(d);
+          } catch(Exception e) {
+            LOG.warn("entry=" + entry, e);
+          }
+          if (++count == numNodesPerCheck) {
+            return;
+          }
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=720552&r1=720551&r2=720552&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 25 09:59:48 2008
@@ -164,7 +164,7 @@
    * <p>
    * Mapping: StorageID -> DatanodeDescriptor
    */
-  Map<String, DatanodeDescriptor> datanodeMap = 
+  NavigableMap<String, DatanodeDescriptor> datanodeMap = 
     new TreeMap<String, DatanodeDescriptor>();
 
   //
@@ -215,7 +215,7 @@
   Daemon smmthread = null;  // SafeModeMonitor thread
   public Daemon replthread = null;  // Replication thread
   
-  volatile boolean fsRunning = true;
+  private volatile boolean fsRunning = true;
   long systemStart = 0;
 
   //  The maximum number of replicates we should allow for a single block
@@ -233,8 +233,6 @@
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
-  //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
-  private long decommissionRecheckInterval;
   // default block size of a file
   private long defaultBlockSize = 0;
   // allow appending to hdfs files
@@ -318,7 +316,9 @@
 
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                                            conf.get("dfs.hosts.exclude",""));
-    this.dnthread = new Daemon(new DecommissionedMonitor());
+    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
+        conf.getInt("dfs.namenode.decommission.interval", 30),
+        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
     dnthread.start();
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
@@ -419,8 +419,6 @@
       10 * heartbeatInterval;
     this.replicationRecheckInterval = 
       conf.getInt("dfs.replication.interval", 3) * 1000L;
-    this.decommissionRecheckInterval = 
-      conf.getInt("dfs.namenode.decommission.interval", 5 * 60) * 1000L;
     this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
@@ -481,6 +479,11 @@
     }
   }
 
+  /** @return whether this name system is running (the fsRunning flag). */
+  boolean isRunning() {
+    return this.fsRunning;
+  }
+
   /**
    * Dump all metadata into specified file
    */
@@ -3546,9 +3549,8 @@
    */
   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
-    Iterator<Block> decommissionBlocks = srcNode.getBlockIterator();
-    while(decommissionBlocks.hasNext()) {
-      Block block = decommissionBlocks.next();
+    for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
+      final Block block = i.next();
       INode fileINode = blocksMap.getINode(block);
 
       if (fileINode != null) {
@@ -3579,9 +3581,9 @@
    * Change, if appropriate, the admin state of a datanode to 
    * decommission completed. Return true if decommission is complete.
    */
-  private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     //
-    // Check to see if all blocks in this decommisioned
+    // Check to see if all blocks in this decommissioned
     // node has reached their target replication factor.
     //
     if (node.isDecommissionInProgress()) {
@@ -3701,39 +3703,6 @@
   private boolean shouldNodeShutdown(DatanodeDescriptor node) {
     return (node.isDecommissioned());
   }
-
-  /**
-   * Check if any of the nodes being decommissioned has finished 
-   * moving all its datablocks to another replica. This is a loose
-   * heuristic to determine when a decommission is really over.
-   */
-  public synchronized void decommissionedDatanodeCheck() {
-    for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-         it.hasNext();) {
-      DatanodeDescriptor node = it.next();  
-      checkDecommissionStateInternal(node);
-    }
-  }
-    
-  /**
-   * Periodically calls decommissionedDatanodeCheck().
-   */
-  class DecommissionedMonitor implements Runnable {
-        
-    public void run() {
-      while (fsRunning) {
-        try {
-          decommissionedDatanodeCheck();
-        } catch (Exception e) {
-          FSNamesystem.LOG.info(StringUtils.stringifyException(e));
-        }
-        try {
-          Thread.sleep(decommissionRecheckInterval);
-        } catch (InterruptedException ie) {
-        }
-      }
-    }
-  }
     
   /**
    * Get data node by storage ID.

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=720552&r1=720551&r2=720552&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Tue Nov 25 09:59:48 2008
@@ -339,7 +339,7 @@
   class Monitor implements Runnable {
     public void run() {
       try {
-        while (fsnamesystem.fsRunning) {
+        while (fsnamesystem.isRunning()) {
           synchronized (fsnamesystem) {
             synchronized (LeaseManager.this) {
               Lease top;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDecommission.java?rev=720552&r1=720551&r2=720552&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDecommission.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDecommission.java Tue Nov 25 09:59:48 2008
@@ -17,23 +17,24 @@
  */
 package org.apache.hadoop.hdfs;
 
-import junit.framework.TestCase;
-import java.io.*;
-import java.util.Collection;
-import java.util.Random;
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
-import java.net.*;
-import java.lang.InterruptedException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
  * This class tests the decommissioning of nodes.
@@ -160,10 +161,8 @@
   private String decommissionNode(NameNode namenode,
                                   Configuration conf,
                                   DFSClient client, 
-                                  FileSystem filesys,
                                   FileSystem localFileSys)
     throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     //
@@ -189,18 +188,6 @@
   }
 
   /*
-   * put node back in action
-   */
-  private void commissionNode(FileSystem filesys, FileSystem localFileSys,
-                              String node) throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
-
-    System.out.println("Commissioning nodes.");
-    writeConfigFile(localFileSys, excludeFile, null);
-    dfs.refreshNodes();
-  }
-
-  /*
    * Check if node is in the requested state.
    */
   private boolean checkNodeState(FileSystem filesys, 
@@ -236,7 +223,6 @@
   private void waitNodeState(FileSystem filesys,
                              String node,
                              NodeState state) throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) filesys;
     boolean done = checkNodeState(filesys, node, state);
     while (!done) {
       System.out.println("Waiting for node " + node +
@@ -278,7 +264,6 @@
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
     assertEquals("Number of Datanodes ", numDatanodes, info.length);
     FileSystem fileSys = cluster.getFileSystem();
-    DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
 
     try {
       for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
@@ -293,7 +278,7 @@
         checkFile(fileSys, file1, replicas);
         printFileLocations(fileSys, file1);
         String downnode = decommissionNode(cluster.getNameNode(), conf,
-                                           client, fileSys, localFileSys);
+                                           client, localFileSys);
         decommissionedNodes.add(downnode);
         waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
         checkFile(fileSys, file1, replicas, downnode);

Added: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestCyclicIteration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestCyclicIteration.java?rev=720552&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestCyclicIteration.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestCyclicIteration.java Tue Nov 25 09:59:48 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestCyclicIteration extends junit.framework.TestCase {
+  public void testCyclicIteration() throws Exception {
+    for(int n = 0; n < 5; n++) {
+      checkCyclicIteration(n);
+    }
+  }
+
+  private static void checkCyclicIteration(int numOfElements) {
+    //create a tree map
+    final NavigableMap<Integer, Integer> map = new TreeMap<Integer, Integer>();
+    final Integer[] integers = new Integer[numOfElements];
+    for(int i = 0; i < integers.length; i++) {
+      integers[i] = 2*i;
+      map.put(integers[i], integers[i]);
+    }
+    System.out.println("\n\nintegers=" + Arrays.asList(integers));
+    System.out.println("map=" + map);
+
+    //try starting everywhere
+    for(int start = -1; start <= 2*integers.length - 1; start++) {
+      //get a cyclic iteration
+      final List<Integer> iteration = new ArrayList<Integer>(); 
+      for(Map.Entry<Integer, Integer> e : new CyclicIteration<Integer, Integer>(map,
start)) {
+        iteration.add(e.getKey());
+      }
+      System.out.println("start=" + start + ", iteration=" + iteration);
+      
+      //verify results
+      for(int i = 0; i < integers.length; i++) {
+        final int j = ((start+2)/2 + i)%integers.length;
+        assertEquals("i=" + i + ", j=" + j, iteration.get(i), integers[j]);
+      }
+    }
+  }
+}



Mime
View raw message