hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r752949 [1/3] - in /hadoop/core: branches/HADOOP-3628/src/test/org/apache/hadoop/cli/ branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/ trunk/conf/ trunk/ivy/ trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ trunk/src/c...
Date Thu, 12 Mar 2009 17:50:05 GMT
Author: stevel
Date: Thu Mar 12 17:50:03 2009
New Revision: 752949

URL: http://svn.apache.org/viewvc?rev=752949&view=rev
Log:
HADOOP-3628 this is all the files changed to add lifecycle to the Hadoop services, new tests, and patches to many existing tests to shut down the mini clusters that have been given a consistent java.io.Closeable interface, along with static utility methods to close them

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
    hadoop/core/trunk/src/test/org/apache/hadoop/net/TestDNS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestServiceLifecycle.java
Modified:
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/cli/TestCLI.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
    hadoop/core/trunk/conf/log4j.properties
    hadoop/core/trunk/ivy/libraries.properties
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
    hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/test/hadoop-site.xml
    hadoop/core/trunk/src/test/log4j.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/cli/TestCLI.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/cli/TestCLI.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/cli/TestCLI.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/cli/TestCLI.java Thu Mar 12 17:50:03 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.cli;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 
 import javax.xml.parsers.SAXParser;
@@ -157,16 +158,13 @@
    * Tear down
    */
   public void tearDown() throws Exception {
-    boolean success = false;
-    mrCluster.shutdown();
-    
-    dfs.close();
-    dfsCluster.shutdown();
-    success = true;
-    Thread.sleep(2000);
-
-    assertTrue("Error tearing down Mini DFS & MR clusters", success);
-    
+    MiniMRCluster.close(mrCluster);
+    try {
+      dfs.close();
+    } catch (IOException e) {
+      LOG.error("When closing dfs", e);
+    }
+    MiniDFSCluster.close(dfsCluster);
     displayResults();
   }
   

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestFileAppend2.java Thu Mar 12 17:50:03 2009
@@ -75,6 +75,7 @@
   Workload[] workload = null;
   ArrayList<Path> testFiles = new ArrayList<Path>();
   volatile static boolean globalStatus = true;
+  private MiniDFSCluster cluster;
 
   //
   // create a buffer that contains the entire test file data.
@@ -116,6 +117,15 @@
     }
   }
 
+  /**
+   * Tears down the fixture, for example, close a network connection. This method
+   * is called after a test is executed.
+   */
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    MiniDFSCluster.close(cluster);
+  }
 
   /**
    * Creates one file, writes a few bytes to it and then closed it.
@@ -130,7 +140,7 @@
     conf.setInt("dfs.datanode.handler.count", 50);
     conf.setBoolean("dfs.support.append", true);
     initBuffer(fileSize);
-    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     try {
       { // test appending to a file.
@@ -263,7 +273,6 @@
       throw new IOException("Throwable : " + e);
     } finally {
       fs.close();
-      cluster.shutdown();
     }
   }
 
@@ -380,7 +389,7 @@
     conf.setInt("dfs.datanode.handler.count", 50);
     conf.setBoolean("dfs.support.append", true);
 
-    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
+    cluster = new MiniDFSCluster(conf, numDatanodes, 
                                                 true, null);
     cluster.waitActive();
     FileSystem fs = cluster.getFileSystem();
@@ -416,7 +425,6 @@
       }
     } finally {
       fs.close();
-      cluster.shutdown();
     }
 
     // If any of the worker thread failed in their job, indicate that

Modified: hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java Thu Mar 12 17:50:03 2009
@@ -97,8 +97,11 @@
       
         if (maxWaitSec > 0 && 
             (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
-          throw new IOException("Timedout while waiting for all blocks to " +
-                                " be replicated for " + filename);
+          throw new IOException("Timed out while waiting for all blocks to "
+                  + " be replicated for " + filename
+                  +" after " + maxWaitSec + " seconds\n"
+                  + "Expecting " + expected + ", got "
+                  + actual + ".");
         }
       
         try {
@@ -191,9 +194,7 @@
       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
       
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      MiniDFSCluster.close(cluster);
     }
-  }  
+  }
 }

Modified: hadoop/core/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/log4j.properties?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/conf/log4j.properties (original)
+++ hadoop/core/trunk/conf/log4j.properties Thu Mar 12 17:50:03 2009
@@ -36,8 +36,10 @@
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
+log4j.appender.console.immediateFlush=true
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t] %p %c{2} %x: %m%n
+#log4j.appender.console.layout.ConversionPattern=%-4r %-5p %c %x - %m%n
 
 #
 # TaskLog Appender

Modified: hadoop/core/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/ivy/libraries.properties?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/ivy/libraries.properties (original)
+++ hadoop/core/trunk/ivy/libraries.properties Thu Mar 12 17:50:03 2009
@@ -35,7 +35,7 @@
 hsqldb.version=1.8.0.10
 
 #ivy.version=2.0.0-beta2
-ivy.version=2.0.0-rc2
+ivy.version=2.0.0
 
 jasper.version=5.5.12
 #not able to figureout the version of jsp & jsp-api version to get it resolved throught ivy

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Thu Mar 12 17:50:03 2009
@@ -71,7 +71,7 @@
    * it to succeed. Then program is launched with insufficient memory and 
    * is expected to be a failure.  
    */
-  public void testCommandLine() {
+  public void testCommandLine() throws Exception {
     if (StreamUtil.isCygwin()) {
       return;
     }
@@ -88,11 +88,9 @@
       fs.delete(outputPath, true);
       assertFalse("output not cleaned up", fs.exists(outputPath));
       mr.waitUntilIdle();
-    } catch(IOException e) {
-      fail(e.toString());
     } finally {
-      mr.shutdown();
-      dfs.shutdown();
+      MiniDFSCluster.close(dfs);
+      MiniMRCluster.close(mr);
     }
   }
 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Thu Mar 12 17:50:03 2009
@@ -185,6 +185,9 @@
   private Properties properties;
   private Properties overlay;
   private ClassLoader classLoader;
+  private static final String ERROR_PARSING_CONF_FILE = 
+          "Error parsing configuration resource ";
+
   {
     classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -973,7 +976,7 @@
     }
   }
 
-  private synchronized Properties getProps() {
+  protected synchronized Properties getProps() {
     if (properties == null) {
       properties = new Properties();
       loadResources(properties, resources, quietmode);
@@ -1157,16 +1160,16 @@
       }
         
     } catch (IOException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (DOMException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (SAXException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (ParserConfigurationException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     }
   }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java Thu Mar 12 17:50:03 2009
@@ -22,6 +22,7 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.net.BindException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -457,7 +458,11 @@
           // then try the next port number.
           if (ex instanceof BindException) {
             if (!findPort) {
-              throw (BindException) ex;
+              BindException be = new BindException(
+                      "Port in use: " + listener.getHost()
+                              + ":" + listener.getPort());
+              be.initCause(ex);
+              throw be;
             }
           } else {
             LOG.info("HttpServer.start() threw a non Bind IOException"); 
@@ -489,6 +494,14 @@
   }
 
   /**
+   * Test for the availability of the web server
+   * @return true if the web server is started, false otherwise
+   */
+  public boolean isAlive() {
+    return webServer.isStarted();
+  }
+
+  /**
    * A very simple servlet to serve up a text representation of the current
    * stack traces. It both returns the stacks to the caller and logs them.
    * Currently the stack traces are done sequentially rather than exactly the

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Thu Mar 12 17:50:03 2009
@@ -284,8 +284,9 @@
     /** Connect to the server and set up the I/O streams. It then sends
      * a header to the server and starts
      * the connection thread that waits for responses.
+     * @throws IOException if the connection attempt was unsuccessful
      */
-    private synchronized void setupIOstreams() {
+    private synchronized void setupIOstreams() throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -308,7 +309,7 @@
             /* The max number of retries is 45,
              * which amounts to 20s*45 = 15 minutes retries.
              */
-            handleConnectionFailure(timeoutFailures++, 45, toe);
+            handleConnectionFailure(timeoutFailures++, maxRetries, toe);
           } catch (IOException ie) {
             handleConnectionFailure(ioFailures++, maxRetries, ie);
           }
@@ -327,6 +328,7 @@
       } catch (IOException e) {
         markClosed(e);
         close();
+        throw e;
       }
     }
 
@@ -358,7 +360,7 @@
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries >= maxRetries) {
-        throw ioe;
+        throw wrapException(remoteId.getAddress(), ioe);
       }
 
       // otherwise back off and retry
@@ -367,7 +369,7 @@
       } catch (InterruptedException ignored) {}
       
       LOG.info("Retrying connect to server: " + server + 
-          ". Already tried " + curRetries + " time(s).");
+          ". Already tried " + curRetries + " time(s) out of "+ maxRetries);
     }
 
     /* Write the header for each connection

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java Thu Mar 12 17:50:03 2009
@@ -301,7 +301,7 @@
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  static VersionedProtocol waitForProxy(Class protocol,
+  public static VersionedProtocol waitForProxy(Class protocol,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java Thu Mar 12 17:50:03 2009
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.net;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.util.Enumeration;
 import java.util.Vector;
 
@@ -38,6 +43,29 @@
  * 
  */
 public class DNS {
+  private static final String LOCALHOST = "localhost";
+  private static final Log LOG = LogFactory.getLog(DNS.class);
+
+  /**
+   * Returns the hostname associated with the specified IP address by the
+   * provided nameserver.
+   *
+   * @param address The address to reverse lookup
+   * @param nameserver The host name of a reachable DNS server; can be null
+   * @return The host name associated with the provided IP
+   * @throws NamingException If a NamingException is encountered
+   * @throws IllegalArgumentException if the protocol is unsupported
+   */
+  public static String reverseDns(InetAddress address, String nameserver)
+          throws NamingException {
+    if (address instanceof Inet4Address) {
+      return reverseDns((Inet4Address) address, nameserver);
+    } else if (address instanceof Inet6Address) {
+      return reverseDns((Inet6Address) address, nameserver);
+    } else {
+      throw new IllegalArgumentException("Unsupported Protocol " + address);
+    }
+  }
 
   /**
    * Returns the hostname associated with the specified IP address by the
@@ -45,31 +73,76 @@
    * 
    * @param hostIp
    *            The address to reverse lookup
-   * @param ns
-   *            The host name of a reachable DNS server
+   * @param nameserver
+   *            The host name of a reachable DNS server; can be null
    * @return The host name associated with the provided IP
    * @throws NamingException
    *             If a NamingException is encountered
    */
-  public static String reverseDns(InetAddress hostIp, String ns)
+  private static String reverseDns(Inet4Address hostIp, String nameserver)
     throws NamingException {
     //
     // Builds the reverse IP lookup form
     // This is formed by reversing the IP numbers and appending in-addr.arpa
     //
-    String[] parts = hostIp.getHostAddress().split("\\.");
-    String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+    byte[] address32 = hostIp.getAddress();
+    String reverseIP;
+    reverseIP =    ""+(address32[3]&0xff) + '.'
+                    + (address32[2] & 0xff) + '.'
+                    + (address32[1] & 0xff) + '.'
+                    + (address32[0] & 0xff) + '.'
+                    + "in-addr.arpa";
+/*
+    String hostAddress = hostIp.getHostAddress();
+    String[] parts = hostAddress.split("\\.");
+    if (parts.length < 4) {
+      throw new IllegalArgumentException("Unable to determine IPv4 address of "
+              + hostAddress);
+    }
+    reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
       + parts[0] + ".in-addr.arpa";
+*/
 
+    return retrievePTRRecord(nameserver, reverseIP);
+  }
+
+  /**
+   * Retrieve the PTR record from a DNS entry; if given the right
+   * reverse IP this will resolve both IPv4 and IPv6 addresses
+   * @param nameserver name server to use; can be null
+   * @param reverseIP reverse IP address to look up
+   * @return the PTR record of the host
+   * @throws NamingException if the record can not be found.
+   */
+  private static String retrievePTRRecord(String nameserver, String reverseIP)
+          throws NamingException {
     DirContext ictx = new InitialDirContext();
-    Attributes attribute =
-      ictx.getAttributes("dns://"               // Use "dns:///" if the default
-                         + ((ns == null) ? "" : ns) + 
-                         // nameserver is to be used
+    try {
+      // Use "dns:///" if the default nameserver is to be used
+      Attributes attribute = ictx.getAttributes("dns://"
+                         + ((nameserver == null) ? "" : nameserver) +
                          "/" + reverseIP, new String[] { "PTR" });
-    ictx.close();
-    
-    return attribute.get("PTR").get().toString();
+      return attribute.get("PTR").get().toString();
+    } finally {
+      ictx.close();
+    }
+  }
+
+  /**
+   * Returns the hostname associated with the specified IP address by the
+   * provided nameserver.
+   *
+   * @param address
+   *            The address to reverse lookup
+   * @param nameserver
+   *            The host name of a reachable DNS server; can be null
+   * @return The host name associated with the provided IP
+   * @throws NamingException
+   *             If a NamingException is encountered
+   */
+  private static String reverseDns(Inet6Address address, String nameserver)
+          throws NamingException {
+    throw new IllegalArgumentException("Reverse DNS lookup of IPv6 is currently unsupported: " + address);
   }
 
   /**
@@ -90,21 +163,23 @@
     try {
       NetworkInterface netIF = NetworkInterface.getByName(strInterface);
       if (netIF == null)
-        return new String[] { InetAddress.getLocalHost()
-                              .getHostAddress() };
+        return new String[] {getLocalHostIPAddress()};
       else {
         Vector<String> ips = new Vector<String>();
-        Enumeration e = netIF.getInetAddresses();
-        while (e.hasMoreElements())
-          ips.add(((InetAddress) e.nextElement()).getHostAddress());
-        return ips.toArray(new String[] {});
+        Enumeration<InetAddress> e = netIF.getInetAddresses();
+
+        while (e.hasMoreElements()) {
+          ips.add((e.nextElement()).getHostAddress());
+        }
+        return ips.toArray(new String[ips.size()]);
       }
     } catch (SocketException e) {
-      return new String[] { InetAddress.getLocalHost().getHostAddress() };
+      return new String[] {getLocalHostIPAddress()};
     }
   }
 
-  /**
+
+    /**
    * Returns the first available IP address associated with the provided
    * network interface
    * 
@@ -130,26 +205,98 @@
    *            The DNS host name
    * @return A string vector of all host names associated with the IPs tied to
    *         the specified interface
-   * @throws UnknownHostException
+   * @throws UnknownHostException if the hostname cannot be determined
    */
   public static String[] getHosts(String strInterface, String nameserver)
     throws UnknownHostException {
     String[] ips = getIPs(strInterface);
     Vector<String> hosts = new Vector<String>();
-    for (int ctr = 0; ctr < ips.length; ctr++)
+    for (String ip : ips) {
+      InetAddress ipAddr = null;
       try {
-        hosts.add(reverseDns(InetAddress.getByName(ips[ctr]),
-                             nameserver));
-      } catch (Exception e) {
+        ipAddr = InetAddress.getByName(ip);
+        hosts.add(reverseDns(ipAddr, nameserver));
+      } catch (UnknownHostException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Unable to resolve hostname of " + ip + ": " + e, e);
+      } catch (NamingException e) {
+        if(LOG.isDebugEnabled())
+          LOG.debug("Unable to reverse DNS of host" + ip
+                +  (ipAddr != null ? (" and address "+ipAddr): "")
+                + " : " + e, e);
+      } catch (IllegalArgumentException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Unable to resolve IP address of " + ip
+                  + (ipAddr != null ? (" and address " + ipAddr) : "")
+                  + " : " + e, e);
       }
+    }
 
-    if (hosts.size() == 0)
-      return new String[] { InetAddress.getLocalHost().getCanonicalHostName() };
-    else
-      return hosts.toArray(new String[] {});
+    if (hosts.isEmpty()) {
+      return new String[] { getLocalHostname() };
+    } else {
+      return hosts.toArray(new String[hosts.size()]);
+    }
   }
 
   /**
+   * The cached hostname -initially null.
+   */
+
+  private static volatile String cachedHostname;
+
+  /**
+   * The cached address hostname -initially null.
+   */
+  private static volatile String cachedHostAddress;
+
+    /**
+     * Determine the local hostname; retrieving it from cache if it is known
+     * If we cannot determine our host name, return "localhost"
+     * This code is not synchronized; if more than one thread calls it while
+     * it is determining the address, the last one to exit the loop wins.
+     * However, as both threads are determining and caching the same value, it
+     * should be moot.
+     * @return the local hostname or "localhost"
+     */
+  private static String getLocalHostname() {
+    if(cachedHostname == null) {
+      try {
+        cachedHostname = InetAddress.getLocalHost().getCanonicalHostName();
+      } catch (UnknownHostException e) {
+        LOG.info("Unable to determine local hostname "
+                + "-falling back to \""+LOCALHOST+"\"", e);
+        cachedHostname = LOCALHOST;
+      }
+    }
+    return cachedHostname;
+  }
+
+  /**
+   * Get the IPAddress of the local host as a string. This may be a loop back
+   * value.
+   * This code is not synchronized; if more than one thread calls it while
+   * it is determining the address, the last one to exit the loop wins.
+   * However, as both threads are determining and caching the same value, it
+   * should be moot.
+   * @return the IPAddress of the localhost
+   * @throws UnknownHostException if not even "localhost" resolves.
+   */
+  private static String getLocalHostIPAddress() throws UnknownHostException {
+    if (cachedHostAddress == null) {
+      try {
+        InetAddress localHost = InetAddress.getLocalHost();
+        cachedHostAddress = localHost.getHostAddress();
+      } catch (UnknownHostException e) {
+        LOG.info("Unable to determine local IP Address "
+                + "-falling back to loopback address", e);
+        cachedHostAddress = InetAddress.getByName(LOCALHOST).getHostAddress();
+      }
+    }
+    return cachedHostAddress;
+  }
+
+    /**
    * Returns all the host names associated by the default nameserver with the
    * address bound to the specified network interface
    * 
@@ -158,7 +305,7 @@
    * @return The list of host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    * 
    */
   public static String[] getHosts(String strInterface)
@@ -177,15 +324,17 @@
    * @return The default host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    */
   public static String getDefaultHost(String strInterface, String nameserver)
     throws UnknownHostException {
-    if (strInterface.equals("default")) 
-      return InetAddress.getLocalHost().getCanonicalHostName();
+    if ("default".equals(strInterface)) {
+      return getLocalHostname();
+    }
 
-    if (nameserver != null && nameserver.equals("default"))
+    if ("default".equals(nameserver)) {
       return getDefaultHost(strInterface);
+    }
 
     String[] hosts = getHosts(strInterface, nameserver);
     return hosts[0];
@@ -196,11 +345,12 @@
    * nameserver with the address bound to the specified network interface
    * 
    * @param strInterface
-   *            The name of the network interface to query (e.g. eth0)
+   *            The name of the network interface to query (e.g. eth0).
+   *            Must not be null.
    * @return The default host name associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    */
   public static String getDefaultHost(String strInterface)
     throws UnknownHostException {

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Thu Mar 12 17:50:03 2009
@@ -26,6 +26,7 @@
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.net.ConnectException;
 import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
@@ -180,9 +181,12 @@
     String oldAddr = conf.get(oldBindAddressName);
     String oldPort = conf.get(oldPortName);
     String newAddrPort = conf.get(newBindAddressName);
-    if (oldAddr == null && oldPort == null) {
+    if (oldAddr == null && oldPort == null && newAddrPort != null) {
       return newAddrPort;
     }
+    if (newAddrPort == null) {
+      throw new IllegalArgumentException("No value for " + newBindAddressName);
+    }
     String[] newAddrPortParts = newAddrPort.split(":",2);
     if (newAddrPortParts.length != 2) {
       throw new IllegalArgumentException("Invalid address/port: " + 
@@ -395,14 +399,18 @@
     if (socket == null || endpoint == null || timeout < 0) {
       throw new IllegalArgumentException("Illegal argument for connect()");
     }
-    
-    SocketChannel ch = socket.getChannel();
-    
-    if (ch == null) {
-      // let the default implementation handle it.
-      socket.connect(endpoint, timeout);
-    } else {
-      SocketIOWithTimeout.connect(ch, endpoint, timeout);
+    try {
+      SocketChannel ch = socket.getChannel();
+
+      if (ch == null) {
+        // let the default implementation handle it.
+        socket.connect(endpoint, timeout);
+      } else {
+        SocketIOWithTimeout.connect(ch, endpoint, timeout);
+      }
+    } catch (ConnectException e) {
+      throw (ConnectException) new ConnectException(
+              e + " connecting to " + endpoint).initCause(e);
     }
   }
   

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * A mock service that can be set to fail in different parts of its lifecycle,
+ * and which counts the number of times its inner classes changed state.
+ */
+
+public class MockService extends Service {
+
+  /**
+   * Build from an empty configuration
+   */
+  public MockService() {
+    super(new Configuration());
+  }
+
+  /**
+   * Build from a configuration file
+   * @param conf
+   */
+  public MockService(Configuration conf) {
+    super(conf);
+  }
+
+  private boolean failOnStart, failOnPing, failOnClose;
+  private boolean goLiveInStart = true;
+  private boolean closed = true;
+  private volatile int stateChangeCount = 0;
+  private volatile int pingCount = 0;
+
+  public void setFailOnStart(boolean failOnStart) {
+    this.failOnStart = failOnStart;
+  }
+
+  public void setFailOnPing(boolean failOnPing) {
+    this.failOnPing = failOnPing;
+  }
+
+  public void setGoLiveInStart(boolean goLiveInStart) {
+    this.goLiveInStart = goLiveInStart;
+  }
+
+  public void setFailOnClose(boolean failOnClose) {
+    this.failOnClose = failOnClose;
+  }
+
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /**
+   * Go live
+   *
+   * @throws ServiceStateException if we were not in a state to do so
+   */
+  public void goLive() throws ServiceStateException {
+    enterLiveState();
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws IOException  if {@link #failOnStart is set}
+   */
+  @Override
+  protected void innerStart() throws IOException {
+    if (failOnStart) {
+      throw new MockServiceException("failOnStart");
+    }
+    if (goLiveInStart) {
+      goLive();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * @throws IOException if {@link #failOnPing is set} @param status
+   */
+  @Override
+  protected void innerPing(ServiceStatus status) throws IOException {
+    pingCount++;
+    if (failOnPing) {
+      throw new MockServiceException("failOnPing");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException if {@link #failOnClose} is true
+   */
+  protected void innerClose() throws IOException {
+    closed = true;
+    if (failOnClose) {
+      throw new MockServiceException("failOnClose");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected void onStateChange(ServiceState oldState,
+                               ServiceState newState) {
+    super.onStateChange(oldState, newState);
+    stateChangeCount++;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * A public method do change state
+   */
+  public void changeState(ServiceState state)
+          throws ServiceStateException {
+    setServiceState(state);
+  }
+
+  public int getStateChangeCount() {
+    return stateChangeCount;
+  }
+
+  public int getPingCount() {
+    return pingCount;
+  }
+
+  /**
+   * An exception to indicate we have triggered a mock event
+   */
+  static class MockServiceException extends IOException {
+
+    private MockServiceException(String message) {
+      super(message);
+    }
+  }
+}

Added: hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java?rev=752949&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java Thu Mar 12 17:50:03 2009
@@ -0,0 +1,1006 @@
+/*
+ * Copyright  2008 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.Closeable;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is the base class for services that can be deployed. A service is any
+ * Hadoop class that has a standard lifecycle
+ *
+ * The lifecycle of a Service is:
+ *
+ * <ol>
+ *
+ * <li>Component is Created, enters the {@link ServiceState#CREATED} state.
+ * This happens in the constructor. </li>
+ *
+ * <li>Component is started
+ * through a call to {@link Service#start()} ()}. If successful, it enters the
+ * {@link ServiceState#STARTED} state. If not, it enters the {@link
+ * ServiceState#FAILED} state. </li>
+ *
+ * <li>Once the component considers itself
+ * fully started, it enters the {@link ServiceState#LIVE} state. This implies it
+ * is providing a service to external callers. </li>
+ *
+ * </ol>
+ *
+ * From any state, the service can be terminated/closed through a call to
+ * {@link Service#close()}, which may throw an {@link IOException}, or
+ * {@link Service#closeQuietly()}, which catched and logs any such exception.
+ * These are idempotent calls, and will place the service in the
+ * {@link ServiceState#CLOSED}, terminated  state, after which
+ * it can no longer be used.
+ *
+ * To implement a Service.
+ *
+ * <ol>
+ *
+ * <li>Subclass this class</li>
+ *
+ * <li>Avoid doing any initialization/startup in the constructors, as this
+ * breaks the lifecycle and prevents subclassing. </li>
+ *
+ * <li>If the service wishes to declare itself as having failed, so that
+ * {@link #ping()} operations automatically fail, call
+ * {@link #enterFailedState(Throwable)} to enter the failed state.</li>
+ *
+ * <li>Override the {@link #innerStart()} method to start the service, including
+ * starting any worker threads.</li>
+ *
+ * <li>In the {@link #innerStart()} method, if the service is immediately live
+ * to external callers, call {@link #enterLiveState()} to mark the service as
+ * live.</li>
+
+ * <li>If startup is performed in separate threads, and includes bootstrap work,
+ * call the  {@link #enterLiveState()} in the separate thread <i>when the
+ * service is ready</i></li>
+ *
+ * <li>Override {@link #innerPing(ServiceStatus)} with any health checks that a service
+ * can perform to check that it is still "alive". These should be short lasting
+ * and non-side effecting. Simple checks for valid data structures and live
+ * worker threads are good examples. When the service thinks that something
+ * has failed, throw an IOException with a meaningful error message!
+ * </li>
+ *
+ * <li>Override {@link #innerClose()} to perform all shutdown logic.
+ * Be robust here and shut down cleanly even if the service did not start up
+ * completely. Do not assume all fields are non-null</li>
+ *
+ * You should not need to worry about making these overridden methods
+ * synchronized, as they are only called when a service has entered a specific
+ * state -which is synchronized. Except for {@link #innerPing(ServiceStatus)} ,
+ * each method will only be called at most once in the life of a service instance.
+ * However, because findbugs can flag synchronization warnings, it is often
+ * simplest and safest to mark the innerX operations as synchronized.
+ */
+
+public abstract class Service extends Configured implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(Service.class);
+
+  /**
+   * The initial state of a service is {@link ServiceState#CREATED}
+   */
+  private volatile ServiceState serviceState = ServiceState.CREATED;
+
+  /**
+   * when did the state change?
+   */
+  private volatile Date lastStateChange = new Date();
+
+  /**
+   * A root cause for failure. May be null.
+   */
+  private Throwable failureCause;
+
+  /**
+   * Error string included in {@link ServiceStateException} exceptions
+   * when an operation is applied to a service that is not in the correct
+   * state for it.
+   * value: {@value}
+   */
+  public static final String ERROR_WRONG_STATE = " is in the wrong state.";
+
+  /**
+   * Error string included in {@link ServiceStateException} exceptions
+   * when a service with a null configuration is started
+   * value: {@value}
+   */
+  public static final String ERROR_NO_CONFIGURATION
+          = "Cannot initialize when unconfigured";
+
+  /**
+   * Construct a service with no configuration; one must be called with {@link
+   * #setConf(Configuration)} before the service is started
+   */
+  protected Service() {
+  }
+
+  /**
+   * Construct a Configured service
+   *
+   * @param conf the configuration
+   */
+  protected Service(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Start any work (usually in separate threads).
+   *
+   * When completed, the service will be in the {@link ServiceState#STARTED}
+   * state, or may have already transited to the {@link ServiceState#LIVE}
+   * state
+   *
+   * Subclasses must implement their work in {@link #innerStart()}, leaving the
+   * start() method to manage state checks and changes.
+   *
+   * @throws IOException           for any failure
+   * @throws ServiceStateException when the service is not in a state from which
+   *                               it can enter this state.
+   */
+  public void start() throws IOException {
+    synchronized (this) {
+      //this request is idempotent on either live or starting states; either
+      //state is ignored
+      ServiceState currentState = getServiceState();
+      if (currentState == ServiceState.LIVE ||
+              currentState == ServiceState.STARTED) {
+        return;
+      }
+      if (getConf() == null) {
+        throw new ServiceStateException(ERROR_NO_CONFIGURATION,
+                getServiceState());
+      }
+      //check and change states
+      enterState(ServiceState.STARTED);
+    }
+    try {
+      innerStart();
+    } catch (IOException e) {
+      enterFailedState(e);
+      throw e;
+    }
+  }
+
+  /**
+   * Ping: checks that a component considers itself live.
+   *
+   * This may trigger a health check in which the service probes its
+   * constituent parts to verify that they are themselves live.
+   * The base implementation considers any state other than
+   * {@link ServiceState#FAILED} and {@link ServiceState#CLOSED}
+   * to be valid, so it is OK to ping a
+   * component that is still starting up. However, in such situations, the inner
+   * ping health tests are skipped, because they are generally irrelvant.
+   *
+   * Subclasses should not normally override this method, but instead override
+   * {@link #innerPing(ServiceStatus)} with extra health checks that will only
+   * be called when a system is actually live.
+   * @return the current service state.
+   * @throws IOException           for any ping failure
+   * @throws ServiceStateException if the component is in a wrong state.
+   */
+  public ServiceStatus ping() throws IOException {
+    ServiceStatus status = new ServiceStatus(this);
+    ServiceState state = status.getState();
+    if (state == ServiceState.LIVE) {
+      try {
+        innerPing(status);
+      } catch (Throwable thrown) {
+        //TODO: what happens whenthe ping() returns >0 causes of failure but 
+        //doesn't throw an exception -this method will not get called. Is 
+        //that what we want?
+        status = onInnerPingFailure(status, thrown);
+      }
+    } else {
+      //ignore the ping
+      LOG.debug("ignoring ping request while in state " + state);
+      //but tack on any non-null failure cause, which may be a valid value
+      //in FAILED or TERMINATED states.
+      status.addThrowable(getFailureCause());
+    }
+    return status;
+  }
+
+  /**
+   * This is an override point for services -handle failure of the inner
+   * ping operation.
+   * The base implementation calls {@link #enterFailedState(Throwable)} and then
+   * updates the service status with the (new) state and the throwable
+   * that was caught.
+   * @param currentStatus the current status structure
+   * @param thrown the exception from the failing ping.
+   * @return an updated service status structure.
+   * @throws IOException for IO problems
+   */
+  protected ServiceStatus onInnerPingFailure(ServiceStatus currentStatus,
+                                             Throwable thrown) 
+          throws IOException {
+    //something went wrong
+    //mark as failed
+    //TODO: don't enter failed state on a failing ping? Just report the event
+    //to the caller?
+    enterFailedState(thrown);
+    //update the state
+    currentStatus.updateState(this);
+    currentStatus.addThrowable(thrown);
+    //and return the exception.
+    return currentStatus;
+  }
+
+  /**
+   * Convert any exception to an {@link IOException}
+   * If it already is an IOException, the exception is
+   * returned as is. If it is anything else, it is wrapped, with
+   * the original message retained.
+   * @param thrown the exception to forward
+   * @return an IOException representing or containing the forwarded exception
+   */
+  protected IOException forwardAsIOException(Throwable thrown) {
+    IOException newException;
+    if(thrown instanceof IOException) {
+      newException = (IOException) thrown;
+    } else {
+      IOException ioe = new IOException(thrown.toString());
+      ioe.initCause(thrown);
+      newException = ioe;
+    }
+    return newException;
+  }
+
+
+  /**
+   * Test for a service being in the {@link ServiceState#LIVE} or {@link
+   * ServiceState#STARTED}
+   *
+   * @return true if the service is in one of the two states.
+   */
+  public final boolean isRunning() {
+    ServiceState currentState = getServiceState();
+    return currentState == ServiceState.STARTED
+            || currentState == ServiceState.LIVE;
+  }
+
+  /**
+   * Shut down. This must be idempotent and turn errors into log/warn events -do
+   * your best to clean up even in the face of adversity. This method should be
+   * idempotent; if already terminated, return. Similarly, do not fail if the
+   * component never actually started.
+   *
+   * The implementation calls {@link #close()} and then
+   * {@link #logExceptionDuringQuietClose(Throwable)} if that method throws
+   * any exception.
+   */
+  public final void closeQuietly() {
+    try {
+      close();
+    } catch (Throwable e) {
+      logExceptionDuringQuietClose(e);
+    }
+  }
+
+  /**
+   * Closes this service. Subclasses are free to throw an exception, but
+   * they are expected to make a best effort attempt to close the service
+   * down as thoroughly as possible.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException {
+    if (enterState(ServiceState.CLOSED)) {
+      innerClose();
+    }
+  }
+
+  /**
+   * This is a method called when exceptions are being logged and swallowed
+   * during termination. It logs the event at the error level.
+   *
+   * Subclasses may override this to do more advanced error handling/logging.
+   *
+   * @param thrown whatever was thrown
+   */
+  protected void logExceptionDuringQuietClose(Throwable thrown) {
+    LOG.error("Exception during termination: " + thrown,
+            thrown);
+  }
+
+  /**
+   * This method is designed for overriding, with subclasses implementing
+   * startup logic inside it. It is only called when the component is entering
+   * the running state; and will be called once only.
+   *
+   * When the work in here is completed, the component should set the service
+   * state to {@link ServiceState#LIVE} to indicate the service is now live.
+   *
+   * @throws IOException for any problem.
+   */
+  protected void innerStart() throws IOException {
+  }
+
+
+  /**
+   * This method is designed for overriding, with subclasses implementing health
+   * tests inside it.
+   *
+   * It is invoked whenever the component is called with {@link Service#ping()}
+   * and the call is not rejected.
+   * @param status the service status, which can be updated
+   * @throws IOException for any problem.
+   */
+  protected void innerPing(ServiceStatus status) throws IOException {
+  }
+
+  /**
+   * This method is designed for overriding, with subclasses implementing
+   * termination logic inside it.
+   *
+   * It is only called when the component is entering the closed state; and
+   * will be called once only.
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  protected void innerClose() throws IOException {
+
+  }
+
+  /**
+   * Get the current state of the service.
+   *
+   * @return the lifecycle state
+   */
+  public final ServiceState getServiceState() {
+    return serviceState;
+  }
+
+  /**
+   * This is the state transition graph represented as some nested switches.
+   * @return true if the transition is valid. For all states, the result when
+   * oldState==newState is false: that is not a transition, after all.
+   * @param oldState the old state of the service
+   * @param newState the new state
+   */
+  protected boolean isValidStateTransition(ServiceState oldState,
+                                           ServiceState newState) {
+    switch(oldState) {
+      case CREATED:
+        switch(newState) {
+          case STARTED:
+          case FAILED:
+          case CLOSED:
+            return true;
+          default:
+            return false;
+        }
+      case STARTED:
+        switch (newState) {
+          case LIVE:
+          case FAILED:
+          case CLOSED:
+            return true;
+          default:
+            return false;
+        }
+      case LIVE:
+        switch (newState) {
+          case STARTED:
+          case FAILED:
+          case CLOSED:
+            return true;
+          default:
+            return false;
+        }
+      case UNDEFINED:
+        //if we don't know where we were before (very, very unlikely), then
+        //let's get out of it
+        return true;
+      case FAILED:
+        //failure can only enter closed state
+        return newState == ServiceState.CLOSED;
+      case CLOSED:
+        //This is the end state. There is no exit.
+      default:
+        return false;
+    }
+  }
+
+  /**
+  * Set the service state.
+  * If there is a change in state, the {@link #lastStateChange} timestamp
+  * is updated and the {@link #onStateChange(ServiceState, ServiceState)} event
+  * is invoked.
+  * @param serviceState the new state
+  */
+  protected final void setServiceState(ServiceState serviceState) {
+    ServiceState oldState;
+    synchronized (this) {
+      oldState = this.serviceState;
+      this.serviceState = serviceState;
+    }
+    if (oldState != serviceState) {
+      lastStateChange = new Date();
+      onStateChange(oldState, serviceState);
+    }
+  }
+
+  /**
+   * Override point - a method called whenever there is a state change.
+   *
+   * The base class logs the event.
+   *
+   * @param oldState existing state
+   * @param newState new state.
+   */
+  protected void onStateChange(ServiceState oldState,
+                               ServiceState newState) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("State Change: " + toString()
+              + " transitioned from state " + oldState + " to " + newState);
+    }
+  }
+
+  /**
+   * Enter a new state if that is permitted from the current state.
+   * Does nothing if we are in that state; throws an exception if the
+   * state transition is not permitted
+   * @param  newState  the new state
+   * @return true if the service transitioned into this state, that is, it was
+   *         not already in the state
+   * @throws ServiceStateException if the service is not in either state
+   */
+  protected final synchronized boolean enterState(ServiceState newState)
+          throws ServiceStateException {
+    return enterState(getServiceState(), newState);
+  }
+
+  /**
+   * Check that a service is in a required entry state, or already in the
+   * desired exit state.
+   *
+   * @param entryState the state that is needed. If set to {@link
+   *                   ServiceState#UNDEFINED} then the entry state is not
+   *                   checked.
+   * @param exitState  the state that is desired when we exit
+   * @return true if the service transitioned into this state, that is, it was
+   *         not already in the state
+   * @throws ServiceStateException if the service is not in either state
+   */
+  protected final synchronized boolean enterState(ServiceState entryState,
+                                            ServiceState exitState)
+          throws ServiceStateException {
+    ServiceState currentState = getServiceState();
+    if (currentState == exitState) {
+      return false;
+    }
+    validateStateTransition(entryState, exitState);
+    setServiceState(exitState);
+    return true;
+  }
+
+  /**
+   * Check that the state transition is valid
+   * @param entryState the entry state
+   * @param exitState the exit state 
+   * @throws ServiceStateException if the state transition is not allowed
+   */
+  protected final void validateStateTransition(ServiceState entryState,
+                                         ServiceState exitState)
+          throws ServiceStateException {
+    if(!isValidStateTransition(entryState, exitState)) {
+      throw new ServiceStateException(toString()
+              + ERROR_WRONG_STATE
+              + " The service cannot move from the state " + entryState
+              + "to the state " + exitState,
+              entryState);
+    }
+  }
+
+
+  /**
+   * Verify that a service is in a specific state
+   *
+   * @param state the state that is required.
+   * @throws ServiceStateException if the service is in the wrong state
+   */
+  public final void verifyServiceState(ServiceState state)
+          throws ServiceStateException {
+    verifyState(getServiceState(), state, ServiceState.UNDEFINED);
+  }
+
+  /**
+   * Verify that a service is in either of two specific states
+   *
+   * @param expected  the state that is expected.
+   * @param expected2 a second state, which can be left at {@link
+   *                  ServiceState#UNDEFINED} for "do not check this"
+   * @throws ServiceStateException if the service is in the wrong state
+   */
+  public final void verifyServiceState(ServiceState expected, ServiceState expected2)
+          throws ServiceStateException {
+    verifyState(getServiceState(), expected, expected2);
+  }
+
+  /**
+   * Internal state verification test
+   *
+   * @param currentState the current state
+   * @param expected     the state that is expected.
+   * @param expected2    a second state, which can be left at {@link
+   *                     ServiceState#UNDEFINED} for "do not check this"
+   * @throws ServiceStateException if the service is in the wrong state
+   */
+  protected final void verifyState(ServiceState currentState,
+                             ServiceState expected,
+                             ServiceState expected2)
+          throws ServiceStateException {
+    boolean expected2defined = expected2 != ServiceState.UNDEFINED;
+    if (!(currentState == expected ||
+            (expected2defined && currentState == expected2))) {
+      throw new ServiceStateException(toString()
+              + ERROR_WRONG_STATE
+              + " Expected " + expected
+              + (expected2defined ? (" or " + expected2) : " ")
+              + " but the actual state is " + currentState,
+              currentState);
+    }
+  }
+
+  /**
+   * Helper method to enter the {@link ServiceState#FAILED} state.
+   *
+   * Call this whenever the service considers itself to have failed in a
+   * non-restartable manner.
+   *
+   * If the service was already terminated or failed, this operation does
+   * not trigger a state change.
+   * @param cause the cause of the failure
+   */
+  public final void enterFailedState(Throwable cause) {
+    synchronized (this) {
+      if(failureCause == null) {
+        failureCause = cause;
+      }
+    }
+    if(!isTerminated()) {
+      setServiceState(ServiceState.FAILED);
+    }
+  }
+
+
+  /**
+   * Shortcut method to enter the {@link ServiceState#LIVE} state.
+   *
+   * Call this when a service considers itself live
+   * @return true if this state was entered, false if it was already in it
+   * @throws ServiceStateException if the service is not currently in the
+   * STARTED or LIVE states
+   */
+  protected final boolean enterLiveState() throws ServiceStateException {
+    return enterState(ServiceState.LIVE);
+  }
+
+  /**
+   * Test for the service being terminated; non-blocking
+   *
+   * @return true if the service is currently terminated
+   */
+  public boolean isTerminated() {
+    return getServiceState() == ServiceState.CLOSED;
+  }
+
+
+  /**
+   * Override point: the name of this service. This is used
+   * to construct human-readable descriptions
+   * @return the name of this service for string messages
+   */
+  public String getServiceName() {
+    return "Service";
+  }
+
+  /**
+  * The toString operator returns the super class name/id, and the state. This
+  * gives all services a slightly useful message in a debugger or test report
+  *
+  * @return a string representation of the object.
+  */
+  @Override
+  public String toString() {
+    return getServiceName() + " instance " + super.toString() + " in state "
+            + getServiceState();
+  }
+
+
+  /**
+   * Get the cause of failure -will be null if not failed, and may be
+   * null even after failure.
+   * @return the exception that triggered entry into the failed state.
+   *
+   */
+  public Throwable getFailureCause() {
+    return failureCause;
+  }
+
+  /**
+   * Initialize and start a service. If the service fails to come up, it is
+   * terminated.
+   *
+   * @param service the service to deploy
+   * @throws IOException on any failure to deploy
+   */
+  public static void deploy(Service service)
+          throws IOException {
+    try {
+      service.start();
+    } catch (IOException e) {
+      //mark as failed
+      service.enterFailedState(e);
+      //we assume that the service really does know how to terminate
+      service.closeQuietly();
+      throw e;
+    } catch (Throwable t) {
+        //mark as failed
+        service.enterFailedState(t);
+        //we assume that the service really does know how to terminate
+        service.closeQuietly();
+        //and wrap the exception in an IOException that is rethrown
+        throw (IOException) new IOException(t.toString()).initCause(t);
+    }
+  }
+
+  /**
+   * Terminate a service that is not null
+   *
+   * @param service a service to terminate
+   */
+  public static void closeQuietly(Service service) {
+    if (service != null) {
+      service.closeQuietly();
+    }
+  }
+
+  /**
+   * Terminate a service or other closeable that is not null
+   *
+   * @param closeable the object to close
+   * @throws IOException any exception during the close operation
+   */
+  public static void close(Closeable closeable) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (IOException e) {
+        LOG.info("when closing :" + closeable+ ":" + e, e);
+      }
+    }
+  }
+
+
+    /**
+   * An exception that indicates there is something wrong with the state of the
+   * service
+   */
+  public static class ServiceStateException extends IOException {
+    private ServiceState state;
+
+
+    /**
+     * Create a service state exception with a standard message {@link
+     * Service#ERROR_WRONG_STATE} including the string value of the owning
+     * service, and the supplied state value
+     *
+     * @param service owning service
+     * @param state current state
+     */
+    public ServiceStateException(Service service, ServiceState state) {
+      this(service.toString()
+              + ERROR_WRONG_STATE + " : " + state,
+              null,
+              state);
+    }
+
+    /**
+     * Constructs an Exception with the specified detail message and service
+     * state.
+     *
+     * @param message The detail message (which is saved for later retrieval by
+     *                the {@link #getMessage()} method)
+     * @param state   the current state of the service
+     */
+    public ServiceStateException(String message, ServiceState state) {
+      this(message, null, state);
+    }
+
+    /**
+     * Constructs an Exception with the specified detail message, cause and
+     * service state.
+     *
+     * @param message message
+     * @param cause   optional root cause
+     * @param state   the state of the component
+     */
+    public ServiceStateException(String message,
+                                 Throwable cause,
+                                 ServiceState state) {
+      super(message, cause);
+      this.state = state;
+    }
+
+    /**
+     * Construct an exception. The lifecycle state of the specific component is
+     * extracted
+     *
+     * @param message message
+     * @param cause   optional root cause
+     * @param service originating service
+     */
+    public ServiceStateException(String message,
+                                 Throwable cause,
+                                 Service service) {
+      this(message, cause, service.getServiceState());
+    }
+
+    /**
+     * Get the state when this exception was raised
+     *
+     * @return the state of the service
+     */
+    public ServiceState getState() {
+      return state;
+    }
+
+
+  }
+
+  /**
+   * This is an exception that can be raised on a liveness failure.
+   */
+  public static class LivenessException extends IOException {
+
+    /**
+     * Constructs an exception with {@code null} as its error detail message.
+     */
+    public LivenessException() {
+    }
+
+    /**
+     * Constructs an Exception with the specified detail message.
+     *
+     * @param message The detail message (which is saved for later retrieval by
+     *                the {@link #getMessage()} method)
+     */
+    public LivenessException(String message) {
+      super(message);
+    }
+
+    /**
+     * Constructs an exception with the specified detail message and cause.
+     *
+     * <p> The detail message associated with {@code cause} is only incorporated
+     * into this exception's detail message when the message parameter is null.
+     *
+     * @param message The detail message (which is saved for later retrieval by
+     *                the {@link #getMessage()} method)
+     * @param cause   The cause (which is saved for later retrieval by the
+     *                {@link #getCause()} method).  (A null value is permitted,
+     *                and indicates that the cause is nonexistent or unknown.)
+     */
+    public LivenessException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    /**
+     * Constructs an exception with the specified cause and a detail message of
+     * {@code cause.toString())}. A null cause is allowed.
+     *
+     * @param cause The cause (which is saved for later retrieval by the {@link
+     *              #getCause()} method). Can be null.
+     */
+    public LivenessException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  /**
+   * The state of the service as perceived by the service itself. Failure is the
+   * odd one as it often takes a side effecting test (or an outsider) to
+   * observe.
+   */
+  public enum ServiceState {
+    /**
+     * we don't know or care what state the service is in
+     */
+    UNDEFINED,
+    /**
+     * The service has been created
+     */
+    CREATED,
+
+    /**
+     * The service is starting up.
+     * Its {@link Service#start()} method has been called.
+     * When it is ready for work, it will declare itself LIVE.
+     */
+    STARTED,
+    /**
+     * The service is now live and available for external use
+     */
+    LIVE,
+    /**
+     * The service has failed
+     */
+    FAILED,
+      /**
+     * the service has been shut down
+     * The container process may now destroy the instance
+     * Its {@link Service#close()} ()} method has been called.
+     */
+    CLOSED
+  }
+
+  /**
+   * This is the full service status
+   */
+  public static final class ServiceStatus implements Serializable {
+    /** enumerated state */
+    private ServiceState state;
+
+    /** name of the service */
+    private String name;
+
+    /** when did the state change?  */
+    private Date lastStateChange;
+
+    /**
+     * a possibly null array of exceptions that caused a system failure
+     */
+    public ArrayList<Throwable> throwables = new ArrayList<Throwable>(0);
+
+    /**
+     * Create an empty service status instance
+     */
+    public ServiceStatus() {
+    }
+
+    /**
+     * Create a service status instance with the base values set
+     * @param name service name
+     * @param state current state
+     * @param lastStateChange when did the state last change?
+     */
+    public ServiceStatus(String name, ServiceState state,
+                         Date lastStateChange) {
+      this.state = state;
+      this.name = name;
+      this.lastStateChange = lastStateChange;
+    }
+
+    /**
+     * Create a service status instance from the given service
+     *
+     * @param service service to read from
+     */
+    public ServiceStatus(Service service) {
+      name = service.getServiceName();
+      updateState(service);
+    }
+
+    /**
+     * Add a new throwable to the list. This is a no-op if it is null.
+     * To be safely sent over a network connection, the Throwable (and any
+     * chained causes) must be fully serializable.
+     * @param thrown the throwable. Can be null; will not be cloned.
+     */
+    public void addThrowable(Throwable thrown) {
+      if (thrown != null) {
+        throwables.add(thrown);
+      }
+    }
+
+    public List<Throwable> getThrowables() {
+      return throwables;
+    }
+
+    /**
+     * Get the current state
+     * @return the state
+     */
+    public ServiceState getState() {
+      return state;
+    }
+
+    /**
+     * set the state
+     * @param state new state
+     */
+    public void setState(ServiceState state) {
+      this.state = state;
+    }
+
+    /**
+     * Get the name of the service
+     * @return the service name
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Set the name of the service
+     * @param name the service name
+     */
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    /**
+     * Get the timestamp of the last state change
+     * @return when the service state last changed
+     */
+    public Date getLastStateChange() {
+      return lastStateChange;
+    }
+
+    /**
+     * Set the last state change
+     * @param lastStateChange the timestamp of the last state change
+     */
+    public void setLastStateChange(Date lastStateChange) {
+      this.lastStateChange = lastStateChange;
+    }
+
+    /**
+     * Update the service state
+     * @param service the service to update from
+     */
+    public void updateState(Service service) {
+      synchronized (service) {
+        setState(service.getServiceState());
+        setLastStateChange(service.lastStateChange);
+      }
+    }
+
+    /**
+     * The string operator includes the messages of every throwable
+     * in the list of failures
+     * @return the list of throwables
+     */
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append(getName()).append(" in state ").append(getState());
+      for (Throwable t : throwables) {
+        builder.append("\n ").append(t.toString());
+      }
+      return builder.toString();
+    }
+  }
+}

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Mar 12 17:50:03 2009
@@ -1384,7 +1384,7 @@
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
       if (newInfo == null) {
-        throw new IOException("Cannot open filename " + src);
+        throw new FileNotFoundException("Cannot open HDFS file " + src);
       }
 
       if (locatedBlocks != null) {
@@ -3084,7 +3084,9 @@
      * resources associated with this stream.
      */
     private synchronized void closeInternal() throws IOException {
-      checkOpen();
+      if ( !clientRunning ) {
+          return;
+      }
       isClosed();
 
       try {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Thu Mar 12 17:50:03 2009
@@ -24,6 +24,7 @@
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
@@ -549,8 +550,10 @@
 
     /**
      * Unlock storage.
-     * 
-     * @throws IOException
+     * Does nothing if there is no acquired lock
+     * @throws IOException for IO problems
+     * @throws ClosedChannelException if the channel owning the lock is
+     *  already closed.
      */
     public void unlock() throws IOException {
       if (this.lock == null)
@@ -703,11 +706,22 @@
 
   /**
    * Unlock all storage directories.
-   * @throws IOException
+   * @throws IOException on a failure to unlock
    */
   public void unlockAll() throws IOException {
-    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
-      it.next().unlock();
+    IOException ioe = null;
+    for (StorageDirectory storageDir : storageDirs) {
+      try {
+        storageDir.unlock();
+      } catch (IOException e) {
+        LOG.warn("Failed to unlock " + storageDir.getRoot() + " : " + e, e);
+        if (ioe != null) {
+          ioe = e;
+        }
+      }
+    }
+    if (ioe != null) {
+      throw ioe;
     }
   }
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Mar 12 17:50:03 2009
@@ -43,7 +43,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -91,6 +90,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.Service;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -123,7 +123,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode extends Configured 
+public class DataNode extends Service
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
@@ -186,7 +186,7 @@
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  
+  private AbstractList<File> dataDirs;
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
@@ -206,20 +206,57 @@
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
+   * This constructor does not start the node, merely initialize it
+   * @param conf configuration to use
+   * @param dataDirs list of directories that may be used for data
+   * @throws IOException for historical reasons
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
     super(conf);
     datanodeObject = this;
+    this.dataDirs = dataDirs;
+  }
 
-    try {
-      startDataNode(conf, dataDirs);
-    } catch (IOException ie) {
-      shutdown();
-      throw ie;
+  /////////////////////////////////////////////////////
+  // Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * Start any work (in separate threads)
+   *
+   * @throws IOException for any startup failure
+   */
+  @Override
+  public void innerStart() throws IOException {
+    startDataNode(getConf(), dataDirs);
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * This implementation checks for the name system being non-null and live
+   *
+   * @throws IOException for any ping failure
+   * @throws LivenessException if the IPC server is not defined @param status the initial status
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (ipcServer == null) {
+      status.addThrowable(new LivenessException("No IPC Server running"));
+    }
+    if (dnRegistration == null) {
+      status.addThrowable(new LivenessException("Not registered to a namenode"));
     }
   }
     
+  /**
+   * Shut down this instance of the datanode. Returns only after shutdown is
+   * complete.
+   */
+  public void shutdown() {
+    closeQuietly();
+  }
   
   /**
    * This method starts the data node with the specified conf.
@@ -244,7 +281,7 @@
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     }
     InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
-    
+    DataNode.nameNodeAddr = nameNodeAddr;
     this.socketTimeout =  conf.getInt("dfs.socket.timeout",
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -330,7 +367,6 @@
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
-    DataNode.nameNodeAddr = nameNodeAddr;
 
     //initialize periodic block scanner
     String reason = null;
@@ -357,6 +393,9 @@
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
         tmpInfoPort == 0, conf);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
+    }
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -364,6 +403,9 @@
       Configuration sslConf = new Configuration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
+      }
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -420,6 +462,10 @@
         } catch (InterruptedException ie) {}
       }
     }
+    if(!shouldRun) {
+      throw new IOException("Datanode shut down during handshake with NameNode "
+               + getNameNodeAddr());
+    }
     String errorMsg = null;
     // verify build version
     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -520,10 +566,14 @@
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  private void register() throws IOException {
+  protected void register() throws IOException {
     if (dnRegistration.getStorageID().equals("")) {
       setNewStorageID(dnRegistration);
     }
+    //if we are LIVE, move into the STARTED state, as registration implies that
+    //the node is no longer LIVE
+    enterState(ServiceState.LIVE, ServiceState.STARTED);
+    //spin until the server is up.
     while(shouldRun) {
       try {
         // reset name to machineName. Mainly for web interface.
@@ -552,7 +602,8 @@
           + dnRegistration.getStorageID() 
           + ". Expecting " + storage.getStorageID());
     }
-    
+    //at this point the DataNode is now live.
+    enterLiveState();
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
@@ -563,18 +614,25 @@
    * This method can only be called by the offerService thread.
    * Otherwise, deadlock might occur.
    */
-  public void shutdown() {
-    if (infoServer != null) {
-      try {
-        infoServer.stop();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down DataNode", e);
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      //disable the should run flag first, so that everything out there starts
+      //to shut down
+      shouldRun = false;
+      //shut down the infoserver
+      if (infoServer != null) {
+        try {
+          infoServer.stop();
+        } catch (Exception e) {
+          LOG.debug("Ignoring exception when shutting down the infoserver", e);
+        }
+      }
+      //shut down the IPC server
+      if (ipcServer != null) {
+        ipcServer.stop();
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
-    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
@@ -602,9 +660,10 @@
     
     RPC.stopProxy(namenode); // stop the RPC threads
     
-    if(upgradeManager != null)
+    if(upgradeManager != null) {
       upgradeManager.shutdownUpgrade();
-    if (blockScannerThread != null) { 
+    }
+    if (blockScannerThread != null) {
       blockScannerThread.interrupt();
       try {
         blockScannerThread.join(3600000L); // wait for at most 1 hour
@@ -613,8 +672,10 @@
     }
     if (storage != null) {
       try {
-        this.storage.unlockAll();
+        storage.unlockAll();
       } catch (IOException ie) {
+        LOG.warn("Ignoring exception when unlocking storage: "+ie,
+                ie);
       }
     }
     if (dataNodeThread != null) {
@@ -805,14 +866,13 @@
         if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
             DisallowedDatanodeException.class.getName().equals(reClass) ||
             IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn("DataNode is shutting down: " + 
-                   StringUtils.stringifyException(re));
+          LOG.warn("DataNode is shutting down: " + re, re);
           shutdown();
           return;
         }
-        LOG.warn(StringUtils.stringifyException(re));
+        LOG.warn(re, re);
       } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
+        LOG.warn(e, e);
       }
     } // while (shouldRun)
   } // offerService
@@ -1185,7 +1245,9 @@
         startDistributedUpgradeIfNeeded();
         offerService();
       } catch (Exception ex) {
-        LOG.error("Exception: " + StringUtils.stringifyException(ex));
+        LOG.error("Exception while in state " + getServiceState()
+                + " and shouldRun=" + shouldRun + ": " + ex,
+                ex);
         if (shouldRun) {
           try {
             Thread.sleep(5000);
@@ -1265,33 +1327,52 @@
    * @param conf Configuration instance to use.
    * @return DataNode instance for given list of data dirs and conf, or null if
    * no directory from this directory list can be created.
-   * @throws IOException
+   * @throws IOException if problems occur when starting the data node
    */
   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
-    throws IOException {
+          throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
-    for (int i = 0; i < dataDirs.length; i++) {
-      File data = new File(dataDirs[i]);
+    StringBuffer invalid = new StringBuffer();
+    for (String dataDir : dataDirs) {
+      File data = new File(dataDir);
       try {
         DiskChecker.checkDir(data);
         dirs.add(data);
       } catch(DiskErrorException e) {
-        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
+        LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
+        invalid.append(dataDir);
+        invalid.append(" ");
       }
     }
-    if (dirs.size() > 0) 
-      return new DataNode(conf, dirs);
-    LOG.error("All directories in dfs.data.dir are invalid.");
-    return null;
+    if (dirs.size() > 0) {
+      DataNode dataNode = new DataNode(conf, dirs);
+      Service.deploy(dataNode);
+      return dataNode;
+    } else {
+      LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
+      return null;
+    }
   }
 
+
+  /**
+   * {@inheritDoc}
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "DataNode";
+  }
+  
   @Override
   public String toString() {
-    return "DataNode{" +
+    return "DataNode {" +
       "data=" + data +
-      ", localName='" + dnRegistration.getName() + "'" +
-      ", storageID='" + dnRegistration.getStorageID() + "'" +
+      (dnRegistration != null? (
+        ", localName='" + dnRegistration.getName() + "'" +
+        ", storageID='" + dnRegistration.getStorageID() + "'" ) : "") +
       ", xmitsInProgress=" + xmitsInProgress +
+      ", state="+ getServiceState() +
       "}";
   }
   

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=752949&r1=752948&r2=752949&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Mar 12 17:50:03 2009
@@ -185,6 +185,9 @@
       }
 
       File blockFiles[] = dir.listFiles();
+      if (blockFiles == null) {
+        throw new IllegalStateException("Not a valid directory: " + dir);
+      }
       for (int i = 0; i < blockFiles.length; i++) {
         if (Block.isBlockFilename(blockFiles[i])) {
           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);



Mime
View raw message