hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r755965 [1/2] - in /hadoop/core/branches/HADOOP-3628: ./ ivy/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/...
Date: Thu, 19 Mar 2009 12:15:53 GMT
Author: stevel
Date: Thu Mar 19 12:15:51 2009
New Revision: 755965

URL: http://svn.apache.org/viewvc?rev=755965&view=rev
Log:
HADOOP-3628  merge revision r752949 into its proper home

Added:
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/util/MockService.java
      - copied unchanged from r752949, hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/empty-configuration.xml
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/net/TestDNS.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/net/TestDNS.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/util/TestServiceLifecycle.java
      - copied unchanged from r752949, hadoop/core/trunk/src/test/org/apache/hadoop/util/TestServiceLifecycle.java
Modified:
    hadoop/core/branches/HADOOP-3628/   (props changed)
    hadoop/core/branches/HADOOP-3628/CHANGES.txt   (props changed)
    hadoop/core/branches/HADOOP-3628/ivy/libraries.properties
    hadoop/core/branches/HADOOP-3628/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/conf/Configuration.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/http/HttpServer.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/RPC.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/DNS.java
    hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/NetUtils.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
    hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/core/branches/HADOOP-3628/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Propchange: hadoop/core/branches/HADOOP-3628/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 12:15:51 2009
@@ -1 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
+/hadoop/core/trunk:752949

Propchange: hadoop/core/branches/HADOOP-3628/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 19 12:15:51 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
+/hadoop/core/trunk/CHANGES.txt:752949

Modified: hadoop/core/branches/HADOOP-3628/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/ivy/libraries.properties?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/ivy/libraries.properties (original)
+++ hadoop/core/branches/HADOOP-3628/ivy/libraries.properties Thu Mar 19 12:15:51 2009
@@ -35,7 +35,7 @@
 hsqldb.version=1.8.0.10
 
 #ivy.version=2.0.0-beta2
-ivy.version=2.0.0-rc2
+ivy.version=2.0.0
 
 jasper.version=5.5.12
 #not able to figureout the version of jsp & jsp-api version to get it resolved throught ivy

Modified: hadoop/core/branches/HADOOP-3628/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Thu Mar 19 12:15:51 2009
@@ -71,7 +71,7 @@
    * it to succeed. Then program is launched with insufficient memory and 
    * is expected to be a failure.  
    */
-  public void testCommandLine() {
+  public void testCommandLine() throws Exception {
     if (StreamUtil.isCygwin()) {
       return;
     }
@@ -88,11 +88,9 @@
       fs.delete(outputPath, true);
       assertFalse("output not cleaned up", fs.exists(outputPath));
       mr.waitUntilIdle();
-    } catch(IOException e) {
-      fail(e.toString());
     } finally {
-      mr.shutdown();
-      dfs.shutdown();
+      MiniDFSCluster.close(dfs);
+      MiniMRCluster.close(mr);
     }
   }
 

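A note on the cleanup idiom above: the branch replaces the catch-IOException-and-fail block with static close() helpers so the finally clause can no longer throw or NPE when a cluster never came up. A minimal sketch of such a null-safe helper, assuming only the standard MiniDFSCluster.shutdown() method; the TestShutdownUtil class itself is hypothetical, not part of Hadoop:

import org.apache.hadoop.hdfs.MiniDFSCluster;

public final class TestShutdownUtil {
  private TestShutdownUtil() {}

  /** Shut the cluster down if it is non-null, logging rather than rethrowing. */
  public static void closeQuietly(MiniDFSCluster cluster) {
    if (cluster != null) {
      try {
        cluster.shutdown();
      } catch (RuntimeException e) {
        System.err.println("Ignoring shutdown failure: " + e);
      }
    }
  }
}
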
Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/conf/Configuration.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/conf/Configuration.java Thu Mar 19 12:15:51 2009
@@ -185,6 +185,9 @@
   private Properties properties;
   private Properties overlay;
   private ClassLoader classLoader;
+  private static final String ERROR_PARSING_CONF_FILE = 
+          "Error parsing configuration resource ";
+
   {
     classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -973,7 +976,7 @@
     }
   }
 
-  private synchronized Properties getProps() {
+  protected synchronized Properties getProps() {
     if (properties == null) {
       properties = new Properties();
       loadResources(properties, resources, quietmode);
@@ -1157,16 +1160,16 @@
       }
         
     } catch (IOException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (DOMException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (SAXException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     } catch (ParserConfigurationException e) {
-      LOG.fatal("error parsing conf file: " + e);
+      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
       throw new RuntimeException(e);
     }
   }

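The widening of getProps() from private to protected is what the new TestConfigurationSubclass exercises: a subclass may now intercept property resolution. A hedged sketch against the 0.19-era Configuration API; AuditedConfiguration and the property it injects are invented for illustration:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

public class AuditedConfiguration extends Configuration {
  @Override
  protected synchronized Properties getProps() {
    // super.getProps() lazily loads the configured resources first
    Properties props = super.getProps();
    props.setProperty("audited", "true");  // example of a subclass hook
    return props;
  }
}
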
Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/http/HttpServer.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/http/HttpServer.java Thu Mar 19 12:15:51 2009
@@ -22,6 +22,7 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.net.BindException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -457,7 +458,11 @@
           // then try the next port number.
           if (ex instanceof BindException) {
             if (!findPort) {
-              throw (BindException) ex;
+              BindException be = new BindException(
+                      "Port in use: " + listener.getHost()
+                              + ":" + listener.getPort());
+              be.initCause(ex);
+              throw be;
             }
           } else {
             LOG.info("HttpServer.start() threw a non Bind IOException"); 
@@ -489,6 +494,14 @@
   }
 
   /**
+   * Test for the availability of the web server
+   * @return true if the web server is started, false otherwise
+   */
+  public boolean isAlive() {
+    return webServer.isStarted();
+  }
+
+  /**
    * A very simple servlet to serve up a text representation of the current
    * stack traces. It both returns the stacks to the caller and logs them.
    * Currently the stack traces are done sequentially rather than exactly the

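Two diagnostics are added above: a BindException whose message names the host:port in use, and isAlive() for probing server state. A hedged usage sketch, using the HttpServer constructor as it appears in this branch and a placeholder port:

import java.net.BindException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;

public class HttpServerProbe {
  public static void main(String[] args) throws Exception {
    // findPort=false: a bind failure surfaces as the enriched BindException
    HttpServer server = new HttpServer("test", "0.0.0.0", 50070, false,
                                       new Configuration());
    try {
      server.start();
      System.out.println("server alive: " + server.isAlive());
    } catch (BindException e) {
      // message now reads "Port in use: <host>:<port>"; cause is preserved
      System.err.println(e.getMessage() + " caused by " + e.getCause());
    } finally {
      if (server.isAlive()) {
        server.stop();
      }
    }
  }
}
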
Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/Client.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/Client.java Thu Mar 19 12:15:51 2009
@@ -284,8 +284,9 @@
     /** Connect to the server and set up the I/O streams. It then sends
      * a header to the server and starts
      * the connection thread that waits for responses.
+     * @throws IOException if the connection attempt was unsuccessful
      */
-    private synchronized void setupIOstreams() {
+    private synchronized void setupIOstreams() throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -308,7 +309,7 @@
             /* The max number of retries is 45,
              * which amounts to 20s*45 = 15 minutes retries.
              */
-            handleConnectionFailure(timeoutFailures++, 45, toe);
+            handleConnectionFailure(timeoutFailures++, maxRetries, toe);
           } catch (IOException ie) {
             handleConnectionFailure(ioFailures++, maxRetries, ie);
           }
@@ -327,6 +328,7 @@
       } catch (IOException e) {
         markClosed(e);
         close();
+        throw e;
       }
     }
 
@@ -358,7 +360,7 @@
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries >= maxRetries) {
-        throw ioe;
+        throw wrapException(remoteId.getAddress(), ioe);
       }
 
       // otherwise back off and retry
@@ -367,7 +369,7 @@
       } catch (InterruptedException ignored) {}
       
       LOG.info("Retrying connect to server: " + server + 
-          ". Already tried " + curRetries + " time(s).");
+          ". Already tried " + curRetries + " time(s) out of "+ maxRetries);
     }
 
     /* Write the header for each connection

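The wrapException(remoteId.getAddress(), ioe) call above (the helper itself lands elsewhere in this changeset) follows a common idiom: rethrow a network failure with the remote address in the message while chaining the original exception as the cause. A standalone sketch of that idiom, not Hadoop's exact private helper:

import java.io.IOException;
import java.net.InetSocketAddress;

public final class ExceptionWrapping {
  static IOException wrap(InetSocketAddress addr, IOException ioe) {
    IOException wrapped = new IOException(
        "Call to " + addr + " failed: " + ioe.getMessage());
    wrapped.initCause(ioe);  // keep the original stack trace reachable
    return wrapped;
  }
}
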
Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/RPC.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/ipc/RPC.java Thu Mar 19 12:15:51 2009
@@ -301,7 +301,7 @@
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  static VersionedProtocol waitForProxy(Class protocol,
+  public static VersionedProtocol waitForProxy(Class protocol,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,

Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/DNS.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/DNS.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/DNS.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/DNS.java Thu Mar 19 12:15:51 2009
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.net;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.util.Enumeration;
 import java.util.Vector;
 
@@ -38,6 +43,29 @@
  * 
  */
 public class DNS {
+  private static final String LOCALHOST = "localhost";
+  private static final Log LOG = LogFactory.getLog(DNS.class);
+
+  /**
+   * Returns the hostname associated with the specified IP address by the
+   * provided nameserver.
+   *
+   * @param address The address to reverse lookup
+   * @param nameserver The host name of a reachable DNS server; can be null
+   * @return The host name associated with the provided IP
+   * @throws NamingException If a NamingException is encountered
+   * @throws IllegalArgumentException if the protocol is unsupported
+   */
+  public static String reverseDns(InetAddress address, String nameserver)
+          throws NamingException {
+    if (address instanceof Inet4Address) {
+      return reverseDns((Inet4Address) address, nameserver);
+    } else if (address instanceof Inet6Address) {
+      return reverseDns((Inet6Address) address, nameserver);
+    } else {
+      throw new IllegalArgumentException("Unsupported Protocol " + address);
+    }
+  }
 
   /**
    * Returns the hostname associated with the specified IP address by the
@@ -45,31 +73,76 @@
    * 
    * @param hostIp
    *            The address to reverse lookup
-   * @param ns
-   *            The host name of a reachable DNS server
+   * @param nameserver
+   *            The host name of a reachable DNS server; can be null
    * @return The host name associated with the provided IP
    * @throws NamingException
    *             If a NamingException is encountered
    */
-  public static String reverseDns(InetAddress hostIp, String ns)
+  private static String reverseDns(Inet4Address hostIp, String nameserver)
     throws NamingException {
     //
     // Builds the reverse IP lookup form
     // This is formed by reversing the IP numbers and appending in-addr.arpa
     //
-    String[] parts = hostIp.getHostAddress().split("\\.");
-    String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+    byte[] address32 = hostIp.getAddress();
+    String reverseIP;
+    reverseIP =    ""+(address32[3]&0xff) + '.'
+                    + (address32[2] & 0xff) + '.'
+                    + (address32[1] & 0xff) + '.'
+                    + (address32[0] & 0xff) + '.'
+                    + "in-addr.arpa";
+/*
+    String hostAddress = hostIp.getHostAddress();
+    String[] parts = hostAddress.split("\\.");
+    if (parts.length < 4) {
+      throw new IllegalArgumentException("Unable to determine IPv4 address of "
+              + hostAddress);
+    }
+    reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
       + parts[0] + ".in-addr.arpa";
+*/
 
+    return retrievePTRRecord(nameserver, reverseIP);
+  }
+
+  /**
+   * Retrieve the PTR record from a DNS entry; if given the right
+   * reverse IP this will resolve both IPv4 and IPv6 addresses
+   * @param nameserver name server to use; can be null
+   * @param reverseIP reverse IP address to look up
+   * @return the PTR record of the host
+   * @throws NamingException if the record can not be found.
+   */
+  private static String retrievePTRRecord(String nameserver, String reverseIP)
+          throws NamingException {
     DirContext ictx = new InitialDirContext();
-    Attributes attribute =
-      ictx.getAttributes("dns://"               // Use "dns:///" if the default
-                         + ((ns == null) ? "" : ns) + 
-                         // nameserver is to be used
+    try {
+      // Use "dns:///" if the default nameserver is to be used
+      Attributes attribute = ictx.getAttributes("dns://"
+                         + ((nameserver == null) ? "" : nameserver) +
                          "/" + reverseIP, new String[] { "PTR" });
-    ictx.close();
-    
-    return attribute.get("PTR").get().toString();
+      return attribute.get("PTR").get().toString();
+    } finally {
+      ictx.close();
+    }
+  }
+
+  /**
+   * Returns the hostname associated with the specified IP address by the
+   * provided nameserver.
+   *
+   * @param address
+   *            The address to reverse lookup
+   * @param nameserver
+   *            The host name of a reachable DNS server; can be null
+   * @return The host name associated with the provided IP
+   * @throws NamingException
+   *             If a NamingException is encountered
+   */
+  private static String reverseDns(Inet6Address address, String nameserver)
+          throws NamingException {
+    throw new IllegalArgumentException("Reverse DNS lookup of IPv6 is currently unsupported: " + address);
   }
 
   /**
@@ -90,21 +163,23 @@
     try {
       NetworkInterface netIF = NetworkInterface.getByName(strInterface);
       if (netIF == null)
-        return new String[] { InetAddress.getLocalHost()
-                              .getHostAddress() };
+        return new String[] {getLocalHostIPAddress()};
       else {
         Vector<String> ips = new Vector<String>();
-        Enumeration e = netIF.getInetAddresses();
-        while (e.hasMoreElements())
-          ips.add(((InetAddress) e.nextElement()).getHostAddress());
-        return ips.toArray(new String[] {});
+        Enumeration<InetAddress> e = netIF.getInetAddresses();
+
+        while (e.hasMoreElements()) {
+          ips.add((e.nextElement()).getHostAddress());
+        }
+        return ips.toArray(new String[ips.size()]);
       }
     } catch (SocketException e) {
-      return new String[] { InetAddress.getLocalHost().getHostAddress() };
+      return new String[] {getLocalHostIPAddress()};
     }
   }
 
-  /**
+
+    /**
    * Returns the first available IP address associated with the provided
    * network interface
    * 
@@ -130,26 +205,98 @@
    *            The DNS host name
    * @return A string vector of all host names associated with the IPs tied to
    *         the specified interface
-   * @throws UnknownHostException
+   * @throws UnknownHostException if the hostname cannot be determined
    */
   public static String[] getHosts(String strInterface, String nameserver)
     throws UnknownHostException {
     String[] ips = getIPs(strInterface);
     Vector<String> hosts = new Vector<String>();
-    for (int ctr = 0; ctr < ips.length; ctr++)
+    for (String ip : ips) {
+      InetAddress ipAddr = null;
       try {
-        hosts.add(reverseDns(InetAddress.getByName(ips[ctr]),
-                             nameserver));
-      } catch (Exception e) {
+        ipAddr = InetAddress.getByName(ip);
+        hosts.add(reverseDns(ipAddr, nameserver));
+      } catch (UnknownHostException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Unable to resolve hostname of " + ip + ": " + e, e);
+      } catch (NamingException e) {
+        if(LOG.isDebugEnabled())
+          LOG.debug("Unable to reverse DNS of host" + ip
+                +  (ipAddr != null ? (" and address "+ipAddr): "")
+                + " : " + e, e);
+      } catch (IllegalArgumentException e) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("Unable to resolve IP address of " + ip
+                  + (ipAddr != null ? (" and address " + ipAddr) : "")
+                  + " : " + e, e);
       }
+    }
 
-    if (hosts.size() == 0)
-      return new String[] { InetAddress.getLocalHost().getCanonicalHostName() };
-    else
-      return hosts.toArray(new String[] {});
+    if (hosts.isEmpty()) {
+      return new String[] { getLocalHostname() };
+    } else {
+      return hosts.toArray(new String[hosts.size()]);
+    }
   }
 
   /**
+   * The cached hostname -initially null.
+   */
+
+  private static volatile String cachedHostname;
+
+  /**
+   * The cached address hostname -initially null.
+   */
+  private static volatile String cachedHostAddress;
+
+    /**
+     * Determine the local hostname; retrieving it from cache if it is known
+     * If we cannot determine our host name, return "localhost"
+     * This code is not synchronized; if more than one thread calls it while
+     * it is determining the address, the last one to exit the loop wins.
+     * However, as both threads are determining and caching the same value, it
+     * should be moot.
+     * @return the local hostname or "localhost"
+     */
+  private static String getLocalHostname() {
+    if(cachedHostname == null) {
+      try {
+        cachedHostname = InetAddress.getLocalHost().getCanonicalHostName();
+      } catch (UnknownHostException e) {
+        LOG.info("Unable to determine local hostname "
+                + "-falling back to \""+LOCALHOST+"\"", e);
+        cachedHostname = LOCALHOST;
+      }
+    }
+    return cachedHostname;
+  }
+
+  /**
+   * Get the IPAddress of the local host as a string. This may be a loop back
+   * value.
+   * This code is not synchronized; if more than one thread calls it while
+   * it is determining the address, the last one to exit the loop wins.
+   * However, as both threads are determining and caching the same value, it
+   * should be moot.
+   * @return the IPAddress of the localhost
+   * @throws UnknownHostException if not even "localhost" resolves.
+   */
+  private static String getLocalHostIPAddress() throws UnknownHostException {
+    if (cachedHostAddress == null) {
+      try {
+        InetAddress localHost = InetAddress.getLocalHost();
+        cachedHostAddress = localHost.getHostAddress();
+      } catch (UnknownHostException e) {
+        LOG.info("Unable to determine local IP Address "
+                + "-falling back to loopback address", e);
+        cachedHostAddress = InetAddress.getByName(LOCALHOST).getHostAddress();
+      }
+    }
+    return cachedHostAddress;
+  }
+
+    /**
    * Returns all the host names associated by the default nameserver with the
    * address bound to the specified network interface
    * 
@@ -158,7 +305,7 @@
    * @return The list of host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    * 
    */
   public static String[] getHosts(String strInterface)
@@ -177,15 +324,17 @@
    * @return The default host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    */
   public static String getDefaultHost(String strInterface, String nameserver)
     throws UnknownHostException {
-    if (strInterface.equals("default")) 
-      return InetAddress.getLocalHost().getCanonicalHostName();
+    if ("default".equals(strInterface)) {
+      return getLocalHostname();
+    }
 
-    if (nameserver != null && nameserver.equals("default"))
+    if ("default".equals(nameserver)) {
       return getDefaultHost(strInterface);
+    }
 
     String[] hosts = getHosts(strInterface, nameserver);
     return hosts[0];
@@ -196,11 +345,12 @@
    * nameserver with the address bound to the specified network interface
    * 
    * @param strInterface
-   *            The name of the network interface to query (e.g. eth0)
+   *            The name of the network interface to query (e.g. eth0).
+   *            Must not be null.
    * @return The default host name associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the deault interface
+   *             If one is encountered while querying the default interface
    */
   public static String getDefaultHost(String strInterface)
     throws UnknownHostException {

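The reverse lookup above reverses the IPv4 octets, appends "in-addr.arpa", and asks JNDI's DNS provider for the PTR record, closing the context in a finally block. A self-contained sketch of the same technique; the ReverseLookup class and its method names are local to this example:

import java.net.Inet4Address;
import java.net.InetAddress;
import javax.naming.NamingException;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

public final class ReverseLookup {
  public static String ptr(Inet4Address ip) throws NamingException {
    byte[] b = ip.getAddress();
    // reverse the octets and append the reverse-lookup domain
    String reverseIP = (b[3] & 0xff) + "." + (b[2] & 0xff) + "."
        + (b[1] & 0xff) + "." + (b[0] & 0xff) + ".in-addr.arpa";
    DirContext ctx = new InitialDirContext();
    try {
      // "dns:///" means: use the platform's default nameserver
      Attributes attrs = ctx.getAttributes("dns:///" + reverseIP,
                                           new String[] { "PTR" });
      return attrs.get("PTR").get().toString();
    } finally {
      ctx.close();
    }
  }

  public static void main(String[] args) throws Exception {
    InetAddress addr =
        InetAddress.getByName(args.length > 0 ? args[0] : "8.8.8.8");
    System.out.println(ptr((Inet4Address) addr));
  }
}
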
Modified: hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/NetUtils.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/core/org/apache/hadoop/net/NetUtils.java Thu Mar 19 12:15:51 2009
@@ -26,6 +26,7 @@
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.net.ConnectException;
 import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
@@ -180,9 +181,12 @@
     String oldAddr = conf.get(oldBindAddressName);
     String oldPort = conf.get(oldPortName);
     String newAddrPort = conf.get(newBindAddressName);
-    if (oldAddr == null && oldPort == null) {
+    if (oldAddr == null && oldPort == null && newAddrPort != null) {
       return newAddrPort;
     }
+    if (newAddrPort == null) {
+      throw new IllegalArgumentException("No value for " + newBindAddressName);
+    }
     String[] newAddrPortParts = newAddrPort.split(":",2);
     if (newAddrPortParts.length != 2) {
       throw new IllegalArgumentException("Invalid address/port: " + 
@@ -395,14 +399,18 @@
     if (socket == null || endpoint == null || timeout < 0) {
       throw new IllegalArgumentException("Illegal argument for connect()");
     }
-    
-    SocketChannel ch = socket.getChannel();
-    
-    if (ch == null) {
-      // let the default implementation handle it.
-      socket.connect(endpoint, timeout);
-    } else {
-      SocketIOWithTimeout.connect(ch, endpoint, timeout);
+    try {
+      SocketChannel ch = socket.getChannel();
+
+      if (ch == null) {
+        // let the default implementation handle it.
+        socket.connect(endpoint, timeout);
+      } else {
+        SocketIOWithTimeout.connect(ch, endpoint, timeout);
+      }
+    } catch (ConnectException e) {
+      throw (ConnectException) new ConnectException(
+              e + " connecting to " + endpoint).initCause(e);
     }
   }
   

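The connect() change uses a cast-and-initCause idiom worth isolating: ConnectException has no (String, Throwable) constructor, so a new instance is created with an endpoint-bearing message and the original chained as its cause. A minimal sketch:

import java.net.ConnectException;
import java.net.InetSocketAddress;

public final class ConnectWrap {
  static ConnectException withEndpoint(ConnectException e,
                                       InetSocketAddress endpoint) {
    // initCause returns Throwable, hence the cast back to ConnectException
    return (ConnectException) new ConnectException(
        e + " connecting to " + endpoint).initCause(e);
  }
}
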
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Mar 19 12:15:51 2009
@@ -1384,7 +1384,7 @@
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
       if (newInfo == null) {
-        throw new IOException("Cannot open filename " + src);
+        throw new FileNotFoundException("Cannot open HDFS file " + src);
       }
 
       if (locatedBlocks != null) {
@@ -3084,7 +3084,9 @@
      * resources associated with this stream.
      */
     private synchronized void closeInternal() throws IOException {
-      checkOpen();
+      if ( !clientRunning ) {
+          return;
+      }
       isClosed();
 
       try {

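Because openInfo() now throws FileNotFoundException (a subclass of IOException), callers can distinguish a missing file from other I/O failures. A hedged usage sketch with a placeholder path:

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenExample {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    try {
      fs.open(new Path("/probably/missing")).close();
    } catch (FileNotFoundException e) {
      // specifically the "no such file" case
      System.err.println("missing: " + e.getMessage());
    }
  }
}
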
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Thu Mar 19 12:15:51 2009
@@ -24,6 +24,7 @@
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
@@ -549,8 +550,10 @@
 
     /**
      * Unlock storage.
-     * 
-     * @throws IOException
+     * Does nothing if there is no acquired lock
+     * @throws IOException for IO problems
+     * @throws ClosedChannelException if the channel owning the lock is
+     *  already closed.
      */
     public void unlock() throws IOException {
       if (this.lock == null)
@@ -703,11 +706,22 @@
 
   /**
    * Unlock all storage directories.
-   * @throws IOException
+   * @throws IOException on a failure to unlock
    */
   public void unlockAll() throws IOException {
-    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
-      it.next().unlock();
+    IOException ioe = null;
+    for (StorageDirectory storageDir : storageDirs) {
+      try {
+        storageDir.unlock();
+      } catch (IOException e) {
+        LOG.warn("Failed to unlock " + storageDir.getRoot() + " : " + e, e);
+        if (ioe == null) {
+          ioe = e;
+        }
+      }
+    }
+    if (ioe != null) {
+      throw ioe;
     }
   }
 

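The unlockAll() rewrite follows a collect-and-rethrow pattern: attempt every unlock, remember the first failure, and throw it only after the loop completes, so one bad directory cannot prevent the others from being unlocked. Sketched generically below; Lockable and UnlockAll are illustrative names only:

import java.io.IOException;
import java.util.List;

public final class UnlockAll {
  interface Lockable {
    void unlock() throws IOException;
  }

  static void unlockAll(List<? extends Lockable> locks) throws IOException {
    IOException first = null;
    for (Lockable lock : locks) {
      try {
        lock.unlock();
      } catch (IOException e) {
        if (first == null) {
          first = e;  // remember the first failure, keep going
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
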
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Mar 19 12:15:51 2009
@@ -43,7 +43,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -91,6 +90,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.Service;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -123,7 +123,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode extends Configured 
+public class DataNode extends Service
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
@@ -186,7 +186,7 @@
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  
+  private AbstractList<File> dataDirs;
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
@@ -206,20 +206,57 @@
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
+   * This constructor does not start the node, merely initialize it
+   * @param conf configuration to use
+   * @param dataDirs list of directories that may be used for data
+   * @throws IOException for historical reasons
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
     super(conf);
     datanodeObject = this;
+    this.dataDirs = dataDirs;
+  }
 
-    try {
-      startDataNode(conf, dataDirs);
-    } catch (IOException ie) {
-      shutdown();
-      throw ie;
+  /////////////////////////////////////////////////////
+  // Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * Start any work (in separate threads)
+   *
+   * @throws IOException for any startup failure
+   */
+  @Override
+  public void innerStart() throws IOException {
+    startDataNode(getConf(), dataDirs);
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * This implementation checks that the IPC server is running and that this
+   * datanode is registered with a namenode.
+   *
+   * @param status the initial status
+   * @throws IOException for any ping failure
+   * @throws LivenessException if the IPC server is not defined
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (ipcServer == null) {
+      status.addThrowable(new LivenessException("No IPC Server running"));
+    }
+    if (dnRegistration == null) {
+      status.addThrowable(new LivenessException("Not registered to a namenode"));
     }
   }
     
+  /**
+   * Shut down this instance of the datanode. Returns only after shutdown is
+   * complete.
+   */
+  public void shutdown() {
+    closeQuietly();
+  }
   
   /**
    * This method starts the data node with the specified conf.
@@ -244,7 +281,7 @@
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     }
     InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
-    
+    DataNode.nameNodeAddr = nameNodeAddr;
     this.socketTimeout =  conf.getInt("dfs.socket.timeout",
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -330,7 +367,6 @@
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
-    DataNode.nameNodeAddr = nameNodeAddr;
 
     //initialize periodic block scanner
     String reason = null;
@@ -357,6 +393,9 @@
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
         tmpInfoPort == 0, conf);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
+    }
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -364,6 +403,9 @@
       Configuration sslConf = new Configuration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
+      }
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -420,6 +462,10 @@
         } catch (InterruptedException ie) {}
       }
     }
+    if(!shouldRun) {
+      throw new IOException("Datanode shut down during handshake with NameNode "
+               + getNameNodeAddr());
+    }
     String errorMsg = null;
     // verify build version
     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -520,10 +566,14 @@
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  private void register() throws IOException {
+  protected void register() throws IOException {
     if (dnRegistration.getStorageID().equals("")) {
       setNewStorageID(dnRegistration);
     }
+    //if we are LIVE, move into the STARTED state, as registration implies that
+    //the node is no longer LIVE
+    enterState(ServiceState.LIVE, ServiceState.STARTED);
+    //spin until the server is up.
     while(shouldRun) {
       try {
         // reset name to machineName. Mainly for web interface.
@@ -552,7 +602,8 @@
           + dnRegistration.getStorageID() 
           + ". Expecting " + storage.getStorageID());
     }
-    
+    //at this point the DataNode is now live.
+    enterLiveState();
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
@@ -563,18 +614,25 @@
    * This method can only be called by the offerService thread.
    * Otherwise, deadlock might occur.
    */
-  public void shutdown() {
-    if (infoServer != null) {
-      try {
-        infoServer.stop();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down DataNode", e);
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      //disable the should run flag first, so that everything out there starts
+      //to shut down
+      shouldRun = false;
+      //shut down the infoserver
+      if (infoServer != null) {
+        try {
+          infoServer.stop();
+        } catch (Exception e) {
+          LOG.debug("Ignoring exception when shutting down the infoserver", e);
+        }
+      }
+      //shut down the IPC server
+      if (ipcServer != null) {
+        ipcServer.stop();
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
-    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
@@ -602,9 +660,10 @@
     
     RPC.stopProxy(namenode); // stop the RPC threads
     
-    if(upgradeManager != null)
+    if(upgradeManager != null) {
       upgradeManager.shutdownUpgrade();
-    if (blockScannerThread != null) { 
+    }
+    if (blockScannerThread != null) {
       blockScannerThread.interrupt();
       try {
         blockScannerThread.join(3600000L); // wait for at most 1 hour
@@ -613,8 +672,10 @@
     }
     if (storage != null) {
       try {
-        this.storage.unlockAll();
+        storage.unlockAll();
       } catch (IOException ie) {
+        LOG.warn("Ignoring exception when unlocking storage: "+ie,
+                ie);
       }
     }
     if (dataNodeThread != null) {
@@ -805,14 +866,13 @@
         if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
             DisallowedDatanodeException.class.getName().equals(reClass) ||
             IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn("DataNode is shutting down: " + 
-                   StringUtils.stringifyException(re));
+          LOG.warn("DataNode is shutting down: " + re, re);
           shutdown();
           return;
         }
-        LOG.warn(StringUtils.stringifyException(re));
+        LOG.warn(re, re);
       } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
+        LOG.warn(e, e);
       }
     } // while (shouldRun)
   } // offerService
@@ -1185,7 +1245,9 @@
         startDistributedUpgradeIfNeeded();
         offerService();
       } catch (Exception ex) {
-        LOG.error("Exception: " + StringUtils.stringifyException(ex));
+        LOG.error("Exception while in state " + getServiceState()
+                + " and shouldRun=" + shouldRun + ": " + ex,
+                ex);
         if (shouldRun) {
           try {
             Thread.sleep(5000);
@@ -1265,33 +1327,52 @@
    * @param conf Configuration instance to use.
    * @return DataNode instance for given list of data dirs and conf, or null if
    * no directory from this directory list can be created.
-   * @throws IOException
+   * @throws IOException if problems occur when starting the data node
    */
   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
-    throws IOException {
+          throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
-    for (int i = 0; i < dataDirs.length; i++) {
-      File data = new File(dataDirs[i]);
+    StringBuffer invalid = new StringBuffer();
+    for (String dataDir : dataDirs) {
+      File data = new File(dataDir);
       try {
         DiskChecker.checkDir(data);
         dirs.add(data);
       } catch(DiskErrorException e) {
-        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
+        LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
+        invalid.append(dataDir);
+        invalid.append(" ");
       }
     }
-    if (dirs.size() > 0) 
-      return new DataNode(conf, dirs);
-    LOG.error("All directories in dfs.data.dir are invalid.");
-    return null;
+    if (dirs.size() > 0) {
+      DataNode dataNode = new DataNode(conf, dirs);
+      Service.deploy(dataNode);
+      return dataNode;
+    } else {
+      LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
+      return null;
+    }
   }
 
+
+  /**
+   * {@inheritDoc}
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "DataNode";
+  }
+  
   @Override
   public String toString() {
-    return "DataNode{" +
+    return "DataNode {" +
       "data=" + data +
-      ", localName='" + dnRegistration.getName() + "'" +
-      ", storageID='" + dnRegistration.getStorageID() + "'" +
+      (dnRegistration != null? (
+        ", localName='" + dnRegistration.getName() + "'" +
+        ", storageID='" + dnRegistration.getStorageID() + "'" ) : "") +
       ", xmitsInProgress=" + xmitsInProgress +
+      ", state="+ getServiceState() +
       "}";
   }
   

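The DataNode now hangs off the HADOOP-3628 Service lifecycle: construction only records configuration, innerStart() does the real startup, innerPing() reports health, and innerClose() tears down. A minimal sketch of a Service subclass under the assumptions that only the members visible in this diff exist (Service(Configuration), innerStart, innerClose, getServiceName, Service.deploy, closeQuietly), that innerStart/innerClose are overridable as protected, and that innerPing has a default implementation; MyService is hypothetical:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Service;

public class MyService extends Service {
  private volatile boolean running;

  public MyService(Configuration conf) {
    super(conf);  // constructor must not start any work
  }

  @Override
  public String getServiceName() {
    return "MyService";
  }

  @Override
  protected void innerStart() throws IOException {
    running = true;  // start worker threads, open ports here
  }

  @Override
  protected void innerClose() throws IOException {
    running = false;  // stop worker threads, release ports here
  }

  /** Mirrors DataNode.shutdown(): a close that never throws. */
  public void shutdown() {
    closeQuietly();
  }

  public static void main(String[] args) throws IOException {
    MyService service = new MyService(new Configuration());
    Service.deploy(service);  // initialize and start; throws on failure
    service.shutdown();
  }
}
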
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Mar 19 12:15:51 2009
@@ -185,6 +185,9 @@
       }
 
       File blockFiles[] = dir.listFiles();
+      if (blockFiles == null) {
+        throw new IllegalStateException("Not a valid directory: " + dir);
+      }
       for (int i = 0; i < blockFiles.length; i++) {
         if (Block.isBlockFilename(blockFiles[i])) {
           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);

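File.listFiles() returns null, not an empty array, when the path is unreadable or not a directory, so the new guard turns a latent NullPointerException into a descriptive failure. The same defensive pattern in isolation:

import java.io.File;

public final class ListFilesGuard {
  static File[] listBlockFiles(File dir) {
    File[] files = dir.listFiles();
    if (files == null) {
      throw new IllegalStateException("Not a valid directory: " + dir);
    }
    return files;
  }
}
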
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Mar 19 12:15:51 2009
@@ -395,9 +395,7 @@
    */
   synchronized void processIOError(int index) {
     if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
+      processAllStorageInaccessible();
     }
     assert(index < getNumStorageDirs());
     assert(getNumStorageDirs() == editStreams.size());
@@ -417,27 +415,43 @@
     //
     fsimage.processIOError(parentStorageDir);
   }
-  
+
+  /**
+   * report inaccessible storage directories and trigger a fatal error
+   */
+  private void processAllStorageInaccessible() {
+    processFatalError("Fatal Error: All storage directories are inaccessible.");
+  }
+
+  /**
+   * Handle a fatal error
+   * @param message message to include in any output
+   */
+  protected void processFatalError(String message) {
+    FSNamesystem.LOG.fatal(message);
+    Runtime.getRuntime().exit(-1);
+  }
+
   /**
    * If there is an IO Error on any log operations on storage directory,
    * remove any stream associated with that directory 
    */
   synchronized void processIOError(StorageDirectory sd) {
     // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
       return;
+    }
     if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-          "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
+      processAllStorageInaccessible();
     }
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                       .get(idx)).getFile()
-                                       .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+      File parentStorageDir = ((EditLogFileOutputStream) editStreams
+              .get(idx)).getFile()
+              .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName())) {
         editStreams.remove(idx);
- }
+      }
+    }
   }
   
   /**
@@ -458,10 +472,8 @@
         }
       }
       if (j == numEditStreams) {
-          FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured. " +
-                                 "Fatal Error.");
-          Runtime.getRuntime().exit(-1);
+        processFatalError("Fatal Error: Unable to find sync log on which " +
+                                 "IO error occurred.");
       }
       processIOError(j);
     }

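Extracting processFatalError() makes the fatal path overridable: a test subclass can throw instead of exiting the JVM. A hedged sketch, assuming the package-private FSEditLog(FSImage) constructor of this era; TestableEditLog is hypothetical and must live in org.apache.hadoop.hdfs.server.namenode:

// in package org.apache.hadoop.hdfs.server.namenode, alongside FSEditLog
class TestableEditLog extends FSEditLog {
  TestableEditLog(FSImage image) {
    super(image);
  }

  @Override
  protected void processFatalError(String message) {
    // surface the fatal condition to the test instead of killing the JVM
    throw new AssertionError(message);
  }
}
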
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Mar 19 12:15:51 2009
@@ -449,6 +449,59 @@
   }
 
   /**
+   * Test for a thread ref not being null or pointing to a dead thread
+   * @param thread the thread to check
+   * @return true if the thread is considered dead
+   */
+  private boolean isDead(Thread thread) {
+      return thread == null || !thread.isAlive();
+  }
+
+  /**
+   * Perform a cursory health check of the namesystem, particularly that it
+   * has not been closed and that all threads are running.
+   * @throws IOException for any health check failure
+   */
+  void ping() throws IOException {
+    if (!fsRunning) {
+      throw new IOException("Namesystem is not running");
+    }
+    boolean bad = false;
+    StringBuilder sb = new StringBuilder();
+    if (isDead(hbthread)) {
+      bad = true;
+      sb.append("[Heartbeat thread is dead]");
+    }
+    if (isDead(replthread)) {
+      bad = true;
+      sb.append("[Replication thread is dead]");
+    }
+    // this thread's liveness is only relevant in safe mode.
+    if (safeMode!=null && isDead(smmthread)) {
+      bad = true;
+      sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
+    }
+    if (isDead(dnthread)) {
+        bad = true;
+        sb.append("[DecommissionedMonitor thread is dead]");
+    }
+    if (isDead(lmthread)) {
+      bad = true;
+      sb.append("[Lease monitor thread is dead]");
+    }
+    if (pendingReplications == null || !pendingReplications.isAlive()) {
+      bad = true;
+      sb.append("[Pending replication thread is dead]");
+    }
+    if (this != getFSNamesystem()) {
+      bad = true;
+      sb.append("[FSNamesystem not a singleton]");
+    }
+    if (bad) {
+      throw new IOException(sb.toString());
+    }
+  }
+  /**
    * Close down this file system manager.
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
@@ -470,7 +523,10 @@
           lmthread.interrupt();
           lmthread.join(3000);
         }
-        dir.close();
+        if(dir != null) {
+         dir.close();
+         dir =  null;
+        }
       } catch (InterruptedException ie) {
       } catch (IOException ie) {
         LOG.error("Error closing FSDirectory", ie);
@@ -1252,9 +1308,13 @@
                                                            null,
                                                            blockSize);
     if (targets.length < this.minReplication) {
-      throw new IOException("File " + src + " could only be replicated to " +
-                            targets.length + " nodes, instead of " +
-                            minReplication);
+        String message = "File " + src + " could only be replicated to " +
+                targets.length + " nodes, instead of "
+                + minReplication
+                + ". ( there are " + heartbeats.size()
+                + " live data nodes in the cluster)";
+
+        throw new IOException(message);
     }
 
     // Allocate a new block and record it in the INode. 

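The new ping() aggregates every liveness failure into a single IOException so one report names everything that is wrong, rather than failing on the first dead thread. The same aggregation idiom in a generic, self-contained form; HealthCheck is an illustrative name:

import java.io.IOException;
import java.util.Map;

public final class HealthCheck {
  /** Throw one IOException naming every dead or missing thread. */
  static void ping(Map<String, Thread> requiredThreads) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Thread> entry : requiredThreads.entrySet()) {
      Thread t = entry.getValue();
      if (t == null || !t.isAlive()) {
        sb.append('[').append(entry.getKey()).append(" thread is dead]");
      }
    }
    if (sb.length() > 0) {
      throw new IOException(sb.toString());
    }
  }
}
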
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Mar 19 12:15:51 2009
@@ -45,6 +45,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Service;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
@@ -84,7 +85,7 @@
  * NameNode implements the ClientProtocol interface, which allows
  * clients to ask for DFS services.  ClientProtocol is not
  * designed for direct use by authors of DFS client code.  End-users
- * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
+ * should instead use the {@link FileSystem} class.
  *
  * NameNode also implements the DatanodeProtocol interface, used by
  * DataNode programs that actually store DFS data blocks.  These
@@ -95,7 +96,7 @@
  * secondary namenodes or rebalancing processes to get partial namenode's
  * state, for example partial blocksMap etc.
  **********************************************************/
-public class NameNode implements ClientProtocol, DatanodeProtocol,
+public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
                                  NamenodeProtocol, FSConstants,
                                  RefreshAuthorizationPolicyProtocol {
   static{
@@ -133,8 +134,6 @@
   /** HTTP server address */
   private InetSocketAddress httpAddress = null;
   private Thread emptier;
-  /** only used for testing purposes  */
-  private boolean stopRequested = false;
   /** Is service level authorization enabled? */
   private boolean serviceAuthEnabled = false;
   
@@ -162,7 +161,17 @@
   }
 
   public static InetSocketAddress getAddress(Configuration conf) {
-    return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
+    URI fsURI = FileSystem.getDefaultUri(conf);
+    if (fsURI == null) {
+      throw new IllegalArgumentException(
+              "No default filesystem URI in the configuration");
+    }
+    String auth = fsURI.getAuthority();
+    if (auth == null) {
+      throw new IllegalArgumentException(
+              "No authority for the Filesystem URI " + fsURI);
+    }
+    return getAddress(auth);
   }
 
   public static URI getUri(InetSocketAddress namenode) {
@@ -175,6 +184,7 @@
    * Initialize name-node.
    * 
    * @param conf the configuration
+   * @throws IOException for problems during initialization
    */
   private void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(conf);
@@ -261,7 +271,7 @@
   }
 
   /**
-   * Start NameNode.
+   * Create a NameNode.
    * <p>
    * The name-node can be started with one of the following startup options:
    * <ul> 
@@ -280,14 +290,49 @@
    * <code>zero</code> in the conf.
    * 
    * @param conf  confirguration
-   * @throws IOException
+   * @throws IOException for backwards compatibility
    */
   public NameNode(Configuration conf) throws IOException {
-    try {
-      initialize(conf);
-    } catch (IOException e) {
-      this.stop();
-      throw e;
+    super(conf);
+  }
+
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * This method performs all of the startup work.
+   * It is invoked from {@link #start()} when the service is started.
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected void innerStart() throws IOException {
+    initialize(getConf());
+    setServiceState(ServiceState.LIVE);
+  }
+
+  /**
+   * {@inheritDoc}
+   * This implementation checks that the name system is non-null and live.
+   * @param status the status object to update with any problems found
+   * @throws IOException for any ping failure
+   * @throws LivenessException if the name system is not running
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (namesystem == null) {
+      status.addThrowable(new LivenessException("No name system"));
+    } else {
+      try {
+        namesystem.ping();
+      } catch (IOException e) {
+        status.addThrowable(e);
+      }
+    }
+    if (httpServer == null || !httpServer.isAlive()) {
+      status.addThrowable(
+              new IOException("NameNode HttpServer is not running"));
     }
   }
 
@@ -297,34 +342,81 @@
    */
   public void join() {
     try {
-      this.server.join();
+      if (server != null) {
+        server.join();
+      }
     } catch (InterruptedException ie) {
     }
   }
 
   /**
-   * Stop all NameNode threads and wait for all to finish.
+   * {@inheritDoc}
+   * To shut down, this service stops all NameNode threads and waits for them
+   * to finish. It also stops the metrics.
    */
-  public void stop() {
-    if (stopRequested)
-      return;
-    stopRequested = true;
+  @Override
+  public synchronized void innerClose() throws IOException {
+    LOG.info("Closing NameNode");
     try {
-      if (httpServer != null) httpServer.stop();
+      if (httpServer != null) {
+        httpServer.stop();
+      }
     } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error(StringUtils.stringifyException(e), e);
+    }
+    httpServer = null;
+    if (namesystem != null) {
+      namesystem.close();
+    }
+    if (emptier != null) {
+      emptier.interrupt();
+      emptier = null;
+    }
+    if (server != null) {
+      server.stop();
+      server = null;
     }
-    if(namesystem != null) namesystem.close();
-    if(emptier != null) emptier.interrupt();
-    if(server != null) server.stop();
     if (myMetrics != null) {
       myMetrics.shutdown();
     }
     if (namesystem != null) {
       namesystem.shutdown();
+      namesystem = null;
     }
   }
   
+  /**
+   * Retained for backwards compatibility.
+   */
+  public void stop() {
+    closeQuietly();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "NameNode";
+  }
+
+  /**
+   * The toString operator returns the super class name/id, and the state. This
+   * gives all services a slightly useful message in a debugger or test report
+   *
+   * @return a string representation of the object.
+   */
+  @Override
+  public String toString() {
+    return getServiceName() + " instance " + super.toString() + " in state "
+            + getServiceState()
+            + (httpAddress != null ? (" at " + httpAddress) : "")
+            + (server != null ? (", IPC " + server.getListenerAddress()) : "");
+  }
+
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
@@ -969,6 +1061,7 @@
     }
 
     NameNode namenode = new NameNode(conf);
+    deploy(namenode);
     return namenode;
   }
     

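The service lifecycle pattern introduced above splits construction from startup: the constructor only stores the configuration, innerStart() performs the work that used to happen in the constructor, innerPing() reports health into a ServiceStatus, and innerClose() releases resources. As a hedged sketch of that contract, here is a minimal hypothetical subclass; the hook names, ServiceState, ServiceStatus and setServiceState() are taken from the diff above, while everything else (the class, its field, its messages) is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.Service;

    /** Hypothetical service, used only to illustrate the lifecycle hooks. */
    public class EchoService extends Service {
      private volatile boolean live;

      public EchoService(Configuration conf) {
        super(conf);   // the constructor must not start the service
      }

      /** Startup work; invoked from start(), never called directly. */
      @Override
      protected void innerStart() throws IOException {
        live = true;                          // acquire resources here
        setServiceState(ServiceState.LIVE);   // as NameNode.innerStart() does
      }

      /** Health check: report problems by adding them to the status. */
      @Override
      public void innerPing(ServiceStatus status) throws IOException {
        if (!live) {
          status.addThrowable(new IOException("service is not live"));
        }
      }

      /** Shutdown work; should be safe to invoke in any state. */
      @Override
      protected void innerClose() throws IOException {
        live = false;                         // release resources here
      }

      @Override
      public String getServiceName() {
        return "EchoService";
      }
    }

The deploy(namenode) call added at the end of this file presumably wraps start() with error handling, so constructing a NameNode in a test no longer binds ports or touches storage.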
Modified: hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Thu Mar 19 12:15:51 2009
@@ -136,6 +136,15 @@
   }
 
   /**
+   * Test for the replicator being alive.
+   * @return true if the thread is running.
+   */
+  boolean isAlive() {
+    Daemon daemon = timerThread;
+    return daemon != null && daemon.isAlive();
+  }
+
+  /**
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this

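The new isAlive() accessor copies timerThread into a local before testing it, so a concurrent shutdown cannot null the field between the check and the call. A sketch of how a liveness probe might consume it; the wrapper method and the ServiceStatus wiring are assumptions, only isAlive() comes from this hunk:

    /** Illustrative only: surface a dead replication monitor as a ping failure. */
    void checkReplicationMonitor(PendingReplicationBlocks pendingReplications,
                                 ServiceStatus status) {
      if (!pendingReplications.isAlive()) {
        status.addThrowable(
            new IOException("Replication monitor thread is not running"));
      }
    }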
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Mar 19 12:15:51 2009
@@ -1243,11 +1243,26 @@
   /** 
    * Utility that submits a job, then polls for progress until the job is
    * complete.
-   * 
+   *
    * @param job the job configuration.
-   * @throws IOException
+   * @return a reference to the completed, successful job
+   * @throws IOException on any IO problem, or on job failure
    */
   public static RunningJob runJob(JobConf job) throws IOException {
+    return runJob(job, -1);
+  }
+
+  /** 
+   * Utility that submits a job, then polls for progress until the job is
+   * complete.
+   * 
+   * @param job the job configuration.
+   * @param timeout timeout in milliseconds; any value less than or equal to
+   * zero means "do not time out"
+   * @return a reference to the completed, successful job
+   * @throws IOException on any IO problem, or on job failure
+   */
+  public static RunningJob runJob(JobConf job, long timeout) throws IOException {
     JobClient jc = new JobClient(job);
     boolean error = true;
     RunningJob running = null;
@@ -1255,6 +1270,7 @@
     final int MAX_RETRIES = 5;
     int retries = MAX_RETRIES;
     TaskStatusFilter filter;
+    long endTime = timeout > 0 ? System.currentTimeMillis() + timeout : 0;
     try {
       filter = getTaskOutputFilter(job);
     } catch(IllegalArgumentException e) {
@@ -1351,6 +1367,10 @@
           LOG.info("Communication problem with server: " +
                    StringUtils.stringifyException(ie));
         }
+        //check for timeout
+        if (endTime > 0 && System.currentTimeMillis() > endTime) {
+          throw new IOException("Job execution timed out");
+        }
       }
       if (!running.isSuccessful()) {
         throw new IOException("Job failed!");

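A sketch of calling the new overload; only runJob(JobConf, long) comes from this patch, and MyJob with the rest of the setup is a placeholder:

    // Submit and poll, failing if the job outlives a ten-minute deadline.
    JobConf job = new JobConf(MyJob.class);   // MyJob is hypothetical
    job.setJobName("example");
    // A timeout <= 0 preserves the old wait-until-done behaviour.
    RunningJob running = JobClient.runJob(job, 10 * 60 * 1000L);

Internally the deadline is computed once, as System.currentTimeMillis() plus the timeout, and each polling iteration checks whether the current time has passed it.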
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Mar 19 12:15:51 2009
@@ -89,7 +89,11 @@
 
   public static void stopNotifier() {
     running = false;
-    thread.interrupt();
+    //copy into a variable to deal with race conditions
+    Thread notifier = thread;
+    if (notifier != null) {
+      notifier.interrupt();
+    }
   }
 
   private static JobEndStatusInfo createNotification(JobConf conf,

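The fix above is the standard copy-then-test idiom for a shared field that another thread may null concurrently: read the field once into a local, then operate only on the local. A self-contained sketch of the same idiom, with all names illustrative:

    /** Hypothetical class demonstrating the copy-then-test idiom. */
    public class Stoppable {
      private volatile Thread worker;   // may be nulled by another thread

      public void stop() {
        Thread t = worker;   // single read of the shared field
        if (t != null) {
          t.interrupt();     // safe even if 'worker' is nulled meanwhile
        }
      }
    }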
Modified: hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=755965&r1=755964&r2=755965&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 19 12:15:51 2009
@@ -77,13 +77,14 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.Service;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
 
   static{
@@ -110,7 +111,11 @@
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-
+  /**
+   * Time in milliseconds to sleep while trying to start the job tracker:
+   * {@value}
+   */
+  private static final int STARTUP_SLEEP_INTERVAL = 1000;
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -166,7 +171,7 @@
     while (true) {
       try {
         result = new JobTracker(conf);
-        result.taskScheduler.setTaskTrackerManager(result);
+        deploy(result);
         break;
       } catch (VersionMismatch e) {
         throw e;
@@ -175,19 +180,24 @@
       } catch (UnknownHostException e) {
         throw e;
       } catch (IOException e) {
-        LOG.warn("Error starting tracker: " + 
-                 StringUtils.stringifyException(e));
+        LOG.warn("Error starting tracker: " +
+                e.getMessage(), e);
       }
-      Thread.sleep(1000);
+      Thread.sleep(STARTUP_SLEEP_INTERVAL);
     }
-    if (result != null) {
+    if (result != null && result.isRunning()) {
       JobEndNotifier.startNotifier();
     }
     return result;
   }
 
-  public void stopTracker() throws IOException {
-    JobEndNotifier.stopNotifier();
+  /**
+   * This stops the tracker and the JobEndNotifier, and moves the service
+   * into the terminated state.
+   *
+   * @throws IOException for any trouble during closedown
+   */
+  public synchronized void stopTracker() throws IOException {
     close();
   }
     
@@ -1299,7 +1309,7 @@
   // (hostname --> Node (NetworkTopology))
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
-  
+
   // Number of resolved entries
   int numResolved;
     
@@ -1351,7 +1361,7 @@
                                    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
-  final HttpServer infoServer;
+  HttpServer infoServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -1366,9 +1376,13 @@
   private QueueManager queueManager;
 
   /**
-   * Start the JobTracker process, listen on the indicated port
+   * Create the JobTracker, based on the configuration
+   * @param conf configuration to use
+   * @throws IOException on problems initializing the tracker
    */
   JobTracker(JobConf conf) throws IOException, InterruptedException {
+    super(conf);
+    this.conf = conf;
     //
     // Grab some static constants
     //
@@ -1386,10 +1400,6 @@
     AVERAGE_BLACKLIST_THRESHOLD = 
       conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
 
-    // This is a directory of temporary submission files.  We delete it
-    // on startup, and can delete any files that we're done with
-    this.conf = conf;
-    JobConf jobConf = new JobConf(conf);
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -1402,7 +1412,23 @@
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+    taskScheduler.setTaskTrackerManager(this);
+  }
                                            
+  /**
+   * This contains the startup logic moved out of the constructor.
+   * It must never be called directly. Instead, call {@link Service#start()}
+   * and let the service invoke this method once and once only.
+   *
+   * Although most of the initialization work has been performed, the
+   * JobTracker does not go live until {@link #offerService()} is called.
+   * Accordingly, the JobTracker does not enter the LIVE state here.
+   * @throws IOException for any startup problems
+   */
+  protected void innerStart() throws IOException {
+    // This is a directory of temporary submission files.  We delete it
+    // on startup, and can delete any files that we're done with
+    JobConf jobConf = new JobConf(conf);
     // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
@@ -1458,15 +1484,19 @@
     trackerIdentifier = getDateFormat().format(new Date());
 
     Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
-    try {
-      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
-        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-      this.myInstrumentation = c.newInstance(this, jobConf);
-    } catch(Exception e) {
-      //Reflection can throw lots of exceptions -- handle them all by 
-      //falling back on the default.
-      LOG.error("failed to initialize job tracker metrics", e);
-      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized (this) {
+      try {
+        java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+          metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+        this.myInstrumentation = c.newInstance(this, jobConf);
+      } catch(Exception e) {
+        //Reflection can throw lots of exceptions -- handle them all by 
+        //falling back on the default.
+        LOG.error("failed to initialize job tracker metrics", e);
+        this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+      }
     }
  
     
@@ -1488,6 +1518,9 @@
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
           fs = FileSystem.get(conf);
+          if(fs == null) {
+            throw new IllegalStateException("Unable to bind to the filesystem");
+          }
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
@@ -1529,9 +1562,14 @@
                 ((RemoteException)ie).getClassName())) {
           throw ie;
         }
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
+        LOG.info("problem cleaning system directory: " + systemDir + ": " + ie, ie);
+      }
+      try {
+        Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted during system directory cleanup ",
+                e);
       }
-      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
@@ -1553,7 +1591,11 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized(this) {
+      completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
+    }
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -1618,9 +1660,16 @@
   }
 
   /**
-   * Run forever
+   * Run forever.
+   * Change the system state to indicate that we are live
+   * @throws InterruptedException interrupted operations
+   * @throws IOException IO Problems
    */
   public void offerService() throws InterruptedException, IOException {
+    if (!enterLiveState()) {
+      //catch re-entrancy by returning early
+      return;
+    }
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
@@ -1637,79 +1686,139 @@
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
-    if (completedJobStatusStore.isActive()) {
-      completedJobsStoreThread = new Thread(completedJobStatusStore,
-                                            "completedjobsStore-housekeeper");
-      completedJobsStoreThread.start();
+    synchronized (this) {
+      //this is synchronized to stop findbugs warning
+      if (completedJobStatusStore.isActive()) {
+        completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                              "completedjobsStore-housekeeper");
+        completedJobsStoreThread.start();
+      }
     }
 
+    LOG.info("Starting interTrackerServer");
     // start the inter-tracker server once the jt is ready
     this.interTrackerServer.start();
     
-    synchronized (this) {
-      state = State.RUNNING;
-    }
-    LOG.info("Starting RUNNING");
     
+    LOG.info("Starting RUNNING");
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
-  void close() throws IOException {
-    if (this.infoServer != null) {
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (infoServer == null || !infoServer.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + infoPort));
+    }
+    if (interTrackerServer == null) {
+      status.addThrowable(
+              new IOException("InterTrackerServer is not running"));
+    }
+  }
+
+  /**
+   * This service shuts down by stopping the
+   * {@link JobEndNotifier} and then closing down the job
+   * tracker
+   *
+   * @throws IOException for any shutdown problem; exceptions are logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    JobEndNotifier.stopNotifier();
+    closeJobTracker();
+  }
+
+  /**
+   * Close down all the Job tracker threads, and the
+   * task scheduler.
+   * This method was package scoped, but has been made private so that
+   * it is not invoked directly. Callers should use {@link #close()} to
+   * stop a JobTracker.
+   * @throws IOException if problems occur
+   */
+  private void closeJobTracker() throws IOException {
+    if (infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
-        this.infoServer.stop();
+        infoServer.stop();
       } catch (Exception ex) {
         LOG.warn("Exception shutting down JobTracker", ex);
       }
     }
-    if (this.interTrackerServer != null) {
+    if (interTrackerServer != null) {
       LOG.info("Stopping interTrackerServer");
-      this.interTrackerServer.stop();
-    }
-    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
-      LOG.info("Stopping expireTrackers");
-      this.expireTrackersThread.interrupt();
-      try {
-        this.expireTrackersThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
-      LOG.info("Stopping retirer");
-      this.retireJobsThread.interrupt();
-      try {
-        this.retireJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
+      interTrackerServer.stop();
     }
+    retireThread("expireTrackersThread", expireTrackersThread);
+    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
-    if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
-      LOG.info("Stopping expireLaunchingTasks");
-      this.expireLaunchingTaskThread.interrupt();
+    retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+    retireThread("completedJobsStore thread", completedJobsStoreThread);
+    LOG.info("stopped all jobtracker services");
+  }
+
+  /**
+   * Close the filesystem without raising an exception. At the end of this
+   * method, fs==null.
+   * Warning: closing the FS may make it unusable for other clients in the same JVM.
+   */
+  protected synchronized void closeTheFilesystemQuietly() {
+    if (fs != null) {
       try {
-        this.expireLaunchingTaskThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        fs.close();
+      } catch (IOException e) {
+        LOG.warn("When closing the filesystem: " + e, e);
       }
+      fs = null;
     }
-    if (this.completedJobsStoreThread != null &&
-        this.completedJobsStoreThread.isAlive()) {
-      LOG.info("Stopping completedJobsStore thread");
-      this.completedJobsStoreThread.interrupt();
+  }
+
+  /**
+   * Retire a named thread if it is not null and still alive. The thread will be
+   * interrupted and then joined.
+   *
+   * @param name   thread name for log messages
+   * @param thread the thread to retire; may be null.
+   * @return true if the thread was shut down; false implies the calling
+   *         thread was interrupted while waiting.
+   */
+  protected boolean retireThread(String name, Thread thread) {
+    if (thread != null && thread.isAlive()) {
+      LOG.info("Stopping " + name);
+      thread.interrupt();
       try {
-        this.completedJobsStoreThread.join();
+        thread.join();
       } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        LOG.info("interruped during " + name + " shutdown", ex);
+        return false;
       }
     }
-    LOG.info("stopped all jobtracker services");
-    return;
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "JobTracker";
   }
     
   ///////////////////////////////////////////////////////
@@ -1847,7 +1956,7 @@
   }
     
   /**
-   * Call {@link #removeTaskEntry(String)} for each of the
+   * Call {@link #removeTaskEntry(TaskAttemptID)} for each of the
    * job's tasks.
    * When the JobTracker is retiring the long-completed
    * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
@@ -2230,7 +2339,7 @@
     return numTaskCacheLevels;
   }
   public int getNumResolvedTaskTrackers() {
-    return numResolved;
+    return taskTrackers.size();
   }
   
   public int getNumberOfUniqueHosts() {
@@ -2757,6 +2866,7 @@
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -2769,6 +2879,7 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
@@ -2851,6 +2962,10 @@
 
   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
     synchronized (taskTrackers) {
+      //backport the service state into the job tracker state
+      State state = getServiceState() == ServiceState.LIVE ?
+              State.RUNNING :
+              State.INITIALIZING;
       if (detailed) {
         List<List<String>> trackerNames = taskTrackerNames();
         return new ClusterStatus(trackerNames.get(0),
@@ -3167,6 +3282,10 @@
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    if (fs == null) {
+      throw new java.lang.IllegalStateException("Filesystem is null; "
+              + "JobTracker is not live: " + this);
+    }
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }

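The retireThread() helper above replaces four copies of the interrupt-and-join shutdown sequence, and logs instead of printing stack traces when the join is interrupted. A self-contained sketch of the same idiom, with every name illustrative; note that it also restores the interrupt status, which the patch leaves to its caller:

    /** Hypothetical demo of the interrupt-and-join retirement idiom. */
    public class RetireDemo {
      /** Interrupt a thread, if live, and wait for it to finish. */
      static boolean retire(String name, Thread thread) {
        if (thread != null && thread.isAlive()) {
          System.out.println("Stopping " + name);
          thread.interrupt();
          try {
            thread.join();
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();  // preserve interrupt status
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Thread worker = new Thread(new Runnable() {
          public void run() {
            try {
              Thread.sleep(60000);   // simulate a long-lived daemon thread
            } catch (InterruptedException expected) {
              // exit promptly when retired
            }
          }
        }, "worker");
        worker.start();
        System.out.println("retired cleanly: " + retire("worker", worker));
      }
    }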

