hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r806265 [1/2] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ conf/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/data_join/ src/contrib/dynamic-scheduler/ src/contrib/eclipse-plugin/ src/contrib/fairscheduler/ sr...
Date Thu, 20 Aug 2009 17:17:01 GMT
Author: stevel
Date: Thu Aug 20 17:16:59 2009
New Revision: 806265

URL: http://svn.apache.org/viewvc?rev=806265&view=rev
Log:
MAPREDUCE-233 Service Lifecycle for TT and JT
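
The lifecycle pattern this branch applies is visible in the JobTracker changes
below: startup logic moves out of the constructor into innerStart(), health
checks into innerPing(ServiceStatus), and shutdown into innerClose(), all
driven by a Service base class that is not itself part of this commit. Below
is a minimal, self-contained Java sketch of such a base class, with names
inferred from the diff; the real org.apache.hadoop.util.Service will differ
in detail.

    import java.io.IOException;

    /**
     * Illustrative sketch only: a minimal service-lifecycle base class in
     * the style this branch applies to JobTracker and TaskTracker. Not the
     * real org.apache.hadoop.util.Service.
     */
    abstract class LifecycleSketch {
      enum State { CREATED, STARTED, LIVE, CLOSED }
      private State state = State.CREATED;

      /** Start once and only once; real work is delegated to innerStart(). */
      public final synchronized void start() throws IOException {
        if (state != State.CREATED) {
          return;              // idempotent: repeated start() calls are no-ops
        }
        state = State.STARTED;
        innerStart();          // subclass startup logic, out of the constructor
      }

      /** Mark the service live; returns false on re-entrant calls. */
      protected final synchronized boolean enterLiveState() {
        if (state == State.LIVE) {
          return false;
        }
        state = State.LIVE;
        return true;
      }

      /** Fail fast if the service is not in the expected state. */
      protected final synchronized void verifyServiceState(State expected) {
        if (state != expected) {
          throw new IllegalStateException("In state " + state
              + " instead of " + expected);
        }
      }

      public final synchronized void close() throws IOException {
        if (state == State.CLOSED) {
          return;              // already closed
        }
        state = State.CLOSED;
        innerClose();          // subclass shutdown logic
      }

      protected abstract void innerStart() throws IOException;
      protected abstract void innerClose() throws IOException;

      public static void main(String[] args) throws IOException {
        LifecycleSketch s = new LifecycleSketch() {
          protected void innerStart() { System.out.println("started"); }
          protected void innerClose() { System.out.println("closed"); }
        };
        s.start();
        s.start();   // second call is a no-op
        s.close();
      }
    }

The point of the pattern is that start() and close() become idempotent and
state-checked, which is what lets the retry loop in startTracker() below
drive the lifecycle safely.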

Added:
    hadoop/mapreduce/branches/MAPREDUCE-233/ivybuild.xml
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/filecache/
      - copied from r805826, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/filecache/DistributedCache.java
      - copied unchanged from r805826, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/
      - copied from r805826, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
      - copied unchanged from r805826, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLifecycle.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/
      - copied from r805826, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
      - copied unchanged from r805826, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
Modified:
    hadoop/mapreduce/branches/MAPREDUCE-233/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/.gitignore   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/CHANGES.txt
    hadoop/mapreduce/branches/MAPREDUCE-233/build.xml
    hadoop/mapreduce/branches/MAPREDUCE-233/conf/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/conf/capacity-scheduler.xml.template   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-0.21.0-dev.jar
    hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-test-0.21.0-dev.jar
    hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/vaidya/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/   (props changed)
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
    hadoop/mapreduce/branches/MAPREDUCE-233/src/webapps/job/   (props changed)

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1 +1,2 @@
 /hadoop/core/branches/branch-0.19/mapred:713112
+/hadoop/mapreduce/trunk:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/.gitignore
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,4 @@
+/hadoop/core/branches/HADOOP-4687/mapred/.gitignore:776175-784965
 /hadoop/core/branches/branch-0.19/mapred/.gitignore:713112
 /hadoop/core/trunk/.gitignore:784664-785643
+/hadoop/mapreduce/trunk/.gitignore:804974-805826

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/CHANGES.txt?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/CHANGES.txt Thu Aug 20 17:16:59 2009
@@ -20,6 +20,9 @@
     MAPREDUCE-817. Add a cache for retired jobs with minimal job info and 
     provide a way to access history file url. (sharad)
 
+    MAPREDUCE-711. Moved Distributed Cache from Common to Map/Reduce
+    project. (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
     MAPREDUCE-706. Support for FIFO pools in the fair scheduler.
@@ -400,3 +403,9 @@
     MAPREDUCE-877. Added avro as a dependency to contrib ivy settings.
     (Tsz Wo (Nicholas) Sze via yhemanth)
 
+    MAPREDUCE-852. In build.xml, remove the Main-Class, which is incorrectly
+    set in tools, and rename the target "tools-jar" to "tools".  (szetszwo)
+
+    MAPREDUCE-773. Sends progress reports for compressed gzip inputs in maps.
+    Fixes a native direct buffer leak in LineRecordReader classes.
+    (Hong Tang and ddas)
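
The MAPREDUCE-773 entry above is implemented by the CodecPool changes to
LineRecordReader later in this patch: decompressors, which hold native direct
buffers, are borrowed from a pool and handed back in a finally block rather
than allocated once per reader. Below is a self-contained sketch of that
borrow/use/return idiom against the CodecPool API; the countBytes helper
itself is illustrative only.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.Decompressor;

    public class PooledDecompressorSketch {
      /** Count the decompressed bytes of a (possibly compressed) file. */
      public static long countBytes(Configuration conf, Path file)
          throws IOException {
        CompressionCodec codec =
            new CompressionCodecFactory(conf).getCodec(file);
        FileSystem fs = file.getFileSystem(conf);
        Decompressor decompressor = null;
        InputStream in = fs.open(file);
        try {
          if (codec != null) {
            // Borrow a pooled decompressor instead of letting the codec
            // allocate a fresh native buffer per reader -- the leak fix.
            decompressor = CodecPool.getDecompressor(codec);
            in = codec.createInputStream(in, decompressor);
          }
          long total = 0;
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf)) != -1) {
            total += n;
          }
          return total;
        } finally {
          in.close();
          if (decompressor != null) {
            // Return the decompressor on every path, including errors.
            CodecPool.returnDecompressor(decompressor);
          }
        }
      }
    }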

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/build.xml?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/build.xml Thu Aug 20 17:16:59 2009
@@ -17,7 +17,7 @@
    limitations under the License.
 -->
 
-<project name="Hadoop" default="compile" 
+<project name="hadoop-mapred" default="compile" 
    xmlns:ivy="antlib:org.apache.ivy.ant"> 
 
   <!-- Load all the default properties, and any the user wants    -->
@@ -440,14 +440,10 @@
     </jar>
   </target>
 
-  <target name="tools-jar" depends="jar, compile-tools" 
+  <target name="tools" depends="jar, compile-tools" 
           description="Make the Hadoop tools jar.">
     <jar jarfile="${build.dir}/${tools.final.name}.jar"
          basedir="${build.tools}">
-      <manifest>
-        <attribute name="Main-Class" 
-                   value="org/apache/hadoop/examples/ExampleDriver"/>
-      </manifest>
     </jar>
   </target>
 
@@ -675,7 +671,7 @@
     </available>  	
   </target>
 
- <target name="all-jars" depends="tools-jar,examples,jar-test">
+ <target name="all-jars" depends="tools,examples,jar-test">
    <subant target="jar">
       <property name="version" value="${version}"/>
       <property name="dist.dir" value="${dist.dir}"/>
@@ -924,7 +920,7 @@
   <!-- ================================================================== -->
   <!--                                                                    -->
   <!-- ================================================================== -->
-  <target name="package" depends="compile, jar, javadoc, docs, api-report, examples, tools-jar, jar-test, package-librecordio"
+  <target name="package" depends="compile, jar, javadoc, docs, api-report, examples, tools, jar-test, package-librecordio"
 	  description="Build distribution">
     <mkdir dir="${dist.dir}"/>
     <mkdir dir="${dist.dir}/lib"/>
@@ -1024,7 +1020,7 @@
     </macro_tar>
   </target>
 
-  <target name="bin-package" depends="compile, jar, examples, tools-jar, jar-test, package-librecordio" 
+  <target name="bin-package" depends="compile, jar, examples, tools, jar-test, package-librecordio" 
 		description="assembles artifacts for binary target">
     <mkdir dir="${dist.dir}"/>
     <mkdir dir="${dist.dir}/lib"/>

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/conf/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf:713112
 /hadoop/core/trunk/conf:784664-785643
+/hadoop/mapreduce/trunk/conf:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/conf/capacity-scheduler.xml.template
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf/capacity-scheduler.xml.template:713112
 /hadoop/core/trunk/conf/capacity-scheduler.xml.template:776175-785643
+/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:804974-805826

Added: hadoop/mapreduce/branches/MAPREDUCE-233/ivybuild.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/ivybuild.xml?rev=806265&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/ivybuild.xml (added)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/ivybuild.xml Thu Aug 20 17:16:59 2009
@@ -0,0 +1,349 @@
+<?xml version="1.0"?>
+<project name="hadoop-mapred" default="published"
+  xmlns:ivy="antlib:org.apache.ivy.ant">
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+  <description>
+    This is a build file to publish Hadoop as ivy and maven artifacts.
+    It currently works alongside the original build.xml file, and exists
+    purely to hook Hadoop into the SmartFrog test/release process.
+  </description>
+
+  <!--Override point: allow overridden properties to be loaded-->
+  <property file="build.properties" />
+
+
+  <target name="ivy-init-properties" >
+    <property name="ivy.dir" location="ivy" />
+    <loadproperties srcfile="${ivy.dir}/libraries.properties" />
+    <property name="ivysettings.xml" location="${ivy.dir}/ivysettings.xml" />
+    <property name="ivy.jar" location="${ivy.dir}/ivy-${ivy.version}.jar"/>
+    <property name="ivy.org" value="org.apache.hadoop"/>
+
+    <property name="build.dir" location="build" />
+    <property name="build.ivy.dir" location="${build.dir}/ivy" />
+    <property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
+    <property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
+    <property name="build.ivy.maven.dir" location="${build.ivy.dir}/maven" />
+    <property name="build.ivy.maven.pom" 
+      location="${build.ivy.maven.dir}/hadoop-mapred-${hadoop.version}.pom" />
+    <property name="build.ivy.maven.jar" 
+      location="${build.ivy.maven.dir}/hadoop-mapred-${hadoop.version}.jar" />
+
+    <!--this is the naming policy for artifacts we want pulled down-->
+    <property name="ivy.artifact.retrieve.pattern"
+      value="[conf]/[artifact]-[revision].[ext]"/>
+    <!--this is how artifacts that get built are named-->
+    <property name="ivy.publish.pattern"
+      value="hadoop-[revision]-core.[ext]"/>
+    <property name="hadoop.jar"
+      location="${build.dir}/hadoop-mapred-${hadoop.version}.jar" />
+
+    <!--preset to build down; puts us in control of version naming-->
+    <presetdef name="delegate">
+      <ant antfile="build.xml" inheritall="false" inheritrefs="false" >
+        <property name="version" value="${hadoop.version}"/>
+      </ant>
+    </presetdef>
+    <!--preset to build down; puts us in control of version naming-->
+    <presetdef name="delegate2">
+      <subant antfile="build.xml" buildpath="." inheritall="false" inheritrefs="false" >
+        <property name="version" value="${hadoop.version}"/>
+      </subant>
+    </presetdef>
+
+    <!--preset to copy with ant property expansion (and always overwrite)-->
+    <presetdef name="expandingcopy" >
+    <copy overwrite="true">
+      <filterchain>
+        <expandproperties/>
+      </filterchain>
+    </copy>
+  </presetdef>
+  </target>
+
+
+  <target name="ivy-init-dirs" depends="ivy-init-properties" >
+    <mkdir dir="${build.ivy.dir}" />
+    <mkdir dir="${build.ivy.lib.dir}" />
+    <mkdir dir="${build.ivy.report.dir}" />
+    <mkdir dir="${build.ivy.maven.dir}" />
+  </target>
+
+
+  <target name="clean"  depends="ivy-init-properties"
+    description="Clean the output directories" >
+    <delegate target="clean" />
+  </target>
+
+
+  <target name="jar"  depends="ivy-init-dirs"
+    description="build the JAR">
+    <delegate target="jar" />
+  </target>
+
+  <!--
+    This looks for Ivy on the classpath, and is used to skip reloading it if found.
+    It probes for a task that only ships with Ivy 2.0.
+  -->
+  <target name="ivy-probe-antlib" >
+    <condition property="ivy.found">
+      <typefound uri="antlib:org.apache.ivy.ant" name="cleancache"/>
+    </condition>
+  </target>
+
+
+  <!--
+  To avoid Ivy leaking state across big projects, always load Ivy in the same classloader.
+  Note that we skip loading Ivy if it is already present, just to make sure all is well.
+  -->
+  <target name="ivy-init-antlib" depends="ivy-init-properties,ivy-init-dirs,ivy-probe-antlib" unless="ivy.found">
+
+    <typedef uri="antlib:org.apache.ivy.ant" onerror="fail"
+      loaderRef="ivyLoader">
+      <classpath>
+        <pathelement location="${ivy.jar}"/>
+      </classpath>
+    </typedef>
+    <fail >
+      <condition >
+        <not>
+          <typefound uri="antlib:org.apache.ivy.ant" name="cleancache"/>
+        </not>
+      </condition>
+      You need Apache Ivy 2.0 or later from http://ant.apache.org/
+      It could not be loaded from ${ivy.jar}
+    </fail>
+  </target>
+
+
+  <target name="ivy-init" depends="ivy-init-antlib" >
+
+    <!--Configure Ivy by reading in the settings file
+        If a settings file has already been read into this settings ID, it takes priority
+    -->
+    <ivy:configure settingsId="hadoop.ivy.settings" file="${ivysettings.xml}" override="false"/>
+
+  </target>
+
+  <target name="ivy-resolve" depends="ivy-init">
+    <ivy:resolve settingsRef="hadoop.ivy.settings"/>
+  </target>
+
+  <target name="ivy-retrieve" depends="ivy-resolve"
+    description="Retrieve all Ivy-managed artifacts for the different configurations">
+    <ivy:retrieve settingsRef="hadoop.ivy.settings"
+      pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" sync="true" />
+  </target>
+
+  <target name="ivy-report" depends="ivy-resolve"
+    description="Generate">
+    <ivy:report todir="${build.ivy.report.dir}" settingsRef="hadoop.ivy.settings"/>
+    <echo>
+      Reports generated:
+${build.ivy.report.dir}
+    </echo>
+  </target>
+
+  <target name="assert-hadoop-jar-exists" depends="ivy-init">
+    <fail>
+      <condition >
+        <not>
+          <available file="${hadoop.jar}" />
+        </not>
+      </condition>
+      Not found: ${hadoop.jar}
+      Please run the target "jar" in the main build file
+    </fail>
+
+  </target>
+
+  <target name="ready-to-publish" depends="jar,assert-hadoop-jar-exists,ivy-resolve"/>
+
+  <target name="ivy-publish-local" depends="ready-to-publish">
+    <ivy:publish
+      settingsRef="hadoop.ivy.settings"
+      resolver="local"
+      pubrevision="${hadoop.version}"
+      overwrite="true"
+      artifactspattern="${build.dir}/${ivy.publish.pattern}" />
+  </target>
+
+
+  <!-- this is here for curiosity, to see how well the makepom task works
+  Answer: it depends on whether you want transitive dependencies excluded or not
+  -->
+  <target name="makepom" depends="ivy-resolve">
+    <ivy:makepom settingsRef="hadoop.ivy.settings"
+      ivyfile="ivy.xml"
+      pomfile="${build.ivy.maven.dir}/generated.pom">
+      <ivy:mapping conf="default" scope="default"/>
+      <ivy:mapping conf="master"  scope="master"/>
+      <ivy:mapping conf="runtime" scope="runtime"/>
+    </ivy:makepom>
+  </target>
+
+
+  <target name="copy-jar-to-maven" depends="ready-to-publish">
+    <copy file="${hadoop.jar}"
+      tofile="${build.ivy.maven.jar}"/>
+    <checksum file="${build.ivy.maven.jar}" algorithm="md5"/>
+  </target>
+
+  <target name="copypom" depends="ivy-init-dirs">
+    <expandingcopy file="ivy/hadoop-core.pom"
+      tofile="${build.ivy.maven.pom}"/>
+    <checksum file="${build.ivy.maven.pom}" algorithm="md5"/>
+  </target>
+
+  <target name="maven-artifacts" depends="copy-jar-to-maven,copypom" />
+
+  <target name="published" depends="ivy-publish-local,maven-artifacts">
+
+  </target>
+
+  <target name="ready-to-test" depends="ivy-init-dirs">
+    <property name="test.data.dir" location="${build.dir}/test/data" />
+    <property name="test.reports.dir" location="${build.dir}/test/reports" />
+    <mkdir dir="${test.data.dir}" />
+    <mkdir dir="${test.reports.dir}" />
+  </target>
+
+  <target name="testjob.jar"  depends="ready-to-test">
+    <delegate2 target="jar-test"
+        failonerror="true">
+    </delegate2>
+  </target>
+
+
+  <target name="junit"  depends="ready-to-test,testjob.jar"
+      description="run the junit tests and generate an XML report">
+    <delegate2 target="test-core"
+        failonerror="false">
+      <property name="test.junit.output.format" value="xml" />
+      <property name="test.build.dir" value="${test.data.dir}"/>
+    </delegate2>
+  </target>
+
+  <!-- generate a junit report. 
+  tip: you can run this while junit is still going on-->
+  <target name="junitreport"  depends="ready-to-test">
+    <junitreport todir="${test.reports.dir}">
+      <fileset dir="${test.data.dir}">
+        <include name="TEST-*.xml"/>
+      </fileset>
+      <report format="frames" todir="${test.reports.dir}"/>
+    </junitreport>
+    <echo>reports in ${test.reports.dir}/index.html</echo>
+  </target>
+
+  <target name="tested" depends="junit,junitreport" />
+
+  <target name="svn-init">
+    <presetdef name="svn">
+      <exec executable="svn" failonerror="true">
+      </exec>
+    </presetdef>
+    <property name="issue" value="MAPREDUCE-233"/>
+    <property name="hadoop.svn.host" value="svn.apache.org" />
+    <property name="hadoop-svn"
+        value="https://${hadoop.svn.host}/repos/asf/hadoop/mapreduce"/>
+    <property name="trunk"
+        value="${hadoop-svn}/trunk"/>
+    <property name="branch"
+        value="${hadoop-svn}/branches/${issue}"/>
+    <property name="patches.dir" location="../outgoing"/>
+    <mkdir dir="${patches.dir}" />
+    <property name="patch.version" value="1" />
+    <property name="patch.file" location="${patches.dir}/${issue}-${patch.version}.patch" />
+  </target>
+
+  <target name="svn-merge" depends="svn-init"
+    description="merge in the trunk"  >
+    <svn>
+      <arg value="merge"/>
+      <arg value="${trunk}"/>
+      <arg value="--accept"/>
+      <arg value="postpone"/>
+    </svn>
+  </target>
+  
+  <target name="svn-diff" depends="svn-init"
+    description="diff the local code against the branch"  >
+    <svn>
+      <arg value="diff"/>
+    </svn>
+  </target>
+
+  <target name="svn-resolved" depends="svn-init"
+    description="mark the tree as resolved"  >
+    <svn>
+      <arg value="resolve"/>
+    </svn>
+  </target>
+
+  <!--
+  svn diff \
+  https://svn.apache.org/repos/asf/hadoop/core/trunk \
+  https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2
+  -->
+  <target name="svn-diff-trunk" depends="svn-init"
+      description="diff against trunk"  >
+    <svn>
+      <arg value="diff" />
+      <arg value="${trunk}"/>
+      <arg value="${branch}"/>
+    </svn>
+  </target>
+
+
+  <target name="svn-create-changelist" depends="svn-init"
+      description="Create a changelist of everything we want in the big patch"  >
+    <property name="hadoop/mapred"
+        value="src/java/org/apache/hadoop/mapred" />
+    <property name="test/mapred"
+        value="src/test/mapred/org/apache/hadoop/mapred" />
+    <svn>
+      <arg value="changelist"/>
+      <arg value="${issue}"/>
+      <arg value="${hadoop/mapred}/JobEndNotifier.java" />
+      <arg value="${hadoop/mapred}/JobTracker.java" />
+      <arg value="${hadoop/mapred}/TaskTracker.java" />
+      <arg value="${hadoop/mapred}/TaskTrackerAction.java" />
+      <arg value="${hadoop/mapred}/TaskTrackerStatus.java" />
+      <arg value="${test/mapred}/TestTaskTrackerLifecycle.java" />
+    </svn>
+  </target>
+
+
+  <!--
+  
+  -->
+  <target name="svn-diff-src" depends="svn-init"
+      description="diff against trunk"  >
+    <echo> Writing to ${patch.file}</echo>
+    <svn output="${patch.file}" >
+      <arg value="diff" />
+      <arg value="${trunk}/src" />
+      <arg value="${branch}/src" />
+      <arg value="--changelist" />
+      <arg value="${issue}"/>
+    </svn>
+  </target>
+
+</project>
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-0.21.0-dev.jar?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-test-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/lib/hadoop-core-test-0.21.0-dev.jar?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
Binary files - no diff available.

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++:713112
 /hadoop/core/trunk/src/c++:776175-784663
+/hadoop/mapreduce/trunk/src/c++:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
 /hadoop/core/trunk/src/contrib:784664-785643
+/hadoop/mapreduce/trunk/src/contrib:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
 /hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
 /hadoop/core/trunk/src/contrib/build.xml:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/build.xml:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
 /hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
 /hadoop/core/trunk/src/contrib/data_join:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/data_join:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
 /hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
 /hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
 /hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
 /hadoop/core/trunk/src/contrib/index:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/index:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
 /hadoop/core/trunk/src/contrib/mrunit:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/mrunit:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
 /hadoop/core/trunk/src/contrib/sqoop:784975-786373
+/hadoop/mapreduce/trunk/src/contrib/sqoop:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
 /hadoop/core/trunk/src/contrib/streaming:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/streaming:804974-805826

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu Aug 20 17:16:59 2009
@@ -50,7 +50,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
 /hadoop/core/trunk/src/contrib/vaidya:776175-786373
+/hadoop/mapreduce/trunk/src/contrib/vaidya:804974-805826

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/examples:713112
 /hadoop/core/trunk/src/examples:776175-784663
+/hadoop/mapreduce/trunk/src/examples:804974-805826

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Sort.java Thu Aug 20 17:16:59 2009
@@ -24,7 +24,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Thu Aug 20 17:16:59 2009
@@ -27,7 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/java:713112
 /hadoop/core/trunk/src/mapred:776175-785643
+/hadoop/mapreduce/trunk/src/java:804974-805826

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java Thu Aug 20 17:16:59 2009
@@ -47,7 +47,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Aug 20 17:16:59 2009
@@ -90,6 +90,7 @@
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Service;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -101,7 +102,8 @@
  * tracking MR jobs in a network environment.
  *
  *******************************************************/
-public class JobTracker implements MRConstants, InterTrackerProtocol,
+public class JobTracker extends Service 
+    implements MRConstants, InterTrackerProtocol,
     JobSubmissionProtocol, TaskTrackerManager,
     RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol {
 
@@ -110,9 +112,9 @@
     Configuration.addDefaultResource("mapred-site.xml");
   }
 
-  private final long tasktrackerExpiryInterval;
-  private final long retireJobInterval;
-  private final long retireJobCheckInterval;
+  private long tasktrackerExpiryInterval;
+  private long retireJobInterval;
+  private long retireJobCheckInterval;
 
   // The interval after which one fault of a tracker will be discarded,
   // if there are no faults during this. 
@@ -138,7 +140,7 @@
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
-  private final TaskScheduler taskScheduler;
+  private TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
@@ -168,13 +170,13 @@
    * The maximum no. of 'completed' (successful/failed/killed)
    * jobs kept in memory per-user. 
    */
-  final int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
+  int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
 
    /**
     * The minimum time (in ms) that a job's information has to remain
     * in the JobTracker's memory before it is retired.
     */
-  final int MIN_TIME_BEFORE_RETIRE;
+  int MIN_TIME_BEFORE_RETIRE;
 
 
   private int nextJobId = 1;
@@ -207,6 +209,7 @@
     while (true) {
       try {
         result = new JobTracker(conf, clock);
+        startService(result);
         result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
@@ -216,19 +219,24 @@
       } catch (UnknownHostException e) {
         throw e;
       } catch (IOException e) {
-        LOG.warn("Error starting tracker: " + 
-                 StringUtils.stringifyException(e));
+        LOG.warn("Error starting tracker: " +
+                e, e);
       }
       Thread.sleep(1000);
     }
-    if (result != null) {
+    if (result != null && result.isRunning()) {
       JobEndNotifier.startNotifier();
     }
     return result;
   }
 
-  public void stopTracker() throws IOException {
-    JobEndNotifier.stopNotifier();
+  /**
+   * This stops the tracker and the JobEndNotifier, and moves the service
+   * into the terminated state.
+   *
+   * @throws IOException for any trouble during closedown
+   */
+  public synchronized void stopTracker() throws IOException {
     close();
   }
     
@@ -1671,7 +1679,7 @@
     }
   }
 
-  private final JobTrackerInstrumentation myInstrumentation;
+  private JobTrackerInstrumentation myInstrumentation;
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -1770,7 +1778,7 @@
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
   Thread retireJobsThread = null;
-  final int retiredJobsCacheSize;
+  int retiredJobsCacheSize;
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
                                                 "expireLaunchingTasks");
@@ -1804,7 +1812,7 @@
                                    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
-  final HttpServer infoServer;
+  HttpServer infoServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -1815,8 +1823,8 @@
   FileSystem fs = null;
   Path systemDir = null;
   private JobConf conf;
-  private final UserGroupInformation mrOwner;
-  private final String supergroup;
+  private UserGroupInformation mrOwner;
+  private String supergroup;
 
   long limitMaxMemForMapTasks;
   long limitMaxMemForReduceTasks;
@@ -1829,11 +1837,17 @@
     this(conf, new Clock());
   }
   /**
-   * Start the JobTracker process, listen on the indicated port
-   */
-  JobTracker(JobConf conf, Clock clock) throws IOException, InterruptedException {
-    // find the owner of the process
+   * Create the JobTracker, based on the configuration. 
+   * This does not start the service
+   * @param conf configuration to use
+   * @param clock clock to use
+   * @throws IOException on problems initializing the tracker
+   */
+  JobTracker(JobConf conf, Clock clock)
+          throws IOException, InterruptedException {
+    super(conf);
     this.clock = clock;
+    // find the owner of the process
     try {
       mrOwner = UnixUserGroupInformation.login(conf);
     } catch (LoginException e) {
@@ -1885,7 +1899,24 @@
       = conf.getClass("mapred.jobtracker.taskScheduler",
           JobQueueTaskScheduler.class, TaskScheduler.class);
     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
+    //LOOK AT THIS TODO
+    taskScheduler.setTaskTrackerManager(this);
+  }
                                            
+  /**
+   * This contains the startup logic moved out of the constructor.
+   * It must never be called directly. Instead call {@link Service#start()} and
+   * let the service decide whether to invoke this method once and once only.
+   *
+   * Although most of the initialization work has been performed, the
+   * JobTracker does not go live until {@link #offerService()} is called.
+   * Accordingly, JobTracker does not enter the Live state here.
+   * @throws IOException for any startup problems
+   */
+  protected void innerStart() throws IOException {
+    // This is a directory of temporary submission files.  We delete it
+    // on startup, and can delete any files that we're done with
+    JobConf jobConf = new JobConf(conf);
     // Set ports, start RPC servers, setup security policy etc.
     InetSocketAddress addr = getAddress(conf);
     this.localMachine = addr.getHostName();
@@ -1931,6 +1962,9 @@
     trackerIdentifier = getDateFormat().format(new Date());
 
     // Initialize instrumentation
+    //this operation is synchronized to stop findbugs warning of inconsistent
+    //access
+    synchronized (this) {    
     JobTrackerInstrumentation tmp;
     Class<? extends JobTrackerInstrumentation> metricsInst =
       getInstrumentationClass(jobConf);
@@ -1945,6 +1979,7 @@
       tmp = new JobTrackerMetricsInst(this, jobConf);
     }
     myInstrumentation = tmp;
+    }
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -1964,6 +1999,9 @@
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
           fs = FileSystem.get(conf);
+          if(fs == null) {
+            throw new IllegalStateException("Unable to bind to the filesystem");
+          }
         }
         // clean up the system dir, which will only work if hdfs is out of 
         // safe mode
@@ -2005,9 +2043,15 @@
                 ((RemoteException)ie).getClassName())) {
           throw ie;
         }
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
+        LOG.info("problem cleaning system directory: " + systemDir + ": " + ie,
+                ie);
+      }
+      try {
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted during system directory cleanup ",
+                e);
       }
-      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
     }
     
     // Same with 'localDir' except it's always on the local disk.
@@ -2030,7 +2074,11 @@
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
     //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf);
+    // this operation is synchronized to stop findbugs warning of inconsistent
+    // access
+    synchronized (this) {
+      completedJobStatusStore = new CompletedJobStatusStore(conf);
+    }
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2131,9 +2179,16 @@
   }
 
   /**
-   * Run forever
+   * Run forever.
+   * Change the system state to indicate that we are live
+   * @throws InterruptedException interrupted operations
+   * @throws IOException IO Problems
    */
   public void offerService() throws InterruptedException, IOException {
+    if (!enterLiveState()) {
+      //catch re-entrancy by returning early
+      return;
+    }
     // Prepare for recovery. This is done irrespective of the status of restart
     // flag.
     while (true) {
@@ -2166,25 +2221,70 @@
     this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
-    if (completedJobStatusStore.isActive()) {
-      completedJobsStoreThread = new Thread(completedJobStatusStore,
-                                            "completedjobsStore-housekeeper");
-      completedJobsStoreThread.start();
+    synchronized (this) {
+      //this is synchronized to stop findbugs warning
+      if (completedJobStatusStore.isActive()) {
+        completedJobsStoreThread = new Thread(completedJobStatusStore,
+                                              "completedjobsStore-housekeeper");
+        completedJobsStoreThread.start();
+      }
     }
 
+    LOG.info("Starting interTrackerServer");
     // start the inter-tracker server once the jt is ready
     this.interTrackerServer.start();
     
-    synchronized (this) {
-      state = State.RUNNING;
-    }
     LOG.info("Starting RUNNING");
     
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
 
-  void close() throws IOException {
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (infoServer == null || !infoServer.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + infoPort));
+    }
+    if (interTrackerServer == null) {
+      status.addThrowable(
+              new IOException("InterTrackerServer is not running"));
+    }
+  }
+
+  /**
+   * This service shuts down by stopping the
+   * {@link JobEndNotifier} and then closing down the job
+   * tracker
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    JobEndNotifier.stopNotifier();
+    closeJobTracker();
+  }
+
+  /**
+   * Close down all the Job tracker threads, and the
+   * task scheduler.
+   * This was package scoped, but has been made private so that
+   * it is not called directly. Callers should call {@link #close()} to
+   * stop a JobTracker.
+   * @throws IOException if problems occur
+   */
+  private void closeJobTracker() throws IOException {
     if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
       try {
@@ -2199,55 +2299,70 @@
     }
 
     stopExpireTrackersThread();
-
-    if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
-      LOG.info("Stopping retirer");
-      this.retireJobsThread.interrupt();
-      try {
-        this.retireJobsThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
+    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
     }
-    if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
-      LOG.info("Stopping expireLaunchingTasks");
-      this.expireLaunchingTaskThread.interrupt();
-      try {
-        this.expireLaunchingTaskThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
-    if (this.completedJobsStoreThread != null &&
-        this.completedJobsStoreThread.isAlive()) {
-      LOG.info("Stopping completedJobsStore thread");
-      this.completedJobsStoreThread.interrupt();
-      try {
-        this.completedJobsStoreThread.join();
-      } catch (InterruptedException ex) {
-        ex.printStackTrace();
-      }
-    }
+    retireThread("expireLaunchingTasks", expireLaunchingTaskThread);
+    retireThread("completedJobsStore thread", completedJobsStoreThread);
     LOG.info("stopped all jobtracker services");
     return;
   }
 
   void stopExpireTrackersThread() {
-    if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
-      LOG.info("Stopping expireTrackers");
-      this.expireTrackersThread.interrupt();
+    retireThread("expireTrackers", expireTrackersThread);
+  }
+
+  /**
+   * Retire a named thread if it is not null and still alive. The thread will
+   * be interrupted and then joined.
+   *
+   * @param name   thread name for log messages
+   * @param thread thread to retire; can be null
+   * @return true if the thread was shut down; false implies this thread was
+   *         interrupted.
+   */
+  protected boolean retireThread(String name, Thread thread) {
+    if (thread != null && thread.isAlive()) {
+      LOG.info("Stopping " + name);
+      thread.interrupt();
       try {
-        this.expireTrackersThread.join();
+        thread.join();
       } catch (InterruptedException ex) {
-        ex.printStackTrace();
+        LOG.info("interruped during " + name + " shutdown", ex);
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**
+   * Close the filesystem without raising an exception. At the end of this
+   * method, fs == null.
+   * Warning: closing the FS may make it unusable for other clients in the same JVM.
+   */
+  protected synchronized void closeTheFilesystemQuietly() {
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        LOG.warn("When closing the filesystem: " + e, e);
       }
+      fs = null;
     }
   }
 
-    
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "JobTracker";
+  }
+
   ///////////////////////////////////////////////////////
   // Maintain lookup tables; called by JobInProgress
   // and TaskInProgress
@@ -3403,6 +3518,7 @@
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }
 
@@ -3415,6 +3531,7 @@
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
+    verifyServiceState(ServiceState.LIVE);
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
@@ -3514,6 +3631,10 @@
 
   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
     synchronized (taskTrackers) {
+      //backport the service state into the job tracker state
+      State state = getServiceState() == ServiceState.LIVE ?
+              State.RUNNING :
+              State.INITIALIZING;
       if (detailed) {
         List<List<String>> trackerNames = taskTrackerNames();
         Collection<BlackListInfo> blackListedTrackers = getBlackListedTrackers();
@@ -3940,6 +4061,10 @@
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    if (fs == null) {
+      throw new java.lang.IllegalStateException("Filesystem is null; "
+              + "JobTracker is not live: " + this);
+    }
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
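
A recurring pattern in the close-down code above is interrupt-then-join on
each worker thread, which the patch factors into the retireThread(String,
Thread) helper. Below is a standalone sketch of the same idiom; this variant
also restores the caller's interrupt status, a refinement the branch's
version leaves out.

    /** Standalone sketch of the interrupt-and-join shutdown helper. */
    final class ThreadRetirerSketch {
      static boolean retire(String name, Thread thread) {
        if (thread != null && thread.isAlive()) {
          thread.interrupt();                   // ask the thread to stop
          try {
            thread.join();                      // wait for it to exit
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve our interrupt status
            return false;                       // interrupted mid-shutdown
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Thread worker = new Thread(new Runnable() {
          public void run() {
            try {
              Thread.sleep(60000);
            } catch (InterruptedException expected) {
              // interrupted during shutdown: fall through and exit
            }
          }
        }, "worker");
        worker.start();
        System.out.println("retired cleanly: " + retire("worker", worker));
      }
    }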

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Thu Aug 20 17:16:59 2009
@@ -90,7 +90,7 @@
     return true;
   }
   
-  public float getProgress() {
+  public float getProgress() throws IOException {
     return lineRecordReader.getProgress();
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java Thu Aug 20 17:16:59 2009
@@ -27,8 +27,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -47,7 +49,10 @@
   private long pos;
   private long end;
   private LineReader in;
+  private FSDataInputStream fileIn;
   int maxLineLength;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   /**
    * A class that provides a line reader from an input stream.
@@ -74,14 +79,14 @@
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
-    FSDataInputStream fileIn = fs.open(split.getPath());
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
+    fileIn = fs.open(split.getPath());
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
@@ -90,8 +95,7 @@
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, (int) Math.min(
-          (long) Integer.MAX_VALUE, end - start));
+      start += in.readLine(new Text(), 0, maxBytesToConsume());
     }
     this.pos = start;
   }
@@ -124,18 +128,34 @@
     return new Text();
   }
   
+  private boolean isCompressedInput() { return (codec != null); }
+  
+  private int maxBytesToConsume() {
+    return (isCompressedInput()) ? Integer.MAX_VALUE
+                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  }
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput()) {
+      retVal = fileIn.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+  
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
 
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (pos <= end) {
+    while (getFilePosition() <= end) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,
-                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                         maxLineLength));
+                                Math.max(maxBytesToConsume(), maxLineLength));
       if (newSize == 0) {
         return false;
       }
@@ -154,11 +174,11 @@
   /**
    * Get the progress within the split
    */
-  public synchronized float getProgress() {
+  public synchronized float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
     }
   }
   
@@ -167,8 +187,14 @@
   }
 
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }

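Two things change in the old-API LineRecordReader above: decompressors are now borrowed from CodecPool instead of being allocated per split, and close() returns them in a finally block so a failure while closing the line reader cannot leak a pooled (often native) decompressor. A condensed sketch of that borrow/return discipline, using only the CodecPool and CompressionCodec calls visible in the diff:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Decompressor;

    // Sketch: pooled decompression around an arbitrary input stream.
    class PooledStreamReader {
      private InputStream in;
      private Decompressor decompressor;

      void open(CompressionCodec codec, InputStream raw) throws IOException {
        // Borrow a decompressor rather than allocating one per split.
        decompressor = CodecPool.getDecompressor(codec);
        in = codec.createInputStream(raw, decompressor);
      }

      void close() throws IOException {
        try {
          if (in != null) {
            in.close();
          }
        } finally {
          // Always hand the decompressor back, even if close() threw.
          if (decompressor != null) {
            CodecPool.returnDecompressor(decompressor);
            decompressor = null;
          }
        }
      }
    }
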
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Aug 20 17:16:59 2009
@@ -32,7 +32,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Aug 20 17:16:59 2009
@@ -51,7 +51,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -86,6 +86,8 @@
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.Service;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -97,7 +99,7 @@
  * for Task assignments and reporting results.
  *
  *******************************************************/
-public class TaskTracker 
+public class TaskTracker extends Service
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
   /**
    * @deprecated
@@ -113,7 +115,7 @@
     "mapred.tasktracker.pmem.reserved";
 
   static final long WAIT_FOR_DONE = 3 * 1000;
-  private int httpPort;
+  int httpPort;
 
   static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
@@ -135,7 +137,10 @@
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  volatile boolean running = true;
+  /**
+   * Flag used to synchronize running state across threads.
+   */
+  private volatile boolean running = false;
 
   private LocalDirAllocator localDirAllocator;
   String taskTrackerName;
@@ -168,7 +173,7 @@
   // The filesystem where job files are stored
   FileSystem systemFS = null;
   
-  private final HttpServer server;
+  private HttpServer server;
     
   volatile boolean shuttingDown = false;
     
@@ -330,33 +335,7 @@
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                TaskInProgress tip;
-                KillTaskAction killAction = (KillTaskAction) action;
-                synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskID());
-                }
-                LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskID());
-                purgeTask(tip, false);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
-        }
-      }, "taskCleanup");
+  private TaskCleanupThread taskCleanupThread;
 
   TaskController getTaskController() {
     return taskController;
@@ -487,6 +466,17 @@
    * close().
    */
   synchronized void initialize() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing Task Tracker: " + this);
+    }
+    //check that the service is in a state that permits initialization
+
+    //allow this operation in only two service states: started and live 
+    verifyServiceState(ServiceState.STARTED, ServiceState.LIVE);
+
+    //flip the running switch for our inner threads                     
+    running = true;
+
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
     if (fConf.get("slave.host.name") != null) {
@@ -570,10 +560,17 @@
     DistributedCache.purgeCache(this.fConf);
     cleanupStorage();
 
+    //mark as just started; this is used in heartbeats
+    this.justStarted = true;
+    int connectTimeout = fConf
+            .getInt("mapred.task.tracker.connect.timeout", 60000);
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
                        InterTrackerProtocol.versionID, 
-                       jobTrackAddr, this.fConf);
+                       jobTrackAddr, this.fConf, connectTimeout);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connected to JobTracker at " + jobTrackAddr);
+    }
     this.justInited = true;
     this.running = true;    
     // start the thread that will fetch map task completion events
@@ -625,7 +622,9 @@
    * startup, to remove any leftovers from previous run.
    */
   public void cleanupStorage() throws IOException {
-    this.fConf.deleteLocalFiles();
+    if (fConf != null) {
+      fConf.deleteLocalFiles();
+    }
   }
 
   // Object on wait which MapEventsFetcherThread is going to wait.
@@ -1141,25 +1140,73 @@
     }
   }
     
+  /////////////////////////////////////////////////////
+  // Service Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param status a status that can be updated with problems
+   * @throws IOException       for any problem
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (server == null || !server.isAlive()) {
+      status.addThrowable(
+              new IOException("TaskTracker HttpServer is not running on port "
+                      + httpPort));
+    }
+    if (taskReportServer == null) {
+      status.addThrowable(
+              new IOException("TaskTracker Report Server is not running on "
+              + taskReportAddress));
+    }
+  }
+
+  /**
+   * A shutdown request triggers termination
+   * @throws IOException when errors happen during termination
+   */
   public synchronized void shutdown() throws IOException {
-    shuttingDown = true;
     close();
-    if (this.server != null) {
-      try {
-        LOG.info("Shutting down StatusHttpServer");
-        this.server.stop();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException exceptions which will be logged
+   */
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      shuttingDown = true;
+      closeTaskTracker();
+      if (this.server != null) {
+        try {
+          LOG.info("Shutting down StatusHttpServer");
+          this.server.stop();
       } catch (Exception e) {
         LOG.warn("Exception shutting down TaskTracker", e);
+        }
       }
+      stopCleanupThreads();
     }
   }
+
   /**
    * Close down the TaskTracker and all its components.  We must also shutdown
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws IOException when errors happen during shutdown
    */
-  public synchronized void close() throws IOException {
+  public synchronized void closeTaskTracker() throws IOException {
+    if (!running) {
+      //this operation is a no-op when not already running
+      return;
+    }
+    running = false;
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1177,13 +1224,21 @@
     cleanupStorage();
         
     // Shutdown the fetcher thread
-    this.mapEventsFetcher.interrupt();
-    
+    if (mapEventsFetcher != null) {
+      mapEventsFetcher.interrupt();
+    }    
     //stop the launchers
-    this.mapLauncher.interrupt();
-    this.reduceLauncher.interrupt();
-    
-    jvmManager.stop();
+    if (mapLauncher != null) {
+      mapLauncher.cleanTaskQueue();
+      mapLauncher.interrupt();
+    }
+    if (reduceLauncher != null) {
+      reduceLauncher.cleanTaskQueue();
+      reduceLauncher.interrupt();
+    }
+    if (jvmManager != null) {
+      jvmManager.stop();
+    }
     
     // shutdown RPC connections
     RPC.stopProxy(jobClient);
@@ -1191,7 +1246,9 @@
     // wait for the fetcher thread to exit
     for (boolean done = false; !done; ) {
       try {
-        this.mapEventsFetcher.join();
+        if (mapEventsFetcher != null) {
+          mapEventsFetcher.join();
+        }
         done = true;
       } catch (InterruptedException e) {
       }
@@ -1221,8 +1278,47 @@
 
   /**
    * Start with the local machine name, and the default JobTracker
+   * Create and start a task tracker.
+   * Subclasses must not use this constructor, as it may
+   * call their initialisation/startup methods before construction
+   * is complete. It is here for backwards compatibility.
+   * @param conf configuration
+   * @throws IOException for problems on startup
    */
   public TaskTracker(JobConf conf) throws IOException {
+    this(conf, true);
+  }
+
+  /**
+   * Subclasses should chain to this constructor and pass start=false to the
+   * superclass to avoid race conditions between constructors and threads.
+   * @param conf configuration
+   * @param start true to start the daemon immediately. Subclasses should
+   * pass false and start themselves outside the constructor, to avoid odd
+   * thread-related race conditions.
+   * @throws IOException for problems on startup
+   */
+  protected TaskTracker(JobConf conf, boolean start) throws IOException {
+    super(conf);
+    fConf = conf;
+    //for backwards compatibility, the task tracker starts up unless told not
+    //to. Subclasses should be very cautious about starting in the superclass
+    //constructor, as overridden methods can be invoked before the class is
+    //fully configured
+    if (start) {
+      startService(this);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected synchronized void innerStart() throws IOException {
+    JobConf conf = fConf;
     fConf = conf;
     maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
     maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
@@ -1264,11 +1360,22 @@
   }
   
   private void startCleanupThreads() throws IOException {
+    taskCleanupThread = new TaskCleanupThread();
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
     directoryCleanupThread = new CleanupQueue();
   }
-  
+
+  /**
+   * Tell the cleanup threads that they should end themselves   
+   */
+  private void stopCleanupThreads() {
+    if (taskCleanupThread != null) {
+      taskCleanupThread.terminate();
+      taskCleanupThread = null;
+    }
+  }
+
   /**
    * The connection to the JobTracker, used by the TaskRunner 
    * for locating remote files.
@@ -1316,6 +1423,7 @@
    */
   State offerService() throws Exception {
     long lastHeartbeat = 0;
+    boolean restartingService = true;
 
     while (running && !shuttingDown) {
       try {
@@ -1331,6 +1439,7 @@
         // 1. Verify the buildVersion
         // 2. Get the system directory & filesystem
         if(justInited) {
+          LOG.debug("Checking build version with JobTracker");
           String jobTrackerBV = jobClient.getBuildVersion();
           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
             String msg = "Shutting down. Incompatible buildVersion." +
@@ -1340,7 +1449,7 @@
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
-              LOG.info("Problem reporting to jobtracker: " + e);
+              LOG.info("Problem reporting to jobtracker: " + e, e);
             }
             return State.DENIED;
           }
@@ -1351,6 +1460,9 @@
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("System directory is " + systemDirectory);
+          }
         }
         
         // Send the heartbeat and process the jobtracker's directives
@@ -1403,6 +1515,15 @@
           return State.STALE;
         }
             
+        //At this point the job tracker is present and compatible,
+        //so the service is coming up.
+        //It is time to declare it as such
+        if (restartingService) {
+          //declare the service as live.
+          enterLiveState();
+          restartingService = false;
+        }
+            
         // resetting heartbeat interval from the response.
         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
         justStarted = false;
@@ -2004,7 +2125,7 @@
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
-      LOG.warn(msg);
+      LOG.warn(msg, e);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
@@ -2064,6 +2185,8 @@
               if (!shuttingDown) {
                 LOG.info("Lost connection to JobTracker [" +
                          jobTrackAddr + "].  Retrying...", ex);
+                //enter the started state; we are no longer live
+                enterState(ServiceState.UNDEFINED, ServiceState.STARTED);
                 try {
                   Thread.sleep(5000);
                 } catch (InterruptedException ie) {
@@ -2072,7 +2195,7 @@
             }
           }
         } finally {
-          close();
+          closeTaskTracker();
         }
         if (shuttingDown) { return; }
         LOG.warn("Reinitializing local state");
@@ -3024,7 +3147,17 @@
   String getName() {
     return taskTrackerName;
   }
-    
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return taskTrackerName != null ? taskTrackerName : "Task Tracker";
+  }
+
   private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses(
                                           boolean sendCounters) {
     List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
@@ -3141,7 +3274,9 @@
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
         (conf.getBoolean("tasktracker.contention.tracking", false));
-      new TaskTracker(conf).run();
+      TaskTracker tracker = new TaskTracker(conf, false);
+      Service.startService(tracker);
+      tracker.run();
     } catch (Throwable e) {
       LOG.error("Can not start task tracker because "+
                 StringUtils.stringifyException(e));
@@ -3460,7 +3595,7 @@
       try {
         purgeTask(tip, wasFailure); // Marking it as failed/killed.
       } catch (IOException ioe) {
-        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe);
+        LOG.warn("Couldn't purge the task of " + tid + ". Error : " + ioe, ioe);
       }
     }
   }
@@ -3484,4 +3619,66 @@
     healthChecker = new NodeHealthCheckerService(conf);
     healthChecker.start();
   }
+
+
+  /**
+   * Thread that handles cleanup
+   */
+  private class TaskCleanupThread extends Daemon {
+
+    /**
+     * flag to halt work
+     */
+    private volatile boolean live = true;
+
+
+    /**
+     * Construct a daemon thread.
+     */
+    private TaskCleanupThread() {
+      setName("Task Tracker Task Cleanup Thread");
+    }
+
+    /**
+     * End the daemon. This is done by setting the live flag to false and
+     * interrupting ourselves.
+     */
+    public void terminate() {
+      live = false;
+      interrupt();
+    }
+
+    /**
+     * process task kill actions until told to stop being live.
+     */
+    public void run() {
+      LOG.debug("Task cleanup thread started");
+      while (live) {
+        try {
+          TaskTrackerAction action = tasksToCleanup.take();
+          if (action instanceof KillJobAction) {
+            purgeJob((KillJobAction) action);
+          } else if (action instanceof KillTaskAction) {
+            TaskInProgress tip;
+            KillTaskAction killAction = (KillTaskAction) action;
+            synchronized (TaskTracker.this) {
+              tip = tasks.get(killAction.getTaskID());
+            }
+            LOG.info("Received KillTaskAction for task: " +
+                    killAction.getTaskID());
+            purgeTask(tip, false);
+          } else {
+            LOG.error("Non-delete action given to cleanup thread: "
+                    + action);
+          }
+        } catch (InterruptedException except) {
+          //interrupted; terminate() may have cleared the live flag
+        } catch (Throwable except) {
+          LOG.warn("Exception in Cleanup thread: " + except,
+                  except);
+        }
+      }
+      LOG.debug("Task cleanup thread ending");
+    }
+  }
 }
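
The TaskCleanupThread above replaces the former anonymous cleanup runnable with a named daemon that can actually be stopped: terminate() clears a volatile flag and then interrupts the thread, so a take() blocked on the cleanup queue returns promptly instead of hanging shutdown. A stripped-down, runnable sketch of that stop pattern, with a plain BlockingQueue standing in for tasksToCleanup and Runnable for TaskTrackerAction:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of an interruptible cleanup worker.
    class StoppableWorker extends Thread {
      private final BlockingQueue<Runnable> queue =
        new LinkedBlockingQueue<Runnable>();
      private volatile boolean live = true;

      StoppableWorker() {
        setName("cleanup-worker");
        setDaemon(true);
      }

      /** Ask the worker to stop: clear the flag, then unblock take(). */
      void terminate() {
        live = false;
        interrupt();
      }

      public void run() {
        while (live) {
          try {
            queue.take().run();
          } catch (InterruptedException e) {
            // Interrupted: fall through and re-check the live flag.
          }
        }
      }
    }

The flag, not the interrupt, is the authoritative stop signal; the interrupt only unblocks the queue wait.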

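Taken together, the TaskTracker hunks above sketch the contract of the new Service base class this branch introduces: the constructor no longer starts anything unless asked, startService() drives the startup hook, innerPing() reports liveness, innerClose() tears down, and the tracker only calls enterLiveState() once it has a compatible JobTracker. The Service class itself is not part of this file's diff, so the following is a self-contained sketch with a minimal stand-in base class, inferred from the calls visible here:

    import java.io.IOException;

    // Minimal stand-in for the Service base class added by this branch;
    // only the start/close hooks exercised above are modelled.
    abstract class SketchService {
      protected abstract void innerStart() throws IOException;
      protected abstract void innerClose() throws IOException;

      /** Start a service; on failure, close it and rethrow. */
      public static void startService(SketchService s) throws IOException {
        try {
          s.innerStart();
        } catch (IOException e) {
          s.innerClose();   // best-effort cleanup before rethrowing
          throw e;
        }
      }
    }

    class SketchTracker extends SketchService {
      private volatile boolean running = false;

      // The constructor only records state; no threads are started here,
      // which is exactly the race the TaskTracker(conf, start) flag avoids.
      SketchTracker() { }

      protected void innerStart() { running = true;  /* open servers */ }
      protected void innerClose() { running = false; /* stop threads */ }
    }

A caller then mirrors the pattern in main() above: construct with start=false, then hand the instance to startService().
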
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Aug 20 17:16:59 2009
@@ -29,7 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Thu Aug 20 17:16:59 2009
@@ -37,7 +37,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Thu Aug 20 17:16:59 2009
@@ -118,7 +118,7 @@
     return value;
   }
 
-  public float getProgress() {
+  public float getProgress() throws IOException {
     return lineRecordReader.getProgress();
   }
   

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Thu Aug 20 17:16:59 2009
@@ -26,8 +26,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -48,10 +50,13 @@
   private long pos;
   private long end;
   private LineReader in;
+  private FSDataInputStream fileIn;
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
   private Counter inputByteCounter;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
@@ -65,14 +70,14 @@
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
-    FSDataInputStream fileIn = fs.open(split.getPath());
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
+    fileIn = fs.open(split.getPath());
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
@@ -81,12 +86,27 @@
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, (int) Math.min(
-          (long) Integer.MAX_VALUE, end - start));
+      start += in.readLine(new Text(), 0, maxBytesToConsume());
     }
     this.pos = start;
   }
   
+  private boolean isCompressedInput() { return (codec != null); }
+  
+  private int maxBytesToConsume() {
+    return (isCompressedInput()) ? Integer.MAX_VALUE
+                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  }
+  
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput()) {
+      retVal = fileIn.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -98,10 +118,9 @@
     int newSize = 0;
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (pos <= end) {
+    while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
-                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                     maxLineLength));
+                            Math.max(maxBytesToConsume(), maxLineLength));
       if (newSize == 0) {
         break;
       }
@@ -137,17 +156,23 @@
   /**
    * Get the progress within the split
    */
-  public float getProgress() {
+  public float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
     }
   }
   
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }

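The new-API LineRecordReader gets the same treatment as its mapred counterpart, and the key idea is getFilePosition(): for compressed input, pos counts decoded bytes and says nothing about how much of the file has been consumed, so split-boundary checks and progress are computed from the underlying stream position instead (this also replaces the old end = Long.MAX_VALUE trick). A self-contained sketch of that progress arithmetic, with plain longs standing in for the two positions:

    // Sketch: progress over a split [start, end) when the bytes-consumed
    // figure may come from either a raw or a compressed stream.
    class SplitProgress {
      private final long start;
      private final long end;
      private final boolean compressed;

      SplitProgress(long start, long end, boolean compressed) {
        this.start = start;
        this.end = end;
        this.compressed = compressed;
      }

      /**
       * @param filePos    position of the underlying file stream
       * @param logicalPos decoded bytes handed out so far
       */
      float getProgress(long filePos, long logicalPos) {
        if (start == end) {
          return 0.0f;
        }
        // Compressed input: only the file-stream position is meaningful.
        long consumed = compressed ? filePos : logicalPos;
        return Math.min(1.0f, (consumed - start) / (float) (end - start));
      }
    }
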
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 20 17:16:59 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
 /hadoop/core/trunk/src/test/mapred:776175-785643
+/hadoop/mapreduce/trunk/src/test/mapred:804974-805826

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Thu Aug 20 17:16:59 2009
@@ -47,12 +47,16 @@
     int totalSlots;
     private String[] trackers;
 
+    /**
+     * This job tracker starts itself in its constructor
+     */
     FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException, 
     InterruptedException {
       super(conf, clock);
       this.trackers = tts;
       //initialize max{Map/Reduce} task capacities to twice the clustersize
       totalSlots = trackers.length * 4;
+      startService(this);
     }
     @Override
     public ClusterStatus getClusterStatus(boolean detailed) {

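FakeJobTracker opts back in to starting from its constructor with an explicit startService(this); that is tolerable for a test fixture that nothing subclasses, but it is the very pattern the new TaskTracker(conf, start) flag exists to avoid. A tiny, standard-Java illustration of why a base-class constructor must not invoke startup hooks:

    // Why starting a service from a base-class constructor is risky:
    // the subclass's fields are still unassigned when the overridden
    // hook runs.
    class Base {
      Base(boolean start) {
        if (start) {
          onStart();        // virtual call during construction
        }
      }
      protected void onStart() { }
    }

    class Sub extends Base {
      private final String name;

      Sub() {
        super(true);        // onStart() runs before 'name' is assigned
        this.name = "sub";
      }

      protected void onStart() {
        System.out.println(name);   // prints "null"
      }
    }

Constructing new Sub() prints "null": the overridden hook observes a half-built object, which is the thread-and-initialisation race the lifecycle comments above warn about.
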
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java Thu Aug 20 17:16:59 2009
@@ -33,7 +33,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+
 import java.net.URI;
 
 public class MRCaching {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Thu Aug 20 17:16:59 2009
@@ -28,7 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=806265&r1=806264&r2=806265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Thu Aug 20 17:16:59 2009
@@ -405,9 +405,8 @@
     JobID id = job.getID();
     
     // wait for 50%
-    while (job.mapProgress() < 0.5f) {
-      UtilsForTests.waitFor(100);
-    }
+    UtilsForTests.waitForJobHalfDone(job);
+
     
     // change the exclude-hosts file to include one host
     FileOutputStream out = new FileOutputStream(file);

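The open-coded polling loop in TestNodeRefresh is replaced by a single call to UtilsForTests.waitForJobHalfDone(job). The helper's body is not part of this diff; a plausible sketch of its shape, assuming it simply centralises the old mapProgress() poll, would be:

    import java.io.IOException;
    import org.apache.hadoop.mapred.RunningJob;

    // Hypothetical shape of the extracted helper; the real
    // UtilsForTests.waitForJobHalfDone is not shown in this diff.
    class WaitSketch {
      static void waitForJobHalfDone(RunningJob job) throws IOException {
        // Poll until the map phase reaches 50%.
        while (job.mapProgress() < 0.5f) {
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;   // stop waiting if the test thread is interrupted
          }
        }
      }
    }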

