hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r752984 [1/2] - in /hadoop/core/trunk: conf/ ivy/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/http/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/net/ ...
Date Thu, 12 Mar 2009 19:43:06 GMT
Author: cdouglas
Date: Thu Mar 12 19:43:05 2009
New Revision: 752984

URL: http://svn.apache.org/viewvc?rev=752984&view=rev
Log:
Revert HADOOP-3628

Removed:
    hadoop/core/trunk/src/core/org/apache/hadoop/util/MockService.java
    hadoop/core/trunk/src/core/org/apache/hadoop/util/Service.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfigurationSubclass.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestGetServerAddress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/empty-configuration.xml
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestMiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
    hadoop/core/trunk/src/test/org/apache/hadoop/net/TestDNS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestServiceLifecycle.java
Modified:
    hadoop/core/trunk/conf/log4j.properties
    hadoop/core/trunk/ivy/libraries.properties
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
    hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
    hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
    hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/test/hadoop-site.xml
    hadoop/core/trunk/src/test/log4j.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ClusterMapReduceTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/HadoopTestCase.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMRServerPorts.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/log4j.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/conf/log4j.properties (original)
+++ hadoop/core/trunk/conf/log4j.properties Thu Mar 12 19:43:05 2009
@@ -36,10 +36,8 @@
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
-log4j.appender.console.immediateFlush=true
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t] %p %c{2} %x: %m%n
-#log4j.appender.console.layout.ConversionPattern=%-4r %-5p %c %x - %m%n
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
 
 #
 # TaskLog Appender

Modified: hadoop/core/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/ivy/libraries.properties?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/ivy/libraries.properties (original)
+++ hadoop/core/trunk/ivy/libraries.properties Thu Mar 12 19:43:05 2009
@@ -35,7 +35,7 @@
 hsqldb.version=1.8.0.10
 
 #ivy.version=2.0.0-beta2
-ivy.version=2.0.0
+ivy.version=2.0.0-rc2
 
 jasper.version=5.5.12
 #not able to figureout the version of jsp & jsp-api version to get it resolved throught ivy

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestUlimit.java Thu Mar 12 19:43:05 2009
@@ -71,7 +71,7 @@
    * it to succeed. Then program is launched with insufficient memory and 
    * is expected to be a failure.  
    */
-  public void testCommandLine() throws Exception {
+  public void testCommandLine() {
     if (StreamUtil.isCygwin()) {
       return;
     }
@@ -88,9 +88,11 @@
       fs.delete(outputPath, true);
       assertFalse("output not cleaned up", fs.exists(outputPath));
       mr.waitUntilIdle();
+    } catch(IOException e) {
+      fail(e.toString());
     } finally {
-      MiniDFSCluster.close(dfs);
-      MiniMRCluster.close(mr);
+      mr.shutdown();
+      dfs.shutdown();
     }
   }
 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Thu Mar 12 19:43:05 2009
@@ -185,9 +185,6 @@
   private Properties properties;
   private Properties overlay;
   private ClassLoader classLoader;
-  private static final String ERROR_PARSING_CONF_FILE = 
-          "Error parsing configuration resource ";
-
   {
     classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {
@@ -976,7 +973,7 @@
     }
   }
 
-  protected synchronized Properties getProps() {
+  private synchronized Properties getProps() {
     if (properties == null) {
       properties = new Properties();
       loadResources(properties, resources, quietmode);
@@ -1160,16 +1157,16 @@
       }
         
     } catch (IOException e) {
-      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+      LOG.fatal("error parsing conf file: " + e);
       throw new RuntimeException(e);
     } catch (DOMException e) {
-      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+      LOG.fatal("error parsing conf file: " + e);
       throw new RuntimeException(e);
     } catch (SAXException e) {
-      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+      LOG.fatal("error parsing conf file: " + e);
       throw new RuntimeException(e);
     } catch (ParserConfigurationException e) {
-      LOG.fatal(ERROR_PARSING_CONF_FILE + name + " : " + e, e);
+      LOG.fatal("error parsing conf file: " + e);
       throw new RuntimeException(e);
     }
   }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/http/HttpServer.java Thu Mar 12 19:43:05 2009
@@ -22,7 +22,6 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.net.BindException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -458,11 +457,7 @@
           // then try the next port number.
           if (ex instanceof BindException) {
             if (!findPort) {
-              BindException be = new BindException(
-                      "Port in use: " + listener.getHost()
-                              + ":" + listener.getPort());
-              be.initCause(ex);
-              throw be;
+              throw (BindException) ex;
             }
           } else {
             LOG.info("HttpServer.start() threw a non Bind IOException"); 
@@ -494,14 +489,6 @@
   }
 
   /**
-   * Test for the availability of the web server
-   * @return true if the web server is started, false otherwise
-   */
-  public boolean isAlive() {
-    return webServer.isStarted();
-  }
-
-  /**
    * A very simple servlet to serve up a text representation of the current
    * stack traces. It both returns the stacks to the caller and logs them.
    * Currently the stack traces are done sequentially rather than exactly the

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Thu Mar 12 19:43:05 2009
@@ -284,9 +284,8 @@
     /** Connect to the server and set up the I/O streams. It then sends
      * a header to the server and starts
      * the connection thread that waits for responses.
-     * @throws IOException if the connection attempt was unsuccessful
      */
-    private synchronized void setupIOstreams() throws IOException {
+    private synchronized void setupIOstreams() {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -309,7 +308,7 @@
             /* The max number of retries is 45,
              * which amounts to 20s*45 = 15 minutes retries.
              */
-            handleConnectionFailure(timeoutFailures++, maxRetries, toe);
+            handleConnectionFailure(timeoutFailures++, 45, toe);
           } catch (IOException ie) {
             handleConnectionFailure(ioFailures++, maxRetries, ie);
           }
@@ -328,7 +327,6 @@
       } catch (IOException e) {
         markClosed(e);
         close();
-        throw e;
       }
     }
 
@@ -360,7 +358,7 @@
 
       // throw the exception if the maximum number of retries is reached
       if (curRetries >= maxRetries) {
-        throw wrapException(remoteId.getAddress(), ioe);
+        throw ioe;
       }
 
       // otherwise back off and retry
@@ -369,7 +367,7 @@
       } catch (InterruptedException ignored) {}
       
       LOG.info("Retrying connect to server: " + server + 
-          ". Already tried " + curRetries + " time(s) out of "+ maxRetries);
+          ". Already tried " + curRetries + " time(s).");
     }
 
     /* Write the header for each connection

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java Thu Mar 12 19:43:05 2009
@@ -301,7 +301,7 @@
    * @return the proxy
    * @throws IOException if the far end through a RemoteException
    */
-  public static VersionedProtocol waitForProxy(Class protocol,
+  static VersionedProtocol waitForProxy(Class protocol,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/DNS.java Thu Mar 12 19:43:05 2009
@@ -18,15 +18,10 @@
 
 package org.apache.hadoop.net;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
-import java.net.Inet4Address;
-import java.net.Inet6Address;
 import java.util.Enumeration;
 import java.util.Vector;
 
@@ -43,29 +38,6 @@
  * 
  */
 public class DNS {
-  private static final String LOCALHOST = "localhost";
-  private static final Log LOG = LogFactory.getLog(DNS.class);
-
-  /**
-   * Returns the hostname associated with the specified IP address by the
-   * provided nameserver.
-   *
-   * @param address The address to reverse lookup
-   * @param nameserver The host name of a reachable DNS server; can be null
-   * @return The host name associated with the provided IP
-   * @throws NamingException If a NamingException is encountered
-   * @throws IllegalArgumentException if the protocol is unsupported
-   */
-  public static String reverseDns(InetAddress address, String nameserver)
-          throws NamingException {
-    if (address instanceof Inet4Address) {
-      return reverseDns((Inet4Address) address, nameserver);
-    } else if (address instanceof Inet6Address) {
-      return reverseDns((Inet6Address) address, nameserver);
-    } else {
-      throw new IllegalArgumentException("Unsupported Protocol " + address);
-    }
-  }
 
   /**
    * Returns the hostname associated with the specified IP address by the
@@ -73,76 +45,31 @@
    * 
    * @param hostIp
    *            The address to reverse lookup
-   * @param nameserver
-   *            The host name of a reachable DNS server; can be null
+   * @param ns
+   *            The host name of a reachable DNS server
    * @return The host name associated with the provided IP
    * @throws NamingException
    *             If a NamingException is encountered
    */
-  private static String reverseDns(Inet4Address hostIp, String nameserver)
+  public static String reverseDns(InetAddress hostIp, String ns)
     throws NamingException {
     //
     // Builds the reverse IP lookup form
     // This is formed by reversing the IP numbers and appending in-addr.arpa
     //
-    byte[] address32 = hostIp.getAddress();
-    String reverseIP;
-    reverseIP =    ""+(address32[3]&0xff) + '.'
-                    + (address32[2] & 0xff) + '.'
-                    + (address32[1] & 0xff) + '.'
-                    + (address32[0] & 0xff) + '.'
-                    + "in-addr.arpa";
-/*
-    String hostAddress = hostIp.getHostAddress();
-    String[] parts = hostAddress.split("\\.");
-    if (parts.length < 4) {
-      throw new IllegalArgumentException("Unable to determine IPv4 address of "
-              + hostAddress);
-    }
-    reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+    String[] parts = hostIp.getHostAddress().split("\\.");
+    String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
       + parts[0] + ".in-addr.arpa";
-*/
 
-    return retrievePTRRecord(nameserver, reverseIP);
-  }
-
-  /**
-   * Retrieve the PTR record from a DNS entry; if given the right
-   * reverse IP this will resolve both IPv4 and IPv6 addresses
-   * @param nameserver name server to use; can be null
-   * @param reverseIP reverse IP address to look up
-   * @return the PTR record of the host
-   * @throws NamingException if the record can not be found.
-   */
-  private static String retrievePTRRecord(String nameserver, String reverseIP)
-          throws NamingException {
     DirContext ictx = new InitialDirContext();
-    try {
-      // Use "dns:///" if the default nameserver is to be used
-      Attributes attribute = ictx.getAttributes("dns://"
-                         + ((nameserver == null) ? "" : nameserver) +
+    Attributes attribute =
+      ictx.getAttributes("dns://"               // Use "dns:///" if the default
+                         + ((ns == null) ? "" : ns) + 
+                         // nameserver is to be used
                          "/" + reverseIP, new String[] { "PTR" });
-      return attribute.get("PTR").get().toString();
-    } finally {
-      ictx.close();
-    }
-  }
-
-  /**
-   * Returns the hostname associated with the specified IP address by the
-   * provided nameserver.
-   *
-   * @param address
-   *            The address to reverse lookup
-   * @param nameserver
-   *            The host name of a reachable DNS server; can be null
-   * @return The host name associated with the provided IP
-   * @throws NamingException
-   *             If a NamingException is encountered
-   */
-  private static String reverseDns(Inet6Address address, String nameserver)
-          throws NamingException {
-    throw new IllegalArgumentException("Reverse DNS lookup of IPv6 is currently unsupported: " + address);
+    ictx.close();
+    
+    return attribute.get("PTR").get().toString();
   }
 
   /**
@@ -163,23 +90,21 @@
     try {
       NetworkInterface netIF = NetworkInterface.getByName(strInterface);
       if (netIF == null)
-        return new String[] {getLocalHostIPAddress()};
+        return new String[] { InetAddress.getLocalHost()
+                              .getHostAddress() };
       else {
         Vector<String> ips = new Vector<String>();
-        Enumeration<InetAddress> e = netIF.getInetAddresses();
-
-        while (e.hasMoreElements()) {
-          ips.add((e.nextElement()).getHostAddress());
-        }
-        return ips.toArray(new String[ips.size()]);
+        Enumeration e = netIF.getInetAddresses();
+        while (e.hasMoreElements())
+          ips.add(((InetAddress) e.nextElement()).getHostAddress());
+        return ips.toArray(new String[] {});
       }
     } catch (SocketException e) {
-      return new String[] {getLocalHostIPAddress()};
+      return new String[] { InetAddress.getLocalHost().getHostAddress() };
     }
   }
 
-
-    /**
+  /**
    * Returns the first available IP address associated with the provided
    * network interface
    * 
@@ -205,98 +130,26 @@
    *            The DNS host name
    * @return A string vector of all host names associated with the IPs tied to
    *         the specified interface
-   * @throws UnknownHostException if the hostname cannot be determined
+   * @throws UnknownHostException
    */
   public static String[] getHosts(String strInterface, String nameserver)
     throws UnknownHostException {
     String[] ips = getIPs(strInterface);
     Vector<String> hosts = new Vector<String>();
-    for (String ip : ips) {
-      InetAddress ipAddr = null;
+    for (int ctr = 0; ctr < ips.length; ctr++)
       try {
-        ipAddr = InetAddress.getByName(ip);
-        hosts.add(reverseDns(ipAddr, nameserver));
-      } catch (UnknownHostException e) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Unable to resolve hostname of " + ip + ": " + e, e);
-      } catch (NamingException e) {
-        if(LOG.isDebugEnabled())
-          LOG.debug("Unable to reverse DNS of host" + ip
-                +  (ipAddr != null ? (" and address "+ipAddr): "")
-                + " : " + e, e);
-      } catch (IllegalArgumentException e) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("Unable to resolve IP address of " + ip
-                  + (ipAddr != null ? (" and address " + ipAddr) : "")
-                  + " : " + e, e);
+        hosts.add(reverseDns(InetAddress.getByName(ips[ctr]),
+                             nameserver));
+      } catch (Exception e) {
       }
-    }
 
-    if (hosts.isEmpty()) {
-      return new String[] { getLocalHostname() };
-    } else {
-      return hosts.toArray(new String[hosts.size()]);
-    }
+    if (hosts.size() == 0)
+      return new String[] { InetAddress.getLocalHost().getCanonicalHostName() };
+    else
+      return hosts.toArray(new String[] {});
   }
 
   /**
-   * The cached hostname -initially null.
-   */
-
-  private static volatile String cachedHostname;
-
-  /**
-   * The cached address hostname -initially null.
-   */
-  private static volatile String cachedHostAddress;
-
-    /**
-     * Determine the local hostname; retrieving it from cache if it is known
-     * If we cannot determine our host name, return "localhost"
-     * This code is not synchronized; if more than one thread calls it while
-     * it is determining the address, the last one to exit the loop wins.
-     * However, as both threads are determining and caching the same value, it
-     * should be moot.
-     * @return the local hostname or "localhost"
-     */
-  private static String getLocalHostname() {
-    if(cachedHostname == null) {
-      try {
-        cachedHostname = InetAddress.getLocalHost().getCanonicalHostName();
-      } catch (UnknownHostException e) {
-        LOG.info("Unable to determine local hostname "
-                + "-falling back to \""+LOCALHOST+"\"", e);
-        cachedHostname = LOCALHOST;
-      }
-    }
-    return cachedHostname;
-  }
-
-  /**
-   * Get the IPAddress of the local host as a string. This may be a loop back
-   * value.
-   * This code is not synchronized; if more than one thread calls it while
-   * it is determining the address, the last one to exit the loop wins.
-   * However, as both threads are determining and caching the same value, it
-   * should be moot.
-   * @return the IPAddress of the localhost
-   * @throws UnknownHostException if not even "localhost" resolves.
-   */
-  private static String getLocalHostIPAddress() throws UnknownHostException {
-    if (cachedHostAddress == null) {
-      try {
-        InetAddress localHost = InetAddress.getLocalHost();
-        cachedHostAddress = localHost.getHostAddress();
-      } catch (UnknownHostException e) {
-        LOG.info("Unable to determine local IP Address "
-                + "-falling back to loopback address", e);
-        cachedHostAddress = InetAddress.getByName(LOCALHOST).getHostAddress();
-      }
-    }
-    return cachedHostAddress;
-  }
-
-    /**
    * Returns all the host names associated by the default nameserver with the
    * address bound to the specified network interface
    * 
@@ -305,7 +158,7 @@
    * @return The list of host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the default interface
+   *             If one is encountered while querying the deault interface
    * 
    */
   public static String[] getHosts(String strInterface)
@@ -324,17 +177,15 @@
    * @return The default host names associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the default interface
+   *             If one is encountered while querying the deault interface
    */
   public static String getDefaultHost(String strInterface, String nameserver)
     throws UnknownHostException {
-    if ("default".equals(strInterface)) {
-      return getLocalHostname();
-    }
+    if (strInterface.equals("default")) 
+      return InetAddress.getLocalHost().getCanonicalHostName();
 
-    if ("default".equals(nameserver)) {
+    if (nameserver != null && nameserver.equals("default"))
       return getDefaultHost(strInterface);
-    }
 
     String[] hosts = getHosts(strInterface, nameserver);
     return hosts[0];
@@ -345,12 +196,11 @@
    * nameserver with the address bound to the specified network interface
    * 
    * @param strInterface
-   *            The name of the network interface to query (e.g. eth0).
-   *            Must not be null.
+   *            The name of the network interface to query (e.g. eth0)
    * @return The default host name associated with IPs bound to the network
    *         interface
    * @throws UnknownHostException
-   *             If one is encountered while querying the default interface
+   *             If one is encountered while querying the deault interface
    */
   public static String getDefaultHost(String strInterface)
     throws UnknownHostException {

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/net/NetUtils.java Thu Mar 12 19:43:05 2009
@@ -26,7 +26,6 @@
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.net.ConnectException;
 import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
@@ -181,12 +180,9 @@
     String oldAddr = conf.get(oldBindAddressName);
     String oldPort = conf.get(oldPortName);
     String newAddrPort = conf.get(newBindAddressName);
-    if (oldAddr == null && oldPort == null && newAddrPort != null) {
+    if (oldAddr == null && oldPort == null) {
       return newAddrPort;
     }
-    if (newAddrPort == null) {
-      throw new IllegalArgumentException("No value for " + newBindAddressName);
-    }
     String[] newAddrPortParts = newAddrPort.split(":",2);
     if (newAddrPortParts.length != 2) {
       throw new IllegalArgumentException("Invalid address/port: " + 
@@ -399,18 +395,14 @@
     if (socket == null || endpoint == null || timeout < 0) {
       throw new IllegalArgumentException("Illegal argument for connect()");
     }
-    try {
-      SocketChannel ch = socket.getChannel();
-
-      if (ch == null) {
-        // let the default implementation handle it.
-        socket.connect(endpoint, timeout);
-      } else {
-        SocketIOWithTimeout.connect(ch, endpoint, timeout);
-      }
-    } catch (ConnectException e) {
-      throw (ConnectException) new ConnectException(
-              e + " connecting to " + endpoint).initCause(e);
+    
+    SocketChannel ch = socket.getChannel();
+    
+    if (ch == null) {
+      // let the default implementation handle it.
+      socket.connect(endpoint, timeout);
+    } else {
+      SocketIOWithTimeout.connect(ch, endpoint, timeout);
     }
   }
   

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Mar 12 19:43:05 2009
@@ -1384,7 +1384,7 @@
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
       if (newInfo == null) {
-        throw new FileNotFoundException("Cannot open HDFS file " + src);
+        throw new IOException("Cannot open filename " + src);
       }
 
       if (locatedBlocks != null) {
@@ -3084,9 +3084,7 @@
      * resources associated with this stream.
      */
     private synchronized void closeInternal() throws IOException {
-      if ( !clientRunning ) {
-          return;
-      }
+      checkOpen();
       isClosed();
 
       try {

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/common/Storage.java Thu Mar 12 19:43:05 2009
@@ -24,7 +24,6 @@
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
-import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
@@ -550,10 +549,8 @@
 
     /**
      * Unlock storage.
-     * Does nothing if there is no acquired lock
-     * @throws IOException for IO problems
-     * @throws ClosedChannelException if the channel owning the lock is
-     *  already closed.
+     * 
+     * @throws IOException
      */
     public void unlock() throws IOException {
       if (this.lock == null)
@@ -706,22 +703,11 @@
 
   /**
    * Unlock all storage directories.
-   * @throws IOException on a failure to unlock
+   * @throws IOException
    */
   public void unlockAll() throws IOException {
-    IOException ioe = null;
-    for (StorageDirectory storageDir : storageDirs) {
-      try {
-        storageDir.unlock();
-      } catch (IOException e) {
-        LOG.warn("Failed to unlock " + storageDir.getRoot() + " : " + e, e);
-        if (ioe != null) {
-          ioe = e;
-        }
-      }
-    }
-    if (ioe != null) {
-      throw ioe;
+    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
+      it.next().unlock();
     }
   }
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Mar 12 19:43:05 2009
@@ -43,6 +43,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -90,7 +91,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.Service;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -123,7 +123,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode extends Service
+public class DataNode extends Configured 
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
@@ -186,7 +186,7 @@
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
-  private AbstractList<File> dataDirs;
+  
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
@@ -206,57 +206,20 @@
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
-   * This constructor does not start the node, merely initialize it
-   * @param conf configuration to use
-   * @param dataDirs list of directories that may be used for data
-   * @throws IOException for historical reasons
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
     super(conf);
     datanodeObject = this;
-    this.dataDirs = dataDirs;
-  }
-
-  /////////////////////////////////////////////////////
-  // Lifecycle
-  /////////////////////////////////////////////////////
-
-  /**
-   * Start any work (in separate threads)
-   *
-   * @throws IOException for any startup failure
-   */
-  @Override
-  public void innerStart() throws IOException {
-    startDataNode(getConf(), dataDirs);
-  }
 
-  /**
-   * {@inheritDoc}.
-   *
-   * This implementation checks for the name system being non-null and live
-   *
-   * @throws IOException for any ping failure
-   * @throws LivenessException if the IPC server is not defined @param status the initial status
-   */
-  @Override
-  public void innerPing(ServiceStatus status) throws IOException {
-    if (ipcServer == null) {
-      status.addThrowable(new LivenessException("No IPC Server running"));
-    }
-    if (dnRegistration == null) {
-      status.addThrowable(new LivenessException("Not registered to a namenode"));
+    try {
+      startDataNode(conf, dataDirs);
+    } catch (IOException ie) {
+      shutdown();
+      throw ie;
     }
   }
     
-  /**
-   * Shut down this instance of the datanode. Returns only after shutdown is
-   * complete.
-   */
-  public void shutdown() {
-    closeQuietly();
-  }
   
   /**
    * This method starts the data node with the specified conf.
@@ -281,7 +244,7 @@
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     }
     InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
-    DataNode.nameNodeAddr = nameNodeAddr;
+    
     this.socketTimeout =  conf.getInt("dfs.socket.timeout",
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -367,6 +330,7 @@
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+    DataNode.nameNodeAddr = nameNodeAddr;
 
     //initialize periodic block scanner
     String reason = null;
@@ -393,9 +357,6 @@
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
         tmpInfoPort == 0, conf);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
-    }
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -403,9 +364,6 @@
       Configuration sslConf = new Configuration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
-      }
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -462,10 +420,6 @@
         } catch (InterruptedException ie) {}
       }
     }
-    if(!shouldRun) {
-      throw new IOException("Datanode shut down during handshake with NameNode "
-               + getNameNodeAddr());
-    }
     String errorMsg = null;
     // verify build version
     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -566,14 +520,10 @@
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  protected void register() throws IOException {
+  private void register() throws IOException {
     if (dnRegistration.getStorageID().equals("")) {
       setNewStorageID(dnRegistration);
     }
-    //if we are LIVE, move into the STARTED state, as registration implies that
-    //the node is no longer LIVE
-    enterState(ServiceState.LIVE, ServiceState.STARTED);
-    //spin until the server is up.
     while(shouldRun) {
       try {
         // reset name to machineName. Mainly for web interface.
@@ -602,8 +552,7 @@
           + dnRegistration.getStorageID() 
           + ". Expecting " + storage.getStorageID());
     }
-    //at this point the DataNode is now live.
-    enterLiveState();
+    
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
@@ -614,25 +563,18 @@
    * This method can only be called by the offerService thread.
    * Otherwise, deadlock might occur.
    */
-  @Override
-  protected void innerClose() throws IOException {
-    synchronized (this) {
-      //disable the should run flag first, so that everything out there starts
-      //to shut down
-      shouldRun = false;
-      //shut down the infoserver
-      if (infoServer != null) {
-        try {
-          infoServer.stop();
-        } catch (Exception e) {
-          LOG.debug("Ignoring exception when shutting down the infoserver", e);
-        }
-      }
-      //shut down the IPC server
-      if (ipcServer != null) {
-        ipcServer.stop();
+  public void shutdown() {
+    if (infoServer != null) {
+      try {
+        infoServer.stop();
+      } catch (Exception e) {
+        LOG.warn("Exception shutting down DataNode", e);
       }
     }
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
+    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
@@ -660,10 +602,9 @@
     
     RPC.stopProxy(namenode); // stop the RPC threads
     
-    if(upgradeManager != null) {
+    if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
-    }
-    if (blockScannerThread != null) {
+    if (blockScannerThread != null) { 
       blockScannerThread.interrupt();
       try {
         blockScannerThread.join(3600000L); // wait for at most 1 hour
@@ -672,10 +613,8 @@
     }
     if (storage != null) {
       try {
-        storage.unlockAll();
+        this.storage.unlockAll();
       } catch (IOException ie) {
-        LOG.warn("Ignoring exception when unlocking storage: "+ie,
-                ie);
       }
     }
     if (dataNodeThread != null) {
@@ -866,13 +805,14 @@
         if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
             DisallowedDatanodeException.class.getName().equals(reClass) ||
             IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn("DataNode is shutting down: " + re, re);
+          LOG.warn("DataNode is shutting down: " + 
+                   StringUtils.stringifyException(re));
           shutdown();
           return;
         }
-        LOG.warn(re, re);
+        LOG.warn(StringUtils.stringifyException(re));
       } catch (IOException e) {
-        LOG.warn(e, e);
+        LOG.warn(StringUtils.stringifyException(e));
       }
     } // while (shouldRun)
   } // offerService
@@ -1245,9 +1185,7 @@
         startDistributedUpgradeIfNeeded();
         offerService();
       } catch (Exception ex) {
-        LOG.error("Exception while in state " + getServiceState()
-                + " and shouldRun=" + shouldRun + ": " + ex,
-                ex);
+        LOG.error("Exception: " + StringUtils.stringifyException(ex));
         if (shouldRun) {
           try {
             Thread.sleep(5000);
@@ -1327,52 +1265,33 @@
    * @param conf Configuration instance to use.
    * @return DataNode instance for given list of data dirs and conf, or null if
    * no directory from this directory list can be created.
-   * @throws IOException if problems occur when starting the data node
+   * @throws IOException
    */
   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
-          throws IOException {
+    throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
-    StringBuffer invalid = new StringBuffer();
-    for (String dataDir : dataDirs) {
-      File data = new File(dataDir);
+    for (int i = 0; i < dataDirs.length; i++) {
+      File data = new File(dataDirs[i]);
       try {
         DiskChecker.checkDir(data);
         dirs.add(data);
       } catch(DiskErrorException e) {
-        LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
-        invalid.append(dataDir);
-        invalid.append(" ");
+        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
       }
     }
-    if (dirs.size() > 0) {
-      DataNode dataNode = new DataNode(conf, dirs);
-      Service.deploy(dataNode);
-      return dataNode;
-    } else {
-      LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
-      return null;
-    }
+    if (dirs.size() > 0) 
+      return new DataNode(conf, dirs);
+    LOG.error("All directories in dfs.data.dir are invalid.");
+    return null;
   }
 
-
-  /**
-   * {@inheritDoc}
-   * @return the name of this service
-   */
-  @Override
-  public String getServiceName() {
-    return "DataNode";
-  }
-  
   @Override
   public String toString() {
-    return "DataNode {" +
+    return "DataNode{" +
       "data=" + data +
-      (dnRegistration != null? (
-        ", localName='" + dnRegistration.getName() + "'" +
-        ", storageID='" + dnRegistration.getStorageID() + "'" ) : "") +
+      ", localName='" + dnRegistration.getName() + "'" +
+      ", storageID='" + dnRegistration.getStorageID() + "'" +
       ", xmitsInProgress=" + xmitsInProgress +
-      ", state="+ getServiceState() +
       "}";
   }
   

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Mar 12 19:43:05 2009
@@ -185,9 +185,6 @@
       }
 
       File blockFiles[] = dir.listFiles();
-      if (blockFiles == null) {
-        throw new IllegalStateException("Not a valid directory: " + dir);
-      }
       for (int i = 0; i < blockFiles.length; i++) {
         if (Block.isBlockFilename(blockFiles[i])) {
           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Mar 12 19:43:05 2009
@@ -395,7 +395,9 @@
    */
   synchronized void processIOError(int index) {
     if (editStreams == null || editStreams.size() <= 1) {
-      processAllStorageInaccessible();
+      FSNamesystem.LOG.fatal(
+      "Fatal Error : All storage directories are inaccessible."); 
+      Runtime.getRuntime().exit(-1);
     }
     assert(index < getNumStorageDirs());
     assert(getNumStorageDirs() == editStreams.size());
@@ -415,43 +417,27 @@
     //
     fsimage.processIOError(parentStorageDir);
   }
-
-  /**
-   * report inaccessible storage directories and trigger a fatal error
-   */
-  private void processAllStorageInaccessible() {
-    processFatalError("Fatal Error: All storage directories are inaccessible.");
-  }
-
-  /**
-   * Handle a fatal error
-   * @param message message to include in any output
-   */
-  protected void processFatalError(String message) {
-    FSNamesystem.LOG.fatal(message);
-    Runtime.getRuntime().exit(-1);
-  }
-
+  
   /**
    * If there is an IO Error on any log operations on storage directory,
    * remove any stream associated with that directory 
    */
   synchronized void processIOError(StorageDirectory sd) {
     // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
       return;
-    }
     if (editStreams == null || editStreams.size() <= 1) {
-      processAllStorageInaccessible();
+      FSNamesystem.LOG.fatal(
+          "Fatal Error : All storage directories are inaccessible."); 
+      Runtime.getRuntime().exit(-1);
     }
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream) editStreams
-              .get(idx)).getFile()
-              .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName())) {
+      File parentStorageDir = ((EditLogFileOutputStream)editStreams
+                                       .get(idx)).getFile()
+                                       .getParentFile().getParentFile();
+      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
         editStreams.remove(idx);
-      }
-    }
+ }
   }
   
   /**
@@ -472,8 +458,10 @@
         }
       }
       if (j == numEditStreams) {
-        processFatalError("Fatal Error: Unable to find sync log on which " +
-                                 " IO error occured. ");
+          FSNamesystem.LOG.error("Unable to find sync log on which " +
+                                 " IO error occured. " +
+                                 "Fatal Error.");
+          Runtime.getRuntime().exit(-1);
       }
       processIOError(j);
     }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Mar 12 19:43:05 2009
@@ -449,59 +449,6 @@
   }
 
   /**
-   * Test for a thread ref not being null or pointing to a dead thread
-   * @param thread the thread to check
-   * @return true if the thread is considered dead
-   */
-  private boolean isDead(Thread thread) {
-      return thread == null || !thread.isAlive();
-  }
-
-  /**
-   * Perform a cursory health check of the namesystem, particulary that it has
-   * not been closed and that all threads are running.
-   * @throws IOException for any health check
-   */
-  void ping() throws IOException {
-    if (!fsRunning) {
-      throw new IOException("Namesystem is not running");
-    }
-    boolean bad = false;
-    StringBuilder sb = new StringBuilder();
-    if (isDead(hbthread)) {
-      bad = true;
-      sb.append("[Heartbeat thread is dead]");
-    }
-    if (isDead(replthread)) {
-      bad = true;
-      sb.append("[Replication thread is dead]");
-    }
-    // this thread's liveness is only relevant in safe mode.
-    if (safeMode!=null && isDead(smmthread)) {
-      bad = true;
-      sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
-    }
-    if (isDead(dnthread)) {
-        bad = true;
-        sb.append("[DecommissionedMonitor thread is dead]");
-    }
-    if (isDead(lmthread)) {
-      bad = true;
-      sb.append("[Lease monitor thread is dead]");
-    }
-    if (pendingReplications == null || !pendingReplications.isAlive()) {
-      bad = true;
-      sb.append("[Pending replication thread is dead]");
-    }
-    if (this != getFSNamesystem()) {
-      bad = true;
-      sb.append("[FSNamesystem not a singleton]");
-    }
-    if (bad) {
-      throw new IOException(sb.toString());
-    }
-  }
-  /**
    * Close down this file system manager.
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
@@ -523,10 +470,7 @@
           lmthread.interrupt();
           lmthread.join(3000);
         }
-        if(dir != null) {
-         dir.close();
-         dir =  null;
-        }
+        dir.close();
       } catch (InterruptedException ie) {
       } catch (IOException ie) {
         LOG.error("Error closing FSDirectory", ie);
@@ -1308,13 +1252,9 @@
                                                            null,
                                                            blockSize);
     if (targets.length < this.minReplication) {
-        String message = "File " + src + " could only be replicated to " +
-                targets.length + " nodes, instead of "
-                + minReplication
-                + ". ( there are " + heartbeats.size()
-                + " live data nodes in the cluster)";
-
-        throw new IOException(message);
+      throw new IOException("File " + src + " could only be replicated to " +
+                            targets.length + " nodes, instead of " +
+                            minReplication);
     }
 
     // Allocate a new block and record it in the INode. 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Mar 12 19:43:05 2009
@@ -45,7 +45,6 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Service;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.AccessControlException;
@@ -85,7 +84,7 @@
  * NameNode implements the ClientProtocol interface, which allows
  * clients to ask for DFS services.  ClientProtocol is not
  * designed for direct use by authors of DFS client code.  End-users
- * should instead use the {@link FileSystem} class.
+ * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
  *
  * NameNode also implements the DatanodeProtocol interface, used by
  * DataNode programs that actually store DFS data blocks.  These
@@ -96,7 +95,7 @@
  * secondary namenodes or rebalancing processes to get partial namenode's
  * state, for example partial blocksMap etc.
  **********************************************************/
-public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
+public class NameNode implements ClientProtocol, DatanodeProtocol,
                                  NamenodeProtocol, FSConstants,
                                  RefreshAuthorizationPolicyProtocol {
   static{
@@ -134,6 +133,8 @@
   /** HTTP server address */
   private InetSocketAddress httpAddress = null;
   private Thread emptier;
+  /** only used for testing purposes  */
+  private boolean stopRequested = false;
   /** Is service level authorization enabled? */
   private boolean serviceAuthEnabled = false;
   
@@ -161,17 +162,7 @@
   }
 
   public static InetSocketAddress getAddress(Configuration conf) {
-    URI fsURI = FileSystem.getDefaultUri(conf);
-    if (fsURI == null) {
-      throw new IllegalArgumentException(
-              "No default filesystem URI in the configuration");
-    }
-    String auth = fsURI.getAuthority();
-    if (auth == null) {
-      throw new IllegalArgumentException(
-              "No authority for the Filesystem URI " + fsURI);
-    }
-    return getAddress(auth);
+    return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
   }
 
   public static URI getUri(InetSocketAddress namenode) {
@@ -184,7 +175,6 @@
    * Initialize name-node.
    * 
    * @param conf the configuration
-   * @throws IOException for problems during initialization
    */
   private void initialize(Configuration conf) throws IOException {
     InetSocketAddress socAddr = NameNode.getAddress(conf);
@@ -271,7 +261,7 @@
   }
 
   /**
-   * Create a NameNode.
+   * Start NameNode.
    * <p>
    * The name-node can be started with one of the following startup options:
    * <ul> 
@@ -290,49 +280,14 @@
    * <code>zero</code> in the conf.
    * 
    * @param conf  confirguration
-   * @throws IOException for backwards compatibility
+   * @throws IOException
    */
   public NameNode(Configuration conf) throws IOException {
-    super(conf);
-  }
-
-  /////////////////////////////////////////////////////
-  // Service Lifecycle
-  /////////////////////////////////////////////////////
-
-  /**
-   * This method does all the startup.
-   * It is invoked from {@link #start()} when needed.
-   *
-   * @throws IOException for any problem.
-   */
-  @Override
-  protected void innerStart() throws IOException {
-    initialize(getConf());
-    setServiceState(ServiceState.LIVE);
-  }
-
-    /**
-   * {@inheritDoc}.
-   *
-   * This implementation checks for the name system being non-null and live
-   * @throws IOException for any ping failure
-   * @throws LivenessException if the name system is not running @param status
-   */
-  @Override
-  public void innerPing(ServiceStatus status) throws IOException {
-    if (namesystem == null) {
-      status.addThrowable(new LivenessException("No name system"));
-    } else {
-      try {
-        namesystem.ping();
-      } catch (IOException e) {
-        status.addThrowable(e);
-      }
-    }
-    if (httpServer == null || !httpServer.isAlive()) {
-      status.addThrowable(
-              new IOException("NameNode HttpServer is not running"));
+    try {
+      initialize(conf);
+    } catch (IOException e) {
+      this.stop();
+      throw e;
     }
   }
 
@@ -342,81 +297,34 @@
    */
   public void join() {
     try {
-      if (server != null) {
-        server.join();
-      }
+      this.server.join();
     } catch (InterruptedException ie) {
     }
   }
 
   /**
-   * {@inheritDoc}
-   * To shut down, this service stops all NameNode threads and waits for them
-   * to finish. It also stops the metrics.
+   * Stop all NameNode threads and wait for all to finish.
    */
-  @Override
-  public synchronized void innerClose() throws IOException {
-    LOG.info("Closing NameNode");
+  public void stop() {
+    if (stopRequested)
+      return;
+    stopRequested = true;
     try {
-      if (httpServer != null) {
-        httpServer.stop();
-      }
+      if (httpServer != null) httpServer.stop();
     } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e),e);
-    }
-    httpServer = null;
-    if (namesystem != null) {
-      namesystem.close();
-    }
-    if (emptier != null) {
-      emptier.interrupt();
-      emptier = null;
-    }
-    if (server != null) {
-      server.stop();
-      server = null;
+      LOG.error(StringUtils.stringifyException(e));
     }
+    if(namesystem != null) namesystem.close();
+    if(emptier != null) emptier.interrupt();
+    if(server != null) server.stop();
     if (myMetrics != null) {
       myMetrics.shutdown();
     }
     if (namesystem != null) {
       namesystem.shutdown();
-      namesystem = null;
     }
   }
   
-  /**
-   * Retained for backwards compatibility.
-   */
-  public void stop() {
-    closeQuietly();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the name of this service
-   */
-  @Override
-  public String getServiceName() {
-    return "NameNode";
-  }
-
-  /**
-   * The toString operator returns the super class name/id, and the state. This
-   * gives all services a slightly useful message in a debugger or test report
-   *
-   * @return a string representation of the object.
-   */
-  @Override
-  public String toString() {
-    return getServiceName() + " instance " + super.toString() + " in state "
-            + getServiceState()
-            + (httpAddress != null ? (" at " + httpAddress + " , "): "")
-            + (server != null ? (", IPC " + server.getListenerAddress()) : "");
-  }
-
-
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
@@ -1061,7 +969,6 @@
     }
 
     NameNode namenode = new NameNode(conf);
-    deploy(namenode);
     return namenode;
   }
     

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Thu Mar 12 19:43:05 2009
@@ -136,15 +136,6 @@
   }
 
   /**
-   * Test for the replicator being alive.
-   * @return true if the thread is running.
-   */
-  boolean isAlive() {
-    Daemon daemon = timerThread;
-    return daemon != null && daemon.isAlive();
-  }
-
-  /**
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Mar 12 19:43:05 2009
@@ -1243,26 +1243,11 @@
   /** 
    * Utility that submits a job, then polls for progress until the job is
    * complete.
-   *
-   * @param job the job configuration.
-   * @return the job reference of the completed, successful, job
-   * @throws IOException any IO problem, and job failure
-   */
-  public static RunningJob runJob(JobConf job) throws IOException {
-    return runJob(job, -1);
-  }
-
-  /** 
-   * Utility that submits a job, then polls for progress until the job is
-   * complete.
    * 
    * @param job the job configuration.
-   * @param timeout timeout in milliseconds; any value less than or equal to
-   * zero means "do not time out"
-   * @return the job reference of the completed, successful, job
-   * @throws IOException any IO problem, and job failure
+   * @throws IOException
    */
-  public static RunningJob runJob(JobConf job, long timeout) throws IOException {
+  public static RunningJob runJob(JobConf job) throws IOException {
     JobClient jc = new JobClient(job);
     boolean error = true;
     RunningJob running = null;
@@ -1270,7 +1255,6 @@
     final int MAX_RETRIES = 5;
     int retries = MAX_RETRIES;
     TaskStatusFilter filter;
-    long endTime = timeout > 0 ? System.currentTimeMillis() + timeout : 0;
     try {
       filter = getTaskOutputFilter(job);
     } catch(IllegalArgumentException e) {
@@ -1367,10 +1351,6 @@
           LOG.info("Communication problem with server: " +
                    StringUtils.stringifyException(ie));
         }
-        //check for timeout
-        if (endTime > 0 && endTime > System.currentTimeMillis()) {
-          throw new IOException("Job execution timed out");
-        }
       }
       if (!running.isSuccessful()) {
         throw new IOException("Job failed!");

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Mar 12 19:43:05 2009
@@ -89,11 +89,7 @@
 
   public static void stopNotifier() {
     running = false;
-    //copy into a variable to deal with race conditions
-    Thread notifier = thread;
-    if (notifier != null) {
-      notifier.interrupt();
-    }
+    thread.interrupt();
   }
 
   private static JobEndStatusInfo createNotification(JobConf conf,

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=752984&r1=752983&r2=752984&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Mar 12 19:43:05 2009
@@ -77,14 +77,13 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.Service;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker extends Service implements MRConstants, InterTrackerProtocol,
+public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
 
   static{
@@ -111,11 +110,7 @@
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
-  /**
-   * Time in milliseconds to sleep while trying to start the job tracker:
-   * {@value}
-   */
-  private static final int STARTUP_SLEEP_INTERVAL = 1000;
+
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
@@ -171,7 +166,7 @@
     while (true) {
       try {
         result = new JobTracker(conf);
-        deploy(result);
+        result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
         throw e;
@@ -180,24 +175,19 @@
       } catch (UnknownHostException e) {
         throw e;
       } catch (IOException e) {
-        LOG.warn("Error starting tracker: " +
-                e.getMessage(), e);
+        LOG.warn("Error starting tracker: " + 
+                 StringUtils.stringifyException(e));
       }
-      Thread.sleep(STARTUP_SLEEP_INTERVAL);
+      Thread.sleep(1000);
     }
-    if (result != null && result.isRunning()) {
+    if (result != null) {
       JobEndNotifier.startNotifier();
     }
     return result;
   }
 
-  /**
-   * This stops the tracker, the JobEndNotifier and moves the service into the
-   * terminated state.
-   *
-   * @throws IOException for any trouble during closedown
-   */
-  public synchronized void stopTracker() throws IOException {
+  public void stopTracker() throws IOException {
+    JobEndNotifier.stopNotifier();
     close();
   }
     
@@ -1309,7 +1299,7 @@
   // (hostname --> Node (NetworkTopology))
   Map<String, Node> hostnameToNodeMap = 
     Collections.synchronizedMap(new TreeMap<String, Node>());
-
+  
   // Number of resolved entries
   int numResolved;
     
@@ -1361,7 +1351,7 @@
                                    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
-  HttpServer infoServer;
+  final HttpServer infoServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -1376,13 +1366,9 @@
   private QueueManager queueManager;
 
   /**
-   * Create the JobTracker, based on the configuration
-   * @param conf configuration to use
-   * @throws IOException on problems initializing the tracker
+   * Start the JobTracker process, listen on the indicated port
    */
   JobTracker(JobConf conf) throws IOException, InterruptedException {
-    super(conf);
-    this.conf = conf;
     //
     // Grab some static constants
     //
@@ -1400,6 +1386,10 @@
     AVERAGE_BLACKLIST_THRESHOLD = 
       conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
 
+    // This is a directory of temporary submission files.  We delete it
+    // on startup, and can delete any files that we're done with
+    this.conf = conf;
+    JobConf jobConf = new JobConf(conf);
 
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -1412,23 +1402,7 @@
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
-    taskScheduler.setTaskTrackerManager(this);
-  }
                                            
-  /**
-   * This contains the startup logic moved out of the constructor.
-   * It must never be called directly. Instead call {@link Service#start()} and
-   * let service decide whether to invoke this method once and once only.
-   *
-   * Although most of the intialization work has been performed, the
-   * JobTracker does not go live until {@link #offerService()} is called.
-   * accordingly, JobTracker does not enter the Live state here.
-   * @throws IOException for any startup problems
-   */
-  protected void innerStart() throws IOException {
-    // This is a directory of temporary submission files.  We delete it
-    // on startup, and can delete any files that we're done with
-    JobConf jobConf = new JobConf(conf);
     // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
@@ -1484,19 +1458,15 @@
     trackerIdentifier = getDateFormat().format(new Date());
 
     Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
-    //this operation is synchronized to stop findbugs warning of inconsistent
-    //access
-    synchronized (this) {
-      try {
-        java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
-          metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-        this.myInstrumentation = c.newInstance(this, jobConf);
-      } catch(Exception e) {
-        //Reflection can throw lots of exceptions -- handle them all by 
-        //falling back on the default.
-        LOG.error("failed to initialize job tracker metrics", e);
-        this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
-      }
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+      this.myInstrumentation = c.newInstance(this, jobConf);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
     }
  
     
@@ -1518,9 +1488,6 @@
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
           fs = FileSystem.get(conf);
-          if(fs == null) {
-            throw new IllegalStateException("Unable to bind to the filesystem");
-          }
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
@@ -1562,14 +1529,9 @@
                 ((RemoteException)ie).getClassName())) {
           throw ie;
         }
-        LOG.info("problem cleaning system directory: " + systemDir + ": " + ie, ie);
-      }
-      try {
-        Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted during system directory cleanup ",
-                e);
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
       }
+      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
@@ -1591,11 +1553,7 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    //this operation is synchronized to stop findbugs warning of inconsistent
-    //access
-    synchronized(this) {
-      completedJobStatusStore = new CompletedJobStatusStore(conf, fs);
-    }
+    completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -1660,16 +1618,9 @@
   }
 
   /**
-   * Run forever.
-   * Change the system state to indicate that we are live
-   * @throws InterruptedException interrupted operations
-   * @throws IOException IO Problems
+   * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
-    if(!enterLiveState()) {
-      //catch re-entrancy by returning early
-      return;
-    };
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
@@ -1686,139 +1637,79 @@
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
-    synchronized (this) {
-      //this is synchronized to stop findbugs warning
-      if (completedJobStatusStore.isActive()) {
-        completedJobsStoreThread = new Thread(completedJobStatusStore,
-                                              "completedjobsStore-housekeeper");
-        completedJobsStoreThread.start();
-      }
+    if (completedJobStatusStore.isActive()) {
+      completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                            "completedjobsStore-housekeeper");
+      completedJobsStoreThread.start();
     }
 
-    LOG.info("Starting interTrackerServer");
     // start the inter-tracker server once the jt is ready
     this.interTrackerServer.start();
     
-    
+    synchronized (this) {
+      state = State.RUNNING;
+    }
     LOG.info("Starting RUNNING");
+    
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
-  /////////////////////////////////////////////////////
-  // Service Lifecycle
-  /////////////////////////////////////////////////////
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param status a status that can be updated with problems
-   * @throws IOException for any problem
-   */
-  @Override
-  public void innerPing(ServiceStatus status) throws IOException {
-    if (infoServer == null || !infoServer.isAlive()) {
-      status.addThrowable(
-              new IOException("TaskTracker HttpServer is not running on port "
-                      + infoPort));
-    }
-    if (interTrackerServer == null) {
-      status.addThrowable(
-              new IOException("InterTrackerServer is not running"));
-    }
-  }
-
-  /**
-   * This service shuts down by stopping the
-   * {@link JobEndNotifier} and then closing down the job
-   * tracker
-   *
-   * @throws IOException exceptions which will be logged
-   */
-  @Override
-  protected void innerClose() throws IOException {
-    JobEndNotifier.stopNotifier();
-    closeJobTracker();
-  }
-
-  /**
-   * Close down all the Job tracker threads, and the
-   * task scheduler.
-   * This was package scoped, but has been made private so that
-   * it does not get used. Callers should call {@link #close()} to
-   * stop a JobTracker
-   * @throws IOException if problems occur
-   */
-  private void closeJobTracker() throws IOException {
-    if (infoServer != null) {
+  void close() throws IOException {
+    if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
-        infoServer.stop();
+        this.infoServer.stop();
       } catch (Exception ex) {
         LOG.warn("Exception shutting down JobTracker", ex);
       }
     }
-    if (interTrackerServer != null) {
+    if (this.interTrackerServer != null) {
       LOG.info("Stopping interTrackerServer");
-      interTrackerServer.stop();
+      this.interTrackerServer.stop();
+    }
+    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
+      LOG.info("Stopping expireTrackers");
+      this.expireTrackersThread.interrupt();
+      try {
+        this.expireTrackersThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
+      LOG.info("Stopping retirer");
+      this.retireJobsThread.interrupt();
+      try {
+        this.retireJobsThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
     }
-    retireThread("expireTrackersThread", expireTrackersThread);
-    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
-    retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
-    retireThread("completedJobsStore thread", completedJobsStoreThread);
-    LOG.info("stopped all jobtracker services");
-  }
-
-  /**
-   * Close the filesystem without raising an exception. At the end of this
-   * method, fs==null.
-   * Warning: closing the FS may make it unusable for other clients in the same JVM.
-   */
-  protected synchronized void closeTheFilesystemQuietly() {
-    if (fs != null) {
+    if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
+      LOG.info("Stopping expireLaunchingTasks");
+      this.expireLaunchingTaskThread.interrupt();
       try {
-        fs.close();
-      } catch (IOException e) {
-        LOG.warn("When closing the filesystem: " + e, e);
+        this.expireLaunchingTaskThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
       }
-      fs = null;
     }
-  }
-
-  /**
-   * Retire a named thread if it is not null and still alive. The thread will be
-   * interruped and then joined.
-   *
-   * @param name   thread name for log messages
-   * @param thread thread -can be null.
-   * @return true if the thread was shut down; false implies this thread was
-   *         interrupted.
-   */
-  protected boolean retireThread(String name, Thread thread) {
-    if (thread != null && thread.isAlive()) {
-      LOG.info("Stopping " + name);
-      thread.interrupt();
+    if (this.completedJobsStoreThread != null &&
+        this.completedJobsStoreThread.isAlive()) {
+      LOG.info("Stopping completedJobsStore thread");
+      this.completedJobsStoreThread.interrupt();
       try {
-        thread.join();
+        this.completedJobsStoreThread.join();
       } catch (InterruptedException ex) {
-        LOG.info("interruped during " + name + " shutdown", ex);
-        return false;
+        ex.printStackTrace();
       }
     }
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the name of this service
-   */
-  @Override
-  public String getServiceName() {
-    return "JobTracker";
+    LOG.info("stopped all jobtracker services");
+    return;
   }
     
   ///////////////////////////////////////////////////////
@@ -1956,7 +1847,7 @@
   }
     
   /**
-   * Call {@link #removeTaskEntry(TaskAttemptID)} for each of the
+   * Call {@link #removeTaskEntry(String)} for each of the
    * job's tasks.
    * When the JobTracker is retiring the long-completed
    * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
@@ -2339,7 +2230,7 @@
     return numTaskCacheLevels;
   }
   public int getNumResolvedTaskTrackers() {
-    return taskTrackers.size();
+    return numResolved;
   }
   
   public int getNumberOfUniqueHosts() {
@@ -2866,7 +2757,6 @@
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
-    verifyServiceState(ServiceState.LIVE);
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -2879,7 +2769,6 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
-    verifyServiceState(ServiceState.LIVE);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
@@ -2962,10 +2851,6 @@
 
   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
     synchronized (taskTrackers) {
-      //backport the service state into the job tracker state
-      State state = getServiceState() == ServiceState.LIVE ?
-              State.RUNNING :
-              State.INITIALIZING;
       if (detailed) {
         List<List<String>> trackerNames = taskTrackerNames();
         return new ClusterStatus(trackerNames.get(0),
@@ -3282,10 +3167,6 @@
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
-    if (fs == null) {
-      throw new java.lang.IllegalStateException("Filesystem is null; "
-              + "JobTracker is not live: " + this);
-    }
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }



Mime
View raw message