hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r832892 - in /hadoop/mapreduce/branches/HDFS-641: ./ conf/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/data_join/ src/contrib/dynamic-scheduler/ src/contrib/eclipse-plugin/ src/contrib/fairscheduler/ src/contrib/f...
Date: Wed, 04 Nov 2009 22:50:24 GMT
Author: omalley
Date: Wed Nov  4 22:50:23 2009
New Revision: 832892

URL: http://svn.apache.org/viewvc?rev=832892&view=rev
Log:
Updated merge from the trunk

Added:
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/api-reference.txt
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
      - copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
Removed:
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
Modified:
    hadoop/mapreduce/branches/HDFS-641/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/.gitignore   (props changed)
    hadoop/mapreduce/branches/HDFS-641/CHANGES.txt
    hadoop/mapreduce/branches/HDFS-641/conf/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/conf/capacity-scheduler.xml.template   (props changed)
    hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar
    hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar
    hadoop/mapreduce/branches/HDFS-641/src/c++/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/build-contrib.xml   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/build.xml   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/index/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
    hadoop/mapreduce/branches/HDFS-641/src/contrib/streaming/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/contrib/vaidya/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml
    hadoop/mapreduce/branches/HDFS-641/src/examples/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/java/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
    hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
    hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
    hadoop/mapreduce/branches/HDFS-641/src/test/mapred/   (props changed)
    hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
    hadoop/mapreduce/branches/HDFS-641/src/webapps/job/   (props changed)

Propchange: hadoop/mapreduce/branches/HDFS-641/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/mapred:713112
-/hadoop/mapreduce/trunk:817878-830225
+/hadoop/mapreduce/trunk:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/.gitignore
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,4 +1,4 @@
 /hadoop/core/branches/HADOOP-4687/mapred/.gitignore:776175-784965
 /hadoop/core/branches/branch-0.19/mapred/.gitignore:713112
 /hadoop/core/trunk/.gitignore:784664-785643
-/hadoop/mapreduce/trunk/.gitignore:817878-830225
+/hadoop/mapreduce/trunk/.gitignore:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/CHANGES.txt?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/HDFS-641/CHANGES.txt Wed Nov  4 22:50:23 2009
@@ -28,6 +28,11 @@
     MAPREDUCE-1090. Modified log statement in TaskMemoryManagerThread to
     include task attempt id. (yhemanth)
 
+    MAPREDUCE-1069. Implement Sqoop API refactoring. (Aaron Kimball via
+    tomwhite)
+
+    MAPREDUCE-1036. Document Sqoop API. (Aaron Kimball via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
@@ -37,6 +42,9 @@
 
   BUG FIXES
 
+    MAPREDUCE-1089. Fix NPE in fair scheduler preemption when tasks are
+    scheduled but not running. (Todd Lipcon via matei)
+
     MAPREDUCE-1014. Fix the libraries for common and hdfs. (omalley)
 
     MAPREDUCE-1111. JT Jetty UI not working if we run mumak.sh 
@@ -48,6 +56,16 @@
     MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a
     global lock. (Amareshwari Sriramadasu via acmurthy)
 
+    MAPREDUCE-1158. Fix JT running maps and running reduces metrics.
+    (sharad)
+
+    MAPREDUCE-1160. Reduce verbosity of log lines in some Map/Reduce classes
+    to avoid filling up jobtracker logs on a busy cluster.
+    (Ravi Gummadi and Hong Tang via yhemanth)
+
+    MAPREDUCE-1153. Fix tasktracker metrics when trackers are decommissioned.
+    (sharad)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -472,6 +490,9 @@
     MAPREDUCE-1012. Mark Context interfaces as public evolving. (Tom White via
     cdouglas)
 
+    MAPREDUCE-971. Document use of distcp when copying to s3, managing timeouts
+    in particular. (Aaron Kimball via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 
@@ -826,3 +847,6 @@
     queue capacity. (Rahul Kumar Singh via yhemanth)
 
     MAPREDUCE-1016.  Make the job history log format JSON.  (cutting)
+
+    MAPREDUCE-1038. Weave Mumak aspects only if related files have changed.
+    (Aaron Kimball via cdouglas)

Propchange: hadoop/mapreduce/branches/HDFS-641/conf/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf:713112
 /hadoop/core/trunk/conf:784664-785643
-/hadoop/mapreduce/trunk/conf:817878-830225
+/hadoop/mapreduce/trunk/conf:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/conf/capacity-scheduler.xml.template
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/conf/capacity-scheduler.xml.template:713112
 /hadoop/core/trunk/conf/capacity-scheduler.xml.template:776175-785643
-/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:817878-830225
+/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
Binary files - no diff available.

Propchange: hadoop/mapreduce/branches/HDFS-641/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++:713112
 /hadoop/core/trunk/src/c++:776175-784663
-/hadoop/mapreduce/trunk/src/c++:817878-830225
+/hadoop/mapreduce/trunk/src/c++:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
 /hadoop/core/trunk/src/contrib:784664-785643
-/hadoop/mapreduce/trunk/src/contrib:817878-830225
+/hadoop/mapreduce/trunk/src/contrib:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
 /hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
 /hadoop/core/trunk/src/contrib/build.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build.xml:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/build.xml:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
 /hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
 /hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/data_join:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
 /hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
 /hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
 /hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Nov  4 22:50:23 2009
@@ -831,7 +831,11 @@
     List<TaskStatus> statuses = new ArrayList<TaskStatus>();
     for (TaskInProgress tip: tips) {
       for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
-        statuses.add(tip.getTaskStatus(id));
+        TaskStatus stat = tip.getTaskStatus(id);
+        // status is null when the task has been scheduled but not yet running
+        if (stat != null) {
+          statuses.add(stat);
+        }
       }
     }
     return statuses;

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
 /hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/index:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
 /hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml Wed Nov  4 22:50:23 2009
@@ -30,6 +30,7 @@
   <property name="javac.version" value="1.6"/>
   <property name="javac.args.warnings" value="-Xlint:unchecked"/>
   <import file="../build-contrib.xml"/>
+  <property name="mumak.stamp.file" value="${build.dir}/mumak.uptodate.stamp"/>
 
   <target name="compile-java-sources" depends="init, ivy-retrieve-common" unless="skip.contrib">
     <echo message="contrib: ${name}"/>
@@ -68,6 +69,7 @@
       deprecation="${javac.deprecation}">
       <classpath refid="contrib-classpath"/>
     </iajc>
+    <touch file="${mumak.stamp.file}" mkdirs="true" />
     <echo message="Weaving of aspects is finished"/>
   </target>
 
@@ -94,7 +96,7 @@
         <include name="*.jar"/>
       </fileset>
     </copy>
-    <exec executable="sed">                                                                                                          
+    <exec executable="sed">
       <arg value="-i"/>
       <arg value="-e"/>
       <arg value="s/^HADOOP_VERSION=/HADOOP_VERSION=${version}/"/>
@@ -104,14 +106,13 @@
   </target>
 
   <target name="check.aspects">
-    <uptodate property="build.unnecessary" 
-        targetfile="${build.dir}/${dest.jar}" >
-        <srcfiles dir="${hadoop.root}/src/java/"
-          includes="org/apache/hadoop/**/*.java" />
-        <srcfiles dir="${hadoop.root}/build/src/"
-          includes="org/apache/hadoop/**/*.java" />
-        <srcfiles dir="${src.dir}"
-          includes="org/apache/hadoop/**/*.java, org/apache/hadoop/**/*.aj" />
+    <uptodate property="build.unnecessary"
+        targetfile="${mumak.stamp.file}">
+      <srcfiles dir="${hadoop.root}/src/java/"
+        includes="org/apache/hadoop/**/*.java" />
+      <srcfiles dir="${hadoop.root}/src/webapps/"
+        includes="**/*.jsp" />
+      <srcfiles dir="${src.dir}" includes="org/apache/hadoop/**/*.aj" />
     </uptodate>
   </target>
 

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
 /hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt Wed Nov  4 22:50:23 2009
@@ -61,3 +61,5 @@
 
 include::supported-dbs.txt[]
 
+include::api-reference.txt[]
+

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Nov  4 22:50:23 2009
@@ -28,6 +28,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
@@ -117,6 +118,8 @@
 
   private boolean areDelimsManuallySet;
 
+  private Configuration conf;
+
   public static final int DEFAULT_NUM_MAPPERS = 4;
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -249,6 +252,8 @@
     this.useCompression = false;
     this.directSplitSize = 0;
 
+    this.conf = new Configuration();
+
     loadFromProperties();
   }
 
@@ -869,4 +874,12 @@
   public long getDirectSplitSize() {
     return this.directSplitSize;
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration config) {
+    this.conf = config;
+  }
 }

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Wed Nov  4 22:50:23 2009
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.sqoop.hive.HiveImport;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.orm.ClassWriter;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
 import org.apache.hadoop.sqoop.util.ImportError;
@@ -88,7 +89,8 @@
 
     if (options.getAction() == ImportOptions.ControlAction.FullImport) {
       // Proceed onward to do the import.
-      manager.importTable(tableName, jarFile, getConf());
+      ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+      manager.importTable(context);
 
       // If the user wants this table to be in Hive, perform that post-load.
       if (options.doHiveImport()) {
@@ -103,6 +105,7 @@
    */
   public int run(String [] args) {
     options = new ImportOptions();
+    options.setConf(getConf());
     try {
       options.parse(args);
       options.validate();

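For illustration only (not part of this commit): a minimal sketch of the refactored call path shown in the ImportOptions.java and Sqoop.java hunks above. The Configuration now rides on ImportOptions, and the per-table arguments are bundled into an ImportJobContext instead of being passed to importTable() individually. The table name, jar path, and the direct DefaultManagerFactory lookup are hypothetical; only the constructor and accessors visible in this diff are assumed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.DefaultManagerFactory;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;

    /** Hypothetical driver; not part of this commit. */
    public class ImportDriverSketch {
      public static void main(String[] args) throws Exception {
        ImportOptions options = new ImportOptions();
        options.setConf(new Configuration());   // new in this change: the Configuration rides on the options
        options.parse(args);
        options.validate();

        // Illustrative lookup only; Sqoop's own driver resolves the manager internally.
        ConnManager manager = new DefaultManagerFactory().accept(options);

        // Per-table arguments are bundled into a context object instead of being
        // passed as importTable(tableName, jarFile, conf) as before.
        ImportJobContext context =
            new ImportJobContext("employees", "/tmp/codegen/employees.jar", options);
        manager.importTable(context);
        manager.close();
      }
    }
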
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Wed Nov  4 22:50:23 2009
@@ -35,7 +35,7 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 
 /**
  * Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
       args.add("-f");
       args.add(tmpFilename);
 
-      LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
-      int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+      LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+      int ret = Executor.exec(args.toArray(new String[0]),
+          env.toArray(new String[0]), logSink, logSink);
       if (0 != ret) {
         throw new IOException("Hive exited with status " + ret);
       }

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Wed Nov  4 22:50:23 2009
@@ -19,8 +19,10 @@
 package org.apache.hadoop.sqoop.io;
 
 import java.io.BufferedWriter;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.IOException;
+import java.util.Formatter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Wed Nov  4 22:50:23 2009
@@ -33,27 +33,27 @@
  * The implementations of this class drive the actual discussion with
  * the database about table formats, etc.
  */
-public interface ConnManager {
+public abstract class ConnManager {
 
   /**
    * Return a list of all databases on a server
    */
-  String [] listDatabases();
+  public abstract String [] listDatabases();
 
   /**
    * Return a list of all tables in a database
    */
-  String [] listTables();
+  public abstract String [] listTables();
 
   /**
    * Return a list of column names in a table in the order returned by the db.
    */
-  String [] getColumnNames(String tableName);
+  public abstract String [] getColumnNames(String tableName);
 
   /**
    * Return the name of the primary key for a table, or null if there is none.
    */
-  String getPrimaryKey(String tableName);
+  public abstract String getPrimaryKey(String tableName);
 
   /**
    * Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
    *
    * The Integer type id is a constant from java.sql.Types
    */
-  Map<String, Integer> getColumnTypes(String tableName);
+  public abstract Map<String, Integer> getColumnTypes(String tableName);
 
   /**
    * Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
    * The client is responsible for calling ResultSet.close() when done with the
    * returned ResultSet object.
    */
-  ResultSet readTable(String tableName, String [] columns) throws SQLException;
+  public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
 
   /**
    * @return the actual database connection
    */
-  Connection getConnection() throws SQLException;
+  public abstract Connection getConnection() throws SQLException;
 
   /**
    * @return a string identifying the driver class to load for this JDBC connection type.
    */
-  String getDriverClass();
+  public abstract String getDriverClass();
 
   /**
    * Execute a SQL statement 's' and print its results to stdout
    */
-  void execAndPrint(String s);
+  public abstract void execAndPrint(String s);
 
   /**
    * Perform an import of a table from the database into HDFS
    */
-  void importTable(String tableName, String jarFile, Configuration conf)
+  public abstract void importTable(ImportJobContext context)
       throws IOException, ImportError;
 
   /**
    * Perform any shutdown operations on the connection.
    */
-  void close() throws SQLException;
+  public abstract void close() throws SQLException;
 }
 

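Because ConnManager is now an abstract class rather than an interface, implementors subclass it and declare each method public. A hedged, do-nothing stub follows, assuming the abstract methods visible in this hunk are the complete set (the full file may declare more):

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Map;

    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;
    import org.apache.hadoop.sqoop.util.ImportError;

    /** Hypothetical do-nothing manager showing the shape of a ConnManager subclass. */
    public class StubConnManager extends ConnManager {
      public String [] listDatabases() { return new String[0]; }
      public String [] listTables() { return new String[0]; }
      public String [] getColumnNames(String tableName) { return new String[0]; }
      public String getPrimaryKey(String tableName) { return null; }
      public Map<String, Integer> getColumnTypes(String tableName) { return null; }
      public ResultSet readTable(String tableName, String [] columns) throws SQLException {
        return null;
      }
      public Connection getConnection() throws SQLException { return null; }
      public String getDriverClass() { return null; }
      public void execAndPrint(String s) { }
      public void importTable(ImportJobContext context) throws IOException, ImportError {
        // A real manager would launch the import here (see SqlManager's default further below).
      }
      public void close() throws SQLException { }
    }
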
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Wed Nov  4 22:50:23 2009
@@ -27,7 +27,7 @@
  * Contains instantiation code for all ConnManager implementations
  * shipped and enabled by default in Sqoop.
  */
-public final class DefaultManagerFactory implements ManagerFactory {
+public final class DefaultManagerFactory extends ManagerFactory {
 
   public static final Log LOG = LogFactory.getLog(DefaultManagerFactory.class.getName());
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Wed Nov  4 22:50:23 2009
@@ -28,8 +28,6 @@
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -41,11 +39,13 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -64,35 +64,24 @@
   /** Copies data directly into HDFS, adding the user's chosen line terminator
       char to each record.
     */
-  static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
+  static class PostgresqlAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
     private final ImportOptions options;
 
-    PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
+    PostgresqlAsyncSink(final SplittableBufferedWriter w, final ImportOptions opts,
         final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
     }
 
-    private PostgresqlStreamThread child;
-
     public void processStream(InputStream is) {
       child = new PostgresqlStreamThread(is, writer, options, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class PostgresqlStreamThread extends Thread {
+    private static class PostgresqlStreamThread extends ErrorableThread {
       public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
@@ -100,8 +89,6 @@
       private final ImportOptions options;
       private final PerfCounters counters;
 
-      private boolean error;
-
       PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
           final ImportOptions opts, final PerfCounters ctrs) {
         this.stream = is;
@@ -110,10 +97,6 @@
         this.counters = ctrs;
       }
 
-      public boolean isErrored() {
-        return error;
-      }
-
       public void run() {
         BufferedReader r = null;
         SplittableBufferedWriter w = this.writer;
@@ -138,7 +121,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from psql: " + ioe.toString());
           // set the error bit so our caller can see that something went wrong.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -312,9 +295,13 @@
    * Import the table into HDFS by using psql to pull the data out of the db
    * via COPY FILE TO STDOUT.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
     throws IOException, ImportError {
 
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+
     LOG.info("Beginning psql fast path import");
 
     if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -327,7 +314,7 @@
     String commandFilename = null;
     String passwordFilename = null;
     Process p = null;
-    StreamHandlerFactory streamHandler = null;
+    AsyncSink sink = null;
     PerfCounters counters = new PerfCounters();
 
     try {
@@ -395,19 +382,21 @@
         LOG.debug("  " + arg);
       }
 
-      // This writer will be closed by StreamHandlerFactory.
-      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, tableName);
 
       // Actually start the psql dump.
-      p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+          envp.toArray(new String[0]));
 
       // read from the stdout pipe into the HDFS writer.
       InputStream is = p.getInputStream();
-      streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters);
+      sink = new PostgresqlAsyncSink(w, options, counters);
 
-      LOG.debug("Starting stream handler");
+      LOG.debug("Starting stream sink");
       counters.startClock();
-      streamHandler.processStream(is);
+      sink.processStream(is);
     } finally {
       // block until the process is done.
       LOG.debug("Waiting for process completion");
@@ -440,12 +429,12 @@
         }
       }
 
-      // block until the stream handler is done too.
+      // block until the stream sink is done too.
       int streamResult = 0;
-      if (null != streamHandler) {
+      if (null != sink) {
         while (true) {
           try {
-            streamResult = streamHandler.join();
+            streamResult = sink.join();
           } catch (InterruptedException ie) {
             // interrupted; loop around.
             continue;
@@ -463,7 +452,7 @@
       }
 
       if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream handler");
+        throw new IOException("Encountered exception in stream sink");
       }
 
       counters.stopClock();

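PostgresqlAsyncSink above illustrates the pattern that replaces StreamHandlerFactory throughout this merge: processStream() starts an ErrorableThread child, the thread marks failure with setError(), and the inherited join() reports the result to the caller. Below is a hedged sketch of a new sink under that contract, assuming ErrorableAsyncSink exposes a protected ErrorableThread field named child and supplies join() much like the code removed above; the commit's own NullAsyncSink plays a similar role, but this sketch does not claim to be its implementation.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
    import org.apache.hadoop.sqoop.util.ErrorableThread;

    /** Hypothetical sink that drains and discards a child process's stdout. */
    public class DrainingAsyncSink extends ErrorableAsyncSink {

      public void processStream(InputStream is) {
        // 'child' is assumed to be a protected ErrorableThread field on ErrorableAsyncSink,
        // as implied by the PostgresqlAsyncSink hunk above.
        child = new DrainingThread(is);
        child.start();
      }

      private static class DrainingThread extends ErrorableThread {
        private final InputStream stream;

        DrainingThread(final InputStream is) {
          this.stream = is;
        }

        public void run() {
          BufferedReader r = null;
          try {
            r = new BufferedReader(new InputStreamReader(stream));
            while (r.readLine() != null) {
              // discard the output
            }
          } catch (IOException ioe) {
            // Mirror the diff above: flag the failure so the sink's join() reports it.
            setError();
          } finally {
            if (null != r) {
              try {
                r.close();
              } catch (IOException ioe) {
                // ignore close failure, matching the surrounding code's style
              }
            }
          }
        }
      }
    }
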
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Wed Nov  4 22:50:23 2009
@@ -30,9 +30,6 @@
  * Database manager that is connects to a generic JDBC-compliant
  * database; its constructor is parameterized on the JDBC Driver
  * class to load.
- *
- * 
- *
  */
 public class GenericJdbcManager extends SqlManager {
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Wed Nov  4 22:50:23 2009
@@ -27,7 +27,7 @@
  * Manages connections to hsqldb databases.
  * Extends generic SQL manager.
  */
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
 
   public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Nov  4 22:50:23 2009
@@ -27,28 +27,24 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.nio.CharBuffer;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct connections to MySQL databases
@@ -58,57 +54,42 @@
 
   public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
 
-  // StreamHandlers used to import data from mysqldump directly into HDFS.
+  // AsyncSinks used to import data from mysqldump directly into HDFS.
 
   /**
    * Copies data directly from mysqldump into HDFS, after stripping some
    * header and footer characters that are attached to each line in mysqldump.
    */
-  static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+  static class CopyingAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final PerfCounters counters;
 
-    CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
+    CopyingAsyncSink(final SplittableBufferedWriter w,
+        final PerfCounters ctrs) {
       this.writer = w;
       this.counters = ctrs;
     }
 
-    private CopyingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new CopyingStreamThread(is, writer, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class CopyingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+    private static class CopyingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          CopyingStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
-          final PerfCounters ctrs) {
+      CopyingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final PerfCounters ctrs) {
         this.writer = w;
         this.stream = is;
         this.counters = ctrs;
       }
 
-      public boolean isErrored() {
-        return error;
-      }
-
       public void run() {
         BufferedReader r = null;
         SplittableBufferedWriter w = this.writer;
@@ -143,7 +124,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so we get an error status back in the caller.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -167,49 +148,38 @@
 
 
   /**
-   * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
    * output, and re-emit the text in the user's specified output format.
    */
-  static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+  static class ReparsingAsyncSink extends ErrorableAsyncSink {
     private final SplittableBufferedWriter writer;
     private final ImportOptions options;
     private final PerfCounters counters;
 
-    ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
-        final PerfCounters ctrs) {
+    ReparsingAsyncSink(final SplittableBufferedWriter w,
+        final ImportOptions opts, final PerfCounters ctrs) {
       this.writer = w;
       this.options = opts;
       this.counters = ctrs;
     }
 
-    private ReparsingStreamThread child;
-
     public void processStream(InputStream is) {
       child = new ReparsingStreamThread(is, writer, options, counters);
       child.start();
     }
 
-    public int join() throws InterruptedException {
-      child.join();
-      if (child.isErrored()) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-
-    private static class ReparsingStreamThread extends Thread {
-      public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+    private static class ReparsingStreamThread extends ErrorableThread {
+      public static final Log LOG = LogFactory.getLog(
+          ReparsingStreamThread.class.getName());
 
       private final SplittableBufferedWriter writer;
       private final ImportOptions options;
       private final InputStream stream;
       private final PerfCounters counters;
 
-      private boolean error;
-
-      ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
-          final ImportOptions opts, final PerfCounters ctrs) {
+      ReparsingStreamThread(final InputStream is,
+          final SplittableBufferedWriter w, final ImportOptions opts,
+          final PerfCounters ctrs) {
         this.writer = w;
         this.options = opts;
         this.stream = is;
@@ -226,12 +196,9 @@
 
       static {
         // build a record parser for mysqldump's format
-        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
-            MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
-      }
-
-      public boolean isErrored() {
-        return error;
+        MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+            MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+            MYSQL_ENCLOSE_REQUIRED);
       }
 
       public void run() {
@@ -300,7 +267,7 @@
         } catch (IOException ioe) {
           LOG.error("IOException reading from mysqldump: " + ioe.toString());
           // flag this error so the parent can handle it appropriately.
-          error = true;
+          setError();
         } finally {
           if (null != r) {
             try {
@@ -374,9 +341,13 @@
    * Import the table into HDFS by using mysqldump to pull out the data from
    * the database and upload the files directly to HDFS.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
 
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
+
     LOG.info("Beginning mysqldump fast path import");
 
     if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -406,7 +377,7 @@
     String passwordFile = null;
 
     Process p = null;
-    StreamHandlerFactory streamHandler = null;
+    AsyncSink sink = null;
     PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
@@ -446,8 +417,9 @@
         LOG.debug("  " + arg);
       }
 
-      // This writer will be closed by StreamHandlerFactory.
-      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+      // This writer will be closed by AsyncSink.
+      SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+          options.getConf(), options, tableName);
 
       // Actually start the mysqldump.
       p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -457,19 +429,19 @@
 
       if (outputDelimsAreMySQL()) {
         LOG.debug("Output delimiters conform to mysqldump; using straight copy"); 
-        streamHandler = new CopyingStreamHandlerFactory(w, counters);
+        sink = new CopyingAsyncSink(w, counters);
       } else {
         LOG.debug("User-specified delimiters; using reparsing import");
         LOG.info("Converting data to use specified delimiters.");
         LOG.info("(For the fastest possible import, use");
         LOG.info("--mysql-delimiters to specify the same field");
         LOG.info("delimiters as are used by mysqldump.)");
-        streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+        sink = new ReparsingAsyncSink(w, options, counters);
       }
 
       // Start an async thread to read and upload the whole stream.
       counters.startClock();
-      streamHandler.processStream(is);
+      sink.processStream(is);
     } finally {
 
       // block until the process is done.
@@ -495,12 +467,12 @@
         }
       }
 
-      // block until the stream handler is done too.
+      // block until the stream sink is done too.
       int streamResult = 0;
-      if (null != streamHandler) {
+      if (null != sink) {
         while (true) {
           try {
-            streamResult = streamHandler.join();
+            streamResult = sink.join();
           } catch (InterruptedException ie) {
             // interrupted; loop around.
             continue;
@@ -518,7 +490,7 @@
       }
 
       if (0 != streamResult) {
-        throw new IOException("Encountered exception in stream handler");
+        throw new IOException("Encountered exception in stream sink");
       }
 
       counters.stopClock();

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java Wed Nov  4 22:50:23 2009
@@ -27,7 +27,7 @@
  * calls the accept() method of each ManagerFactory, in order until
  * one such call returns a non-null ConnManager instance.
  */
-public interface ManagerFactory {
-  ConnManager accept(ImportOptions options);
+public abstract class ManagerFactory {
+  public abstract ConnManager accept(ImportOptions options);
 }
 

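ManagerFactory gets the same interface-to-abstract-class treatment. Per the class comment above, factories are consulted in order until one returns a non-null ConnManager, so a hypothetical external factory under the new contract might look like the following sketch (StubConnManager is the hypothetical stub sketched earlier, and the connect-string prefix is invented for illustration):

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ManagerFactory;

    /** Hypothetical factory illustrating the new abstract-class contract. */
    public class ExampleManagerFactory extends ManagerFactory {
      public ConnManager accept(ImportOptions options) {
        String connectString = options.getConnectString();
        if (connectString != null && connectString.startsWith("jdbc:example:")) {
          return new StubConnManager();   // the hypothetical stub sketched earlier
        }
        return null;   // decline; the caller moves on to the next factory
      }
    }
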
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Wed Nov  4 22:50:23 2009
@@ -92,13 +92,13 @@
   }
 
   @Override
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
         throws IOException, ImportError {
 
     // Check that we're not doing a MapReduce from localhost. If we are, point
     // out that we could use mysqldump.
     if (!MySQLManager.warningPrinted) {
-      String connectString = options.getConnectString();
+      String connectString = context.getOptions().getConnectString();
 
       if (null != connectString && connectString.indexOf("//localhost") != -1) {
         // if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
     }
 
     // Then run the normal importTable() method.
-    super.importTable(tableName, jarFile, conf);
+    super.importTable(context);
   }
 
   /**

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Wed Nov  4 22:50:23 2009
@@ -90,8 +90,12 @@
    * This importTable() implementation continues to use the older DBInputFormat
    * because DataDrivenDBInputFormat does not currently work with Oracle.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
     ImportJob importer = new ImportJob(options);
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
@@ -105,7 +109,7 @@
           + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, splitCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
 }
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Wed Nov  4 22:50:23 2009
@@ -71,13 +71,13 @@
   }
 
   @Override
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
         throws IOException, ImportError {
 
     // The user probably should have requested --direct to invoke pg_dump.
     // Display a warning informing them of this fact.
     if (!PostgresqlManager.warningPrinted) {
-      String connectString = options.getConnectString();
+      String connectString = context.getOptions().getConnectString();
 
       LOG.warn("It looks like you are importing from postgresql.");
       LOG.warn("This transfer can be faster! Use the --direct");
@@ -87,7 +87,7 @@
     }
 
     // Then run the normal importTable() method.
-    super.importTable(tableName, jarFile, conf);
+    super.importTable(context);
   }
 
   @Override

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Wed Nov  4 22:50:23 2009
@@ -45,7 +45,7 @@
  * This is an abstract class; it requires a database-specific
  * ConnManager implementation to actually create the connection.
  */
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
 
   public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
 
@@ -260,8 +260,11 @@
    * Default implementation of importTable() is to launch a MapReduce job
    * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
    */
-  public void importTable(String tableName, String jarFile, Configuration conf)
+  public void importTable(ImportJobContext context)
       throws IOException, ImportError {
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    ImportOptions options = context.getOptions();
     DataDrivenImportJob importer = new DataDrivenImportJob(options);
     String splitCol = options.getSplitByCol();
     if (null == splitCol) {
@@ -275,7 +278,7 @@
           + ". Please specify one with --split-by.");
     }
 
-    importer.runImport(tableName, jarFile, splitCol, conf);
+    importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
 
   /**
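
For readers tracking this refactoring, here is a minimal sketch of a manager written against the new single-argument importTable(). It is not part of the commit and relies only on the context accessors visible in the hunks above (getTableName(), getJarFile(), getOptions(), and ImportOptions.getConnectString()); the class name, the MySQLManager constructor shape, and the ImportError import path are assumptions.

    import java.io.IOException;

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ImportJobContext;
    import org.apache.hadoop.sqoop.manager.MySQLManager;
    import org.apache.hadoop.sqoop.util.ImportError;  // package path assumed

    public class AuditingMySQLManager extends MySQLManager {

      public AuditingMySQLManager(ImportOptions opts) {
        super(opts);  // constructor argument assumed
      }

      @Override
      public void importTable(ImportJobContext context)
          throws IOException, ImportError {
        // Values that used to arrive as separate arguments now ride on the
        // context: table name, generated code jar, and the ImportOptions.
        ImportOptions opts = context.getOptions();
        System.out.println("Importing " + context.getTableName()
            + " with jar " + context.getJarFile()
            + " from " + opts.getConnectString());
        // Delegate to the standard import path.
        super.importTable(context);
      }
    }
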

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Wed Nov  4 22:50:23 2009
@@ -37,7 +37,8 @@
  */
 public final class DirectImportUtils {
 
-  public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName());
+  public static final Log LOG = LogFactory.getLog(
+      DirectImportUtils.class.getName());
 
   private DirectImportUtils() {
   }
@@ -47,7 +48,8 @@
    * which may be e.g. "a+x" or "0600", etc.
    * @throws IOException if chmod failed.
    */
-  public static void setFilePermissions(File file, String modstr) throws IOException {
+  public static void setFilePermissions(File file, String modstr)
+      throws IOException {
     // Set this file to be 0600. Java doesn't have a built-in mechanism for this
     // so we need to go out to the shell to execute chmod.
     try {
@@ -61,9 +63,9 @@
 
   /**
    * Open a file in HDFS for write to hold the data associated with a table.
-   * Creates any necessary directories, and returns the OutputStream to write to.
-   * The caller is responsible for calling the close() method on the returned
-   * stream.
+   * Creates any necessary directories, and returns the OutputStream to write
+   * to. The caller is responsible for calling the close() method on the
+   * returned stream.
    */
   public static SplittableBufferedWriter createHdfsSink(Configuration conf,
       ImportOptions options, String tableName) throws IOException {
@@ -83,8 +85,8 @@
 
     // This Writer will be closed by the caller.
     return new SplittableBufferedWriter(
-        new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
-        options.shouldUseCompression()));
+        new SplittingOutputStream(conf, destDir, "data-",
+        options.getDirectSplitSize(), options.shouldUseCompression()));
   }
 }
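
A minimal usage sketch for the two helpers above, not part of the commit: setFilePermissions() shells out to chmod, and createHdfsSink() opens a per-table writer that the caller must close. The SplittableBufferedWriter import path and the file/table names are assumptions, and a fully configured ImportOptions is assumed to be supplied by the caller.

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;  // path assumed
    import org.apache.hadoop.sqoop.util.DirectImportUtils;

    public class DirectSinkDemo {
      /** Write one demo record through the per-table HDFS sink. */
      public static void writeDemoRow(Configuration conf, ImportOptions options)
          throws IOException {
        // The caller of createHdfsSink() is responsible for closing the
        // returned writer.
        SplittableBufferedWriter writer =
            DirectImportUtils.createHdfsSink(conf, options, "employees");
        try {
          writer.write("1,example row\n");
        } finally {
          writer.close();
        }
      }

      public static void main(String[] args) throws IOException {
        // Restrict a scratch file to its owner; the helper execs chmod since
        // Java has no built-in mechanism for this.
        DirectImportUtils.setFilePermissions(new File("/tmp/demo.pwd"), "0600");
      }
    }
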
 

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Wed Nov  4 22:50:23 2009
@@ -39,49 +39,49 @@
   }
 
   /**
-   * Execute a program defined by the args array with default stream handlers
+   * Execute a program defined by the args array with default stream sinks
    * that consume the program's output (to prevent it from blocking on buffers)
    * and then ignore said output.
    */
   public static int exec(String [] args) throws IOException {
-    NullStreamHandlerFactory f = new NullStreamHandlerFactory();
-    return exec(args, f, f);
+    NullAsyncSink s = new NullAsyncSink();
+    return exec(args, s, s);
   }
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. 
    *
    * @return the exit status of the ran program
    */
-  public static int exec(String [] args, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
-    return exec(args, null, outHandler, errHandler);
+  public static int exec(String [] args, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
+    return exec(args, null, outSink, errSink);
   }
 
 
   /**
    * Run a command via Runtime.exec(), with its stdout and stderr streams
-   * directed to be handled by threads generated by StreamHandlerFactories.
+   * directed to be handled by threads generated by AsyncSinks.
    * Block until the child process terminates. Allows the programmer to
    * specify an environment for the child program.
    *
    * @return the exit status of the ran program
    */
-  public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
-      StreamHandlerFactory errHandler) throws IOException {
+  public static int exec(String [] args, String [] envp, AsyncSink outSink,
+      AsyncSink errSink) throws IOException {
 
     // launch the process.
     Process p = Runtime.getRuntime().exec(args, envp);
 
-    // dispatch its stdout and stderr to stream handlers if available.
-    if (null != outHandler) {
-      outHandler.processStream(p.getInputStream());
+    // dispatch its stdout and stderr to stream sinks if available.
+    if (null != outSink) {
+      outSink.processStream(p.getInputStream());
     } 
 
-    if (null != errHandler) {
-      errHandler.processStream(p.getErrorStream());
+    if (null != errSink) {
+      errSink.processStream(p.getErrorStream());
     }
 
     // wait for the return value.
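
A minimal caller-side sketch of the renamed API, not part of the commit; it uses only the exec() overloads and NullAsyncSink shown above, and the command line is illustrative.

    import java.io.IOException;

    import org.apache.hadoop.sqoop.util.Executor;
    import org.apache.hadoop.sqoop.util.NullAsyncSink;

    public class ExecDemo {
      public static void main(String[] argv) throws IOException {
        String [] args = new String [] { "ls", "-l" };

        // Default sinks consume and discard the child's stdout and stderr,
        // so the child cannot block on a full pipe buffer.
        int status = Executor.exec(args);

        // Or wire the sinks explicitly; here both streams are discarded.
        NullAsyncSink out = new NullAsyncSink();
        NullAsyncSink err = new NullAsyncSink();
        int status2 = Executor.exec(args, out, err);

        System.out.println("exit statuses: " + status + ", " + status2);
      }
    }
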

Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java Wed Nov  4 22:50:23 2009
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
 import org.apache.hadoop.sqoop.manager.ManagerFactory;
 
 import junit.framework.TestCase;
@@ -75,14 +76,14 @@
 
   ////// mock classes used for test cases above //////
 
-  public static class AlwaysDummyFactory implements ManagerFactory {
+  public static class AlwaysDummyFactory extends ManagerFactory {
     public ConnManager accept(ImportOptions opts) {
       // Always return a new DummyManager
       return new DummyManager();
     }
   }
 
-  public static class EmptyFactory implements ManagerFactory {
+  public static class EmptyFactory extends ManagerFactory {
     public ConnManager accept(ImportOptions opts) {
       // Never instantiate a proper ConnManager;
       return null;
@@ -92,7 +93,7 @@
   /**
    * This implementation doesn't do anything special.
    */
-  public static class DummyManager implements ConnManager {
+  public static class DummyManager extends ConnManager {
     public void close() {
     }
 
@@ -131,7 +132,7 @@
     public void execAndPrint(String s) {
     }
 
-    public void importTable(String tableName, String jarFile, Configuration conf) {
+    public void importTable(ImportJobContext context) {
     }
   }
 }
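
With ManagerFactory now an abstract class, a factory plugin only overrides accept(). Below is a minimal sketch, not part of the commit, that hands out a manager based on the connect-string scheme; the class name, the jdbc:mysql: prefix check, the MySQLManager constructor, and the convention that returning null defers to the next factory (suggested by EmptyFactory above) are assumptions.

    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.manager.ConnManager;
    import org.apache.hadoop.sqoop.manager.ManagerFactory;
    import org.apache.hadoop.sqoop.manager.MySQLManager;

    public class MySqlOnlyFactory extends ManagerFactory {
      public ConnManager accept(ImportOptions options) {
        String connectString = options.getConnectString();
        if (connectString != null && connectString.startsWith("jdbc:mysql:")) {
          return new MySQLManager(options);  // constructor argument assumed
        }
        return null;  // defer to the next factory in the chain
      }
    }
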

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
 /hadoop/core/trunk/src/contrib/streaming:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/streaming:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/streaming:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
 /hadoop/core/trunk/src/contrib/vaidya:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/vaidya:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/vaidya:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml Wed Nov  4 22:50:23 2009
@@ -317,6 +317,36 @@
       </section>
 
       <section>
+        <title>Copying to S3</title>
+
+        <p>DistCp can be used to copy data between HDFS and other filesystems,
+        including those backed by S3. The <code>s3n</code> FileSystem
+        implementation allows DistCp (and Hadoop in general) to use an S3
+        bucket as a source or target for transfers. To transfer data from
+        HDFS to an S3 bucket, invoke DistCp using arguments like the following:
+        </p>
+<source>
+bash$ hadoop distcp hdfs://nn:8020/foo/bar \
+    s3n://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@&lt;bucket&gt;/foo/bar
+</source>
+
+        <p><code>$AWS_ACCESS_KEY_ID</code> and
+        <code>$AWS_SECRET_ACCESS_KEY</code> are environment variables holding
+        S3 access credentials.</p>
+
+        <p>Some FileSystem operations take longer on S3 than on HDFS. If you
+        are transferring large files to S3 (e.g., 1 GB and up), you may
+        experience timeouts during your job. To prevent this, you should set
+        the task timeout to a larger interval than is typically used:
+        </p>
+<source>
+bash$ hadoop distcp -D mapred.task.timeout=1800000 \
+    hdfs://nn:8020/foo/bar \
+    s3n://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@&lt;bucket&gt;/foo/bar
+</source>
+      </section>
+
+      <section>
         <title>MapReduce and Other Side-effects</title>
 
         <p>As has been mentioned in the preceding, should a map fail to copy

Propchange: hadoop/mapreduce/branches/HDFS-641/src/examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/examples:713112
 /hadoop/core/trunk/src/examples:776175-784663
-/hadoop/mapreduce/trunk/src/examples:817878-830225
+/hadoop/mapreduce/trunk/src/examples:817878-832891

Propchange: hadoop/mapreduce/branches/HDFS-641/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/java:713112
 /hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:817878-830225
+/hadoop/mapreduce/trunk/src/java:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Nov  4 22:50:23 2009
@@ -1541,7 +1541,6 @@
       name = TaskType.JOB_CLEANUP;
     } else if (tip.isMapTask()) {
       ++runningMapTasks;
-      metrics.addRunningMaps(jobId, 1);
       name = TaskType.MAP;
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
@@ -1553,7 +1552,6 @@
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
-      metrics.addRunningReduces(jobId, 1);
       name = TaskType.REDUCE;
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
       if (tip.isSpeculating()) {
@@ -2059,7 +2057,9 @@
     String taskTrackerName = tts.getTrackerName();
     String taskTrackerHost = tts.getHost();
     if (numMapTasks == 0) {
-      LOG.info("No maps to schedule for " + profile.getJobID());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("No maps to schedule for " + profile.getJobID());
+      }
       return -1;
     }
 
@@ -2249,7 +2249,9 @@
     String taskTrackerName = tts.getTrackerName();
     String taskTrackerHost = tts.getHost();
     if (numReduceTasks == 0) {
-      LOG.info("No reduces to schedule for " + profile.getJobID());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("No reduces to schedule for " + profile.getJobID());
+      }
       return -1;
     }
     TaskInProgress tip = null;
@@ -2604,7 +2606,6 @@
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      metrics.decRunningMaps(jobId, 1);
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
@@ -2617,7 +2618,6 @@
       }
     } else {
       runningReduceTasks -= 1;
-      metrics.decRunningReduces(jobId, 1);
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
       if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
@@ -2966,7 +2966,6 @@
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
-        metrics.decRunningMaps(jobId, 1);
         metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
@@ -2976,7 +2975,6 @@
         }
       } else {
         runningReduceTasks -= 1;
-        metrics.decRunningReduces(jobId, 1);
         metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Nov  4 22:50:23 2009
@@ -420,15 +420,7 @@
               if ((now - newProfile.getLastSeen()) >
                   tasktrackerExpiryInterval) {
                 // Remove completely after marking the tasks as 'KILLED'
-                lostTaskTracker(current);
-                // tracker is lost, and if it is blacklisted, remove 
-                // it from the count of blacklisted trackers in the cluster
-                if (isBlacklisted(trackerName)) {
-                  faultyTrackers.decrBlackListedTrackers(1);
-                }
-                updateTaskTrackerStatus(trackerName, null);
-                statistics.taskTrackerRemoved(trackerName);
-                getInstrumentation().decTrackers(1);
+                removeTracker(current);
                 // remove the mapping from the hosts list
                 String hostname = newProfile.getHost();
                 hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -443,6 +435,19 @@
     }
   }
 
+  private void removeTracker(TaskTracker tracker) {
+    lostTaskTracker(tracker);
+    String trackerName = tracker.getStatus().getTrackerName();
+    // tracker is lost, and if it is blacklisted, remove 
+    // it from the count of blacklisted trackers in the cluster
+    if (isBlacklisted(trackerName)) {
+      faultyTrackers.decrBlackListedTrackers(1);
+    }
+    updateTaskTrackerStatus(trackerName, null);
+    statistics.taskTrackerRemoved(trackerName);
+    getInstrumentation().decTrackers(1);
+  }
+
   public synchronized void retireJob(JobID jobid, String historyFile) {
     synchronized (jobs) {
       JobInProgress job = jobs.get(jobid);
@@ -2439,6 +2444,8 @@
       totalReduces -= oldStatus.countReduceTasks();
       occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
       occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+      getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+      getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
       getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
       getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -2465,6 +2472,8 @@
       totalReduces += status.countReduceTasks();
       occupiedMapSlots += status.countOccupiedMapSlots();
       occupiedReduceSlots += status.countOccupiedReduceSlots();
+      getInstrumentation().addRunningMaps(status.countMapTasks());
+      getInstrumentation().addRunningReduces(status.countReduceTasks());
       getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
       getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -3824,9 +3833,7 @@
             for (TaskTracker tracker : trackers) {
               LOG.info("Decommission: Losing tracker " + tracker + 
                        " on host " + host);
-              lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(
-                tracker.getStatus().getTrackerName(), null);
+              removeTracker(tracker);
             }
             trackersDecommissioned += trackers.size();
           }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Wed Nov  4 22:50:23 2009
@@ -127,16 +127,16 @@
   public void decRunningJob(JobConf conf, JobID id) 
   { }
 
-  public void addRunningMaps(JobID id, int task)
+  public void addRunningMaps(int tasks)
   { }
 
-  public void decRunningMaps(JobID id, int task) 
+  public void decRunningMaps(int tasks) 
   { }
 
-  public void addRunningReduces(JobID id, int task)
+  public void addRunningReduces(int tasks)
   { }
 
-  public void decRunningReduces(JobID id, int task)
+  public void decRunningReduces(int tasks)
   { }
 
   public void killedMap(TaskAttemptID taskAttemptID)

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Wed Nov  4 22:50:23 2009
@@ -341,25 +341,25 @@
   }
 
   @Override
-  public synchronized void addRunningMaps(JobID id, int task)
+  public synchronized void addRunningMaps(int task)
   {
     numRunningMaps += task;
   }
 
   @Override
-  public synchronized void decRunningMaps(JobID id, int task) 
+  public synchronized void decRunningMaps(int task) 
   {
     numRunningMaps -= task;
   }
 
   @Override
-  public synchronized void addRunningReduces(JobID id, int task)
+  public synchronized void addRunningReduces(int task)
   {
     numRunningReduces += task;
   }
 
   @Override
-  public synchronized void decRunningReduces(JobID id, int task)
+  public synchronized void decRunningReduces(int task)
   {
     numRunningReduces -= task;
   }

Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java Wed Nov  4 22:50:23 2009
@@ -56,9 +56,11 @@
       completedMapsInputSize+=(tip.getMapInputSize()+1);
       completedMapsOutputSize+=ts.getOutputSize();
 
-      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
-          "completedMapsInputSize:"+completedMapsInputSize+"  " +
-        "completedMapsOutputSize:"+completedMapsOutputSize);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("completedMapsUpdates:"+completedMapsUpdates+"  "+
+                  "completedMapsInputSize:"+completedMapsInputSize+"  " +
+                  "completedMapsOutputSize:"+completedMapsOutputSize);
+      }
     }
   }
 
@@ -73,7 +75,9 @@
       //add desiredMaps() so that randomwriter case doesn't blow up
       long estimate = Math.round((inputSize * 
           completedMapsOutputSize * 2.0)/completedMapsInputSize);
-      LOG.debug("estimate total map output will be " + estimate);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("estimate total map output will be " + estimate);
+      }
       return estimate;
     }
   }

Propchange: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
 /hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:817878-830225
+/hadoop/mapreduce/trunk/src/test/mapred:817878-832891

Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Wed Nov  4 22:50:23 2009
@@ -129,11 +129,6 @@
     taskAttemptID[1] = job.findMapTask(trackers[1]);
     taskAttemptID[2] = job.findReduceTask(trackers[2]);
     
-    assertTrue("Mismatch in num  running maps",
-        mi.numRunningMaps == numMaps);
-    assertTrue("Mismatch in num running reduces",
-        mi.numRunningReduces == numReds);
-
     job.finishTask(taskAttemptID[0]);
     job.finishTask(taskAttemptID[1]);
     job.finishTask(taskAttemptID[2]);
@@ -254,6 +249,10 @@
       mapSlotsPerTask+mapSlotsPerTask1, mi.numOccupiedMapSlots);
     assertEquals("Mismatch in reduce slots occupied",
       reduceSlotsPerTask+reduceSlotsPerTask1, mi.numOccupiedReduceSlots);
+    assertEquals("Mismatch in num  running maps",
+        2, mi.numRunningMaps);
+      assertEquals("Mismatch in num running reduces",
+        2, mi.numRunningReduces);
     
     //now send heartbeat with no running tasks
     status = new TaskTrackerStatus[1];
@@ -265,6 +264,10 @@
       0, mi.numOccupiedMapSlots);
     assertEquals("Mismatch in reduce slots occupied",
       0, mi.numOccupiedReduceSlots);
+    assertEquals("Mismatch in num  running maps",
+      0, mi.numRunningMaps);
+    assertEquals("Mismatch in num running reduces",
+      0, mi.numRunningReduces);
   }
 
   public void testReservedSlots() throws IOException {
@@ -295,6 +298,18 @@
   }
   
   public void testDecomissionedTrackers() throws IOException {
+    // create TaskTrackerStatus and send heartbeats
+    TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+    status[0] = getTTStatus(trackers[0], new ArrayList<TaskStatus>());
+    status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+    status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+    for (int i = 0; i< trackers.length; i++) {
+      FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
+          false, trackers[i], responseId);
+    }
+    
+    assertEquals("Mismatch in number of trackers",
+        trackers.length, mi.numTrackers);
     Set<String> dHosts = new HashSet<String>();
     dHosts.add(hosts[1]);
     assertEquals("Mismatch in number of decommissioned trackers",
@@ -302,6 +317,8 @@
     jobTracker.decommissionNodes(dHosts);
     assertEquals("Mismatch in number of decommissioned trackers",
         1, mi.numTrackersDecommissioned);
+    assertEquals("Mismatch in number of trackers",
+        trackers.length - 1, mi.numTrackers);
   }
   
   static class FakeTaskScheduler extends JobQueueTaskScheduler {
@@ -555,25 +572,25 @@
     }
 
     @Override
-    public synchronized void addRunningMaps(JobID id, int task)
+    public synchronized void addRunningMaps(int task)
     {
       numRunningMaps += task;
     }
 
     @Override
-    public synchronized void decRunningMaps(JobID id, int task) 
+    public synchronized void decRunningMaps(int task) 
     {
       numRunningMaps -= task;
     }
 
     @Override
-    public synchronized void addRunningReduces(JobID id, int task)
+    public synchronized void addRunningReduces(int task)
     {
       numRunningReduces += task;
     }
 
     @Override
-    public synchronized void decRunningReduces(JobID id, int task)
+    public synchronized void decRunningReduces(int task)
     {
       numRunningReduces -= task;
     }

Propchange: hadoop/mapreduce/branches/HDFS-641/src/webapps/job/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  4 22:50:23 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112
 /hadoop/core/trunk/src/webapps/job:776175-785643
-/hadoop/mapreduce/trunk/src/webapps/job:817878-830225
+/hadoop/mapreduce/trunk/src/webapps/job:817878-832891


