Author: omalley
Date: Wed Nov 4 22:50:23 2009
New Revision: 832892
URL: http://svn.apache.org/viewvc?rev=832892&view=rev
Log:
Merged trunk revisions r830226-r832891 into the HDFS-641 branch.
Added:
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/api-reference.txt
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/doc/api-reference.txt
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/CountingOutputStream.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/HdfsSplitOutputStream.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ImportJobContext.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/AsyncSink.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableAsyncSink.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ErrorableThread.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingAsyncSink.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
- copied unchanged from r832891, hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullAsyncSink.java
Removed:
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
Modified:
hadoop/mapreduce/branches/HDFS-641/ (props changed)
hadoop/mapreduce/branches/HDFS-641/.gitignore (props changed)
hadoop/mapreduce/branches/HDFS-641/CHANGES.txt
hadoop/mapreduce/branches/HDFS-641/conf/ (props changed)
hadoop/mapreduce/branches/HDFS-641/conf/capacity-scheduler.xml.template (props changed)
hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar
hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar
hadoop/mapreduce/branches/HDFS-641/src/c++/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/build-contrib.xml (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/build.xml (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/index/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
hadoop/mapreduce/branches/HDFS-641/src/contrib/streaming/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/contrib/vaidya/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml
hadoop/mapreduce/branches/HDFS-641/src/examples/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/java/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
hadoop/mapreduce/branches/HDFS-641/src/test/mapred/ (props changed)
hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
hadoop/mapreduce/branches/HDFS-641/src/webapps/job/ (props changed)
Propchange: hadoop/mapreduce/branches/HDFS-641/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19/mapred:713112
-/hadoop/mapreduce/trunk:817878-830225
+/hadoop/mapreduce/trunk:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/.gitignore
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,4 +1,4 @@
/hadoop/core/branches/HADOOP-4687/mapred/.gitignore:776175-784965
/hadoop/core/branches/branch-0.19/mapred/.gitignore:713112
/hadoop/core/trunk/.gitignore:784664-785643
-/hadoop/mapreduce/trunk/.gitignore:817878-830225
+/hadoop/mapreduce/trunk/.gitignore:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/CHANGES.txt?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/HDFS-641/CHANGES.txt Wed Nov 4 22:50:23 2009
@@ -28,6 +28,11 @@
MAPREDUCE-1090. Modified log statement in TaskMemoryManagerThread to
include task attempt id. (yhemanth)
+ MAPREDUCE-1069. Implement Sqoop API refactoring. (Aaron Kimball via
+ tomwhite)
+
+ MAPREDUCE-1036. Document Sqoop API. (Aaron Kimball via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
@@ -37,6 +42,9 @@
BUG FIXES
+ MAPREDUCE-1089. Fix NPE in fair scheduler preemption when tasks are
+ scheduled but not running. (Todd Lipcon via matei)
+
MAPREDUCE-1014. Fix the libraries for common and hdfs. (omalley)
MAPREDUCE-1111. JT Jetty UI not working if we run mumak.sh
@@ -48,6 +56,16 @@
MAPREDUCE-1098. Fixed the distributed-cache to not do i/o while holding a
global lock. (Amareshwari Sriramadasu via acmurthy)
+ MAPREDUCE-1158. Fix JT running maps and running reduces metrics.
+ (sharad)
+
+ MAPREDUCE-1160. Reduce verbosity of log lines in some Map/Reduce classes
+ to avoid filling up jobtracker logs on a busy cluster.
+ (Ravi Gummadi and Hong Tang via yhemanth)
+
+ MAPREDUCE-1153. Fix tasktracker metrics when trackers are decommissioned.
+ (sharad)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
@@ -472,6 +490,9 @@
MAPREDUCE-1012. Mark Context interfaces as public evolving. (Tom White via
cdouglas)
+ MAPREDUCE-971. Document use of distcp when copying to s3, managing timeouts
+ in particular. (Aaron Kimball via cdouglas)
+
BUG FIXES
MAPREDUCE-878. Rename fair scheduler design doc to
@@ -826,3 +847,6 @@
queue capacity. (Rahul Kumar Singh via yhemanth)
MAPREDUCE-1016. Make the job history log format JSON. (cutting)
+
+ MAPREDUCE-1038. Weave Mumak aspects only if related files have changed.
+ (Aaron Kimball via cdouglas)
Propchange: hadoop/mapreduce/branches/HDFS-641/conf/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/conf:713112
/hadoop/core/trunk/conf:784664-785643
-/hadoop/mapreduce/trunk/conf:817878-830225
+/hadoop/mapreduce/trunk/conf:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/conf/capacity-scheduler.xml.template
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/conf/capacity-scheduler.xml.template:713112
/hadoop/core/trunk/conf/capacity-scheduler.xml.template:776175-785643
-/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:817878-830225
+/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-0.22.0-dev.jar?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/lib/hadoop-core-test-0.22.0-dev.jar?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
Binary files - no diff available.
Propchange: hadoop/mapreduce/branches/HDFS-641/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++:713112
/hadoop/core/trunk/src/c++:776175-784663
-/hadoop/mapreduce/trunk/src/c++:817878-830225
+/hadoop/mapreduce/trunk/src/c++:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
/hadoop/core/trunk/src/contrib:784664-785643
-/hadoop/mapreduce/trunk/src/contrib:817878-830225
+/hadoop/mapreduce/trunk/src/contrib:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
/hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
/hadoop/core/trunk/src/contrib/build.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build.xml:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/build.xml:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
/hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
/hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/data_join:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
/hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Nov 4 22:50:23 2009
@@ -831,7 +831,11 @@
List<TaskStatus> statuses = new ArrayList<TaskStatus>();
for (TaskInProgress tip: tips) {
for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
- statuses.add(tip.getTaskStatus(id));
+ TaskStatus stat = tip.getTaskStatus(id);
+ // status is null when the task has been scheduled but not yet running
+ if (stat != null) {
+ statuses.add(stat);
+ }
}
}
return statuses;
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
/hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/index:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
/hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/mrunit:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/mumak/build.xml Wed Nov 4 22:50:23 2009
@@ -30,6 +30,7 @@
<property name="javac.version" value="1.6"/>
<property name="javac.args.warnings" value="-Xlint:unchecked"/>
<import file="../build-contrib.xml"/>
+ <property name="mumak.stamp.file" value="${build.dir}/mumak.uptodate.stamp"/>
<target name="compile-java-sources" depends="init, ivy-retrieve-common" unless="skip.contrib">
<echo message="contrib: ${name}"/>
@@ -68,6 +69,7 @@
deprecation="${javac.deprecation}">
<classpath refid="contrib-classpath"/>
</iajc>
+ <touch file="${mumak.stamp.file}" mkdirs="true" />
<echo message="Weaving of aspects is finished"/>
</target>
@@ -94,7 +96,7 @@
<include name="*.jar"/>
</fileset>
</copy>
- <exec executable="sed">
+ <exec executable="sed">
<arg value="-i"/>
<arg value="-e"/>
<arg value="s/^HADOOP_VERSION=/HADOOP_VERSION=${version}/"/>
@@ -104,14 +106,13 @@
</target>
<target name="check.aspects">
- <uptodate property="build.unnecessary"
- targetfile="${build.dir}/${dest.jar}" >
- <srcfiles dir="${hadoop.root}/src/java/"
- includes="org/apache/hadoop/**/*.java" />
- <srcfiles dir="${hadoop.root}/build/src/"
- includes="org/apache/hadoop/**/*.java" />
- <srcfiles dir="${src.dir}"
- includes="org/apache/hadoop/**/*.java, org/apache/hadoop/**/*.aj" />
+ <uptodate property="build.unnecessary"
+ targetfile="${mumak.stamp.file}">
+ <srcfiles dir="${hadoop.root}/src/java/"
+ includes="org/apache/hadoop/**/*.java" />
+ <srcfiles dir="${hadoop.root}/src/webapps/"
+ includes="**/*.jsp" />
+ <srcfiles dir="${src.dir}" includes="org/apache/hadoop/**/*.aj" />
</uptodate>
</target>
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
/hadoop/core/trunk/src/contrib/sqoop:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/sqoop:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/doc/SqoopUserGuide.txt Wed Nov 4 22:50:23 2009
@@ -61,3 +61,5 @@
include::supported-dbs.txt[]
+include::api-reference.txt[]
+
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Nov 4 22:50:23 2009
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
/**
@@ -117,6 +118,8 @@
private boolean areDelimsManuallySet;
+ private Configuration conf;
+
public static final int DEFAULT_NUM_MAPPERS = 4;
private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
@@ -249,6 +252,8 @@
this.useCompression = false;
this.directSplitSize = 0;
+ this.conf = new Configuration();
+
loadFromProperties();
}
@@ -869,4 +874,12 @@
public long getDirectSplitSize() {
return this.directSplitSize;
}
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration config) {
+ this.conf = config;
+ }
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Wed Nov 4 22:50:23 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.sqoop.hive.HiveImport;
import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
import org.apache.hadoop.sqoop.orm.ClassWriter;
import org.apache.hadoop.sqoop.orm.CompilationManager;
import org.apache.hadoop.sqoop.util.ImportError;
@@ -88,7 +89,8 @@
if (options.getAction() == ImportOptions.ControlAction.FullImport) {
// Proceed onward to do the import.
- manager.importTable(tableName, jarFile, getConf());
+ ImportJobContext context = new ImportJobContext(tableName, jarFile, options);
+ manager.importTable(context);
// If the user wants this table to be in Hive, perform that post-load.
if (options.doHiveImport()) {
@@ -103,6 +105,7 @@
*/
public int run(String [] args) {
options = new ImportOptions();
+ options.setConf(getConf());
try {
options.parse(args);
options.validate();
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Wed Nov 4 22:50:23 2009
@@ -35,7 +35,7 @@
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.manager.ConnManager;
import org.apache.hadoop.sqoop.util.Executor;
-import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
/**
* Utility to import a table into the Hive metastore. Manages the connection
@@ -158,8 +158,9 @@
args.add("-f");
args.add(tmpFilename);
- LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
- int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+ LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+ int ret = Executor.exec(args.toArray(new String[0]),
+ env.toArray(new String[0]), logSink, logSink);
if (0 != ret) {
throw new IOException("Hive exited with status " + ret);
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/io/SplittableBufferedWriter.java Wed Nov 4 22:50:23 2009
@@ -19,8 +19,10 @@
package org.apache.hadoop.sqoop.io;
import java.io.BufferedWriter;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.IOException;
+import java.util.Formatter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Wed Nov 4 22:50:23 2009
@@ -33,27 +33,27 @@
* The implementations of this class drive the actual discussion with
* the database about table formats, etc.
*/
-public interface ConnManager {
+public abstract class ConnManager {
/**
* Return a list of all databases on a server
*/
- String [] listDatabases();
+ public abstract String [] listDatabases();
/**
* Return a list of all tables in a database
*/
- String [] listTables();
+ public abstract String [] listTables();
/**
* Return a list of column names in a table in the order returned by the db.
*/
- String [] getColumnNames(String tableName);
+ public abstract String [] getColumnNames(String tableName);
/**
* Return the name of the primary key for a table, or null if there is none.
*/
- String getPrimaryKey(String tableName);
+ public abstract String getPrimaryKey(String tableName);
/**
* Return an unordered mapping from colname to sqltype for
@@ -61,7 +61,7 @@
*
* The Integer type id is a constant from java.sql.Types
*/
- Map<String, Integer> getColumnTypes(String tableName);
+ public abstract Map<String, Integer> getColumnTypes(String tableName);
/**
* Execute a SQL statement to read the named set of columns from a table.
@@ -70,32 +70,32 @@
* The client is responsible for calling ResultSet.close() when done with the
* returned ResultSet object.
*/
- ResultSet readTable(String tableName, String [] columns) throws SQLException;
+ public abstract ResultSet readTable(String tableName, String [] columns) throws SQLException;
/**
* @return the actual database connection
*/
- Connection getConnection() throws SQLException;
+ public abstract Connection getConnection() throws SQLException;
/**
* @return a string identifying the driver class to load for this JDBC connection type.
*/
- String getDriverClass();
+ public abstract String getDriverClass();
/**
* Execute a SQL statement 's' and print its results to stdout
*/
- void execAndPrint(String s);
+ public abstract void execAndPrint(String s);
/**
* Perform an import of a table from the database into HDFS
*/
- void importTable(String tableName, String jarFile, Configuration conf)
+ public abstract void importTable(ImportJobContext context)
throws IOException, ImportError;
/**
* Perform any shutdown operations on the connection.
*/
- void close() throws SQLException;
+ public abstract void close() throws SQLException;
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Wed Nov 4 22:50:23 2009
@@ -27,7 +27,7 @@
* Contains instantiation code for all ConnManager implementations
* shipped and enabled by default in Sqoop.
*/
-public final class DefaultManagerFactory implements ManagerFactory {
+public final class DefaultManagerFactory extends ManagerFactory {
public static final Log LOG = LogFactory.getLog(DefaultManagerFactory.class.getName());
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Wed Nov 4 22:50:23 2009
@@ -28,8 +28,6 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -41,11 +39,13 @@
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.Executor;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
+import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -64,35 +64,24 @@
/** Copies data directly into HDFS, adding the user's chosen line terminator
char to each record.
*/
- static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
+ static class PostgresqlAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
private final ImportOptions options;
- PostgresqlStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
+ PostgresqlAsyncSink(final SplittableBufferedWriter w, final ImportOptions opts,
final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
- private PostgresqlStreamThread child;
-
public void processStream(InputStream is) {
child = new PostgresqlStreamThread(is, writer, options, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class PostgresqlStreamThread extends Thread {
+ private static class PostgresqlStreamThread extends ErrorableThread {
public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
private final SplittableBufferedWriter writer;
@@ -100,8 +89,6 @@
private final ImportOptions options;
private final PerfCounters counters;
- private boolean error;
-
PostgresqlStreamThread(final InputStream is, final SplittableBufferedWriter w,
final ImportOptions opts, final PerfCounters ctrs) {
this.stream = is;
@@ -110,10 +97,6 @@
this.counters = ctrs;
}
- public boolean isErrored() {
- return error;
- }
-
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
@@ -138,7 +121,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from psql: " + ioe.toString());
// set the error bit so our caller can see that something went wrong.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -312,9 +295,13 @@
* Import the table into HDFS by using psql to pull the data out of the db
* via COPY FILE TO STDOUT.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+
LOG.info("Beginning psql fast path import");
if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -327,7 +314,7 @@
String commandFilename = null;
String passwordFilename = null;
Process p = null;
- StreamHandlerFactory streamHandler = null;
+ AsyncSink sink = null;
PerfCounters counters = new PerfCounters();
try {
@@ -395,19 +382,21 @@
LOG.debug(" " + arg);
}
- // This writer will be closed by StreamHandlerFactory.
- SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+ // This writer will be closed by AsyncSink.
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+ options.getConf(), options, tableName);
// Actually start the psql dump.
- p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
+ p = Runtime.getRuntime().exec(args.toArray(new String[0]),
+ envp.toArray(new String[0]));
// read from the stdout pipe into the HDFS writer.
InputStream is = p.getInputStream();
- streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters);
+ sink = new PostgresqlAsyncSink(w, options, counters);
- LOG.debug("Starting stream handler");
+ LOG.debug("Starting stream sink");
counters.startClock();
- streamHandler.processStream(is);
+ sink.processStream(is);
} finally {
// block until the process is done.
LOG.debug("Waiting for process completion");
@@ -440,12 +429,12 @@
}
}
- // block until the stream handler is done too.
+ // block until the stream sink is done too.
int streamResult = 0;
- if (null != streamHandler) {
+ if (null != sink) {
while (true) {
try {
- streamResult = streamHandler.join();
+ streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
@@ -463,7 +452,7 @@
}
if (0 != streamResult) {
- throw new IOException("Encountered exception in stream handler");
+ throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Wed Nov 4 22:50:23 2009
@@ -30,9 +30,6 @@
* Database manager that is connects to a generic JDBC-compliant
* database; its constructor is parameterized on the JDBC Driver
* class to load.
- *
- *
- *
*/
public class GenericJdbcManager extends SqlManager {
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/HsqldbManager.java Wed Nov 4 22:50:23 2009
@@ -27,7 +27,7 @@
* Manages connections to hsqldb databases.
* Extends generic SQL manager.
*/
-public class HsqldbManager extends GenericJdbcManager implements ConnManager {
+public class HsqldbManager extends GenericJdbcManager {
public static final Log LOG = LogFactory.getLog(HsqldbManager.class.getName());
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Nov 4 22:50:23 2009
@@ -27,28 +27,24 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.nio.CharBuffer;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.sqoop.ImportOptions;
import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
import org.apache.hadoop.sqoop.lib.FieldFormatter;
import org.apache.hadoop.sqoop.lib.RecordParser;
import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
+import org.apache.hadoop.sqoop.util.ErrorableThread;
import org.apache.hadoop.sqoop.util.ImportError;
import org.apache.hadoop.sqoop.util.JdbcUrl;
import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
-import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.sqoop.util.AsyncSink;
/**
* Manages direct connections to MySQL databases
@@ -58,57 +54,42 @@
public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
- // StreamHandlers used to import data from mysqldump directly into HDFS.
+ // AsyncSinks used to import data from mysqldump directly into HDFS.
/**
* Copies data directly from mysqldump into HDFS, after stripping some
* header and footer characters that are attached to each line in mysqldump.
*/
- static class CopyingStreamHandlerFactory implements StreamHandlerFactory {
+ static class CopyingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final PerfCounters counters;
- CopyingStreamHandlerFactory(final SplittableBufferedWriter w, final PerfCounters ctrs) {
+ CopyingAsyncSink(final SplittableBufferedWriter w,
+ final PerfCounters ctrs) {
this.writer = w;
this.counters = ctrs;
}
- private CopyingStreamThread child;
-
public void processStream(InputStream is) {
child = new CopyingStreamThread(is, writer, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class CopyingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(CopyingStreamThread.class.getName());
+ private static class CopyingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ CopyingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- CopyingStreamThread(final InputStream is, final SplittableBufferedWriter w,
- final PerfCounters ctrs) {
+ CopyingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final PerfCounters ctrs) {
this.writer = w;
this.stream = is;
this.counters = ctrs;
}
- public boolean isErrored() {
- return error;
- }
-
public void run() {
BufferedReader r = null;
SplittableBufferedWriter w = this.writer;
@@ -143,7 +124,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so we get an error status back in the caller.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -167,49 +148,38 @@
/**
- * The ReparsingStreamHandler will instantiate a RecordParser to read mysqldump's
+ * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
* output, and re-emit the text in the user's specified output format.
*/
- static class ReparsingStreamHandlerFactory implements StreamHandlerFactory {
+ static class ReparsingAsyncSink extends ErrorableAsyncSink {
private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final PerfCounters counters;
- ReparsingStreamHandlerFactory(final SplittableBufferedWriter w, final ImportOptions opts,
- final PerfCounters ctrs) {
+ ReparsingAsyncSink(final SplittableBufferedWriter w,
+ final ImportOptions opts, final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.counters = ctrs;
}
- private ReparsingStreamThread child;
-
public void processStream(InputStream is) {
child = new ReparsingStreamThread(is, writer, options, counters);
child.start();
}
- public int join() throws InterruptedException {
- child.join();
- if (child.isErrored()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private static class ReparsingStreamThread extends Thread {
- public static final Log LOG = LogFactory.getLog(ReparsingStreamThread.class.getName());
+ private static class ReparsingStreamThread extends ErrorableThread {
+ public static final Log LOG = LogFactory.getLog(
+ ReparsingStreamThread.class.getName());
private final SplittableBufferedWriter writer;
private final ImportOptions options;
private final InputStream stream;
private final PerfCounters counters;
- private boolean error;
-
- ReparsingStreamThread(final InputStream is, final SplittableBufferedWriter w,
- final ImportOptions opts, final PerfCounters ctrs) {
+ ReparsingStreamThread(final InputStream is,
+ final SplittableBufferedWriter w, final ImportOptions opts,
+ final PerfCounters ctrs) {
this.writer = w;
this.options = opts;
this.stream = is;
@@ -226,12 +196,9 @@
static {
// build a record parser for mysqldump's format
- MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM, MYSQL_RECORD_DELIM,
- MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR, MYSQL_ENCLOSE_REQUIRED);
- }
-
- public boolean isErrored() {
- return error;
+ MYSQLDUMP_PARSER = new RecordParser(MYSQL_FIELD_DELIM,
+ MYSQL_RECORD_DELIM, MYSQL_ENCLOSE_CHAR, MYSQL_ESCAPE_CHAR,
+ MYSQL_ENCLOSE_REQUIRED);
}
public void run() {
@@ -300,7 +267,7 @@
} catch (IOException ioe) {
LOG.error("IOException reading from mysqldump: " + ioe.toString());
// flag this error so the parent can handle it appropriately.
- error = true;
+ setError();
} finally {
if (null != r) {
try {
@@ -374,9 +341,13 @@
* Import the table into HDFS by using mysqldump to pull out the data from
* the database and upload the files directly to HDFS.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
+
LOG.info("Beginning mysqldump fast path import");
if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
@@ -406,7 +377,7 @@
String passwordFile = null;
Process p = null;
- StreamHandlerFactory streamHandler = null;
+ AsyncSink sink = null;
PerfCounters counters = new PerfCounters();
try {
// --defaults-file must be the first argument.
@@ -446,8 +417,9 @@
LOG.debug(" " + arg);
}
- // This writer will be closed by StreamHandlerFactory.
- SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(conf, options, tableName);
+ // This writer will be closed by AsyncSink.
+ SplittableBufferedWriter w = DirectImportUtils.createHdfsSink(
+ options.getConf(), options, tableName);
// Actually start the mysqldump.
p = Runtime.getRuntime().exec(args.toArray(new String[0]));
@@ -457,19 +429,19 @@
if (outputDelimsAreMySQL()) {
LOG.debug("Output delimiters conform to mysqldump; using straight copy");
- streamHandler = new CopyingStreamHandlerFactory(w, counters);
+ sink = new CopyingAsyncSink(w, counters);
} else {
LOG.debug("User-specified delimiters; using reparsing import");
LOG.info("Converting data to use specified delimiters.");
LOG.info("(For the fastest possible import, use");
LOG.info("--mysql-delimiters to specify the same field");
LOG.info("delimiters as are used by mysqldump.)");
- streamHandler = new ReparsingStreamHandlerFactory(w, options, counters);
+ sink = new ReparsingAsyncSink(w, options, counters);
}
// Start an async thread to read and upload the whole stream.
counters.startClock();
- streamHandler.processStream(is);
+ sink.processStream(is);
} finally {
// block until the process is done.
@@ -495,12 +467,12 @@
}
}
- // block until the stream handler is done too.
+ // block until the stream sink is done too.
int streamResult = 0;
- if (null != streamHandler) {
+ if (null != sink) {
while (true) {
try {
- streamResult = streamHandler.join();
+ streamResult = sink.join();
} catch (InterruptedException ie) {
// interrupted; loop around.
continue;
@@ -518,7 +490,7 @@
}
if (0 != streamResult) {
- throw new IOException("Encountered exception in stream handler");
+ throw new IOException("Encountered exception in stream sink");
}
counters.stopClock();
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ManagerFactory.java Wed Nov 4 22:50:23 2009
@@ -27,7 +27,7 @@
* calls the accept() method of each ManagerFactory, in order until
* one such call returns a non-null ConnManager instance.
*/
-public interface ManagerFactory {
- ConnManager accept(ImportOptions options);
+public abstract class ManagerFactory {
+ public abstract ConnManager accept(ImportOptions options);
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Wed Nov 4 22:50:23 2009
@@ -92,13 +92,13 @@
}
@Override
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
// Check that we're not doing a MapReduce from localhost. If we are, point
// out that we could use mysqldump.
if (!MySQLManager.warningPrinted) {
- String connectString = options.getConnectString();
+ String connectString = context.getOptions().getConnectString();
if (null != connectString && connectString.indexOf("//localhost") != -1) {
// if we're not doing a remote connection, they should have a LocalMySQLManager.
@@ -114,7 +114,7 @@
}
// Then run the normal importTable() method.
- super.importTable(tableName, jarFile, conf);
+ super.importTable(context);
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Wed Nov 4 22:50:23 2009
@@ -90,8 +90,12 @@
* This importTable() implementation continues to use the older DBInputFormat
* because DataDrivenDBInputFormat does not currently work with Oracle.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
ImportJob importer = new ImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
@@ -105,7 +109,7 @@
+ ". Please specify one with --split-by.");
}
- importer.runImport(tableName, jarFile, splitCol, conf);
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Wed Nov 4 22:50:23 2009
@@ -71,13 +71,13 @@
}
@Override
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
// The user probably should have requested --direct to invoke pg_dump.
// Display a warning informing them of this fact.
if (!PostgresqlManager.warningPrinted) {
- String connectString = options.getConnectString();
+ String connectString = context.getOptions().getConnectString();
LOG.warn("It looks like you are importing from postgresql.");
LOG.warn("This transfer can be faster! Use the --direct");
@@ -87,7 +87,7 @@
}
// Then run the normal importTable() method.
- super.importTable(tableName, jarFile, conf);
+ super.importTable(context);
}
@Override
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Wed Nov 4 22:50:23 2009
@@ -45,7 +45,7 @@
* This is an abstract class; it requires a database-specific
* ConnManager implementation to actually create the connection.
*/
-public abstract class SqlManager implements ConnManager {
+public abstract class SqlManager extends ConnManager {
public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
@@ -260,8 +260,11 @@
* Default implementation of importTable() is to launch a MapReduce job
* via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
*/
- public void importTable(String tableName, String jarFile, Configuration conf)
+ public void importTable(ImportJobContext context)
throws IOException, ImportError {
+ String tableName = context.getTableName();
+ String jarFile = context.getJarFile();
+ ImportOptions options = context.getOptions();
DataDrivenImportJob importer = new DataDrivenImportJob(options);
String splitCol = options.getSplitByCol();
if (null == splitCol) {
@@ -275,7 +278,7 @@
+ ". Please specify one with --split-by.");
}
- importer.runImport(tableName, jarFile, splitCol, conf);
+ importer.runImport(tableName, jarFile, splitCol, options.getConf());
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Wed Nov 4 22:50:23 2009
@@ -37,7 +37,8 @@
*/
public final class DirectImportUtils {
- public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName());
+ public static final Log LOG = LogFactory.getLog(
+ DirectImportUtils.class.getName());
private DirectImportUtils() {
}
@@ -47,7 +48,8 @@
* which may be e.g. "a+x" or "0600", etc.
* @throws IOException if chmod failed.
*/
- public static void setFilePermissions(File file, String modstr) throws IOException {
+ public static void setFilePermissions(File file, String modstr)
+ throws IOException {
// Set this file to be 0600. Java doesn't have a built-in mechanism for this
// so we need to go out to the shell to execute chmod.
try {
@@ -61,9 +63,9 @@
/**
* Open a file in HDFS for write to hold the data associated with a table.
- * Creates any necessary directories, and returns the OutputStream to write to.
- * The caller is responsible for calling the close() method on the returned
- * stream.
+ * Creates any necessary directories, and returns a SplittableBufferedWriter
+ * to write to. The caller is responsible for calling the close() method on
+ * the returned writer.
*/
public static SplittableBufferedWriter createHdfsSink(Configuration conf,
ImportOptions options, String tableName) throws IOException {
@@ -83,8 +85,8 @@
// This Writer will be closed by the caller.
return new SplittableBufferedWriter(
- new SplittingOutputStream(conf, destDir, "data-", options.getDirectSplitSize(),
- options.shouldUseCompression()));
+ new SplittingOutputStream(conf, destDir, "data-",
+ options.getDirectSplitSize(), options.shouldUseCompression()));
}
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Wed Nov 4 22:50:23 2009
@@ -39,49 +39,49 @@
}
/**
- * Execute a program defined by the args array with default stream handlers
+ * Execute a program defined by the args array with default stream sinks
* that consume the program's output (to prevent it from blocking on buffers)
* and then ignore said output.
*/
public static int exec(String [] args) throws IOException {
- NullStreamHandlerFactory f = new NullStreamHandlerFactory();
- return exec(args, f, f);
+ NullAsyncSink s = new NullAsyncSink();
+ return exec(args, s, s);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * directed to be handled by threads generated by AsyncSinks.
* Block until the child process terminates.
*
* @return the exit status of the ran program
*/
- public static int exec(String [] args, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
- return exec(args, null, outHandler, errHandler);
+ public static int exec(String [] args, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
+ return exec(args, null, outSink, errSink);
}
/**
* Run a command via Runtime.exec(), with its stdout and stderr streams
- * directed to be handled by threads generated by StreamHandlerFactories.
+ * directed to be handled by threads generated by AsyncSinks.
* Block until the child process terminates. Allows the programmer to
* specify an environment for the child program.
*
* @return the exit status of the ran program
*/
- public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
- StreamHandlerFactory errHandler) throws IOException {
+ public static int exec(String [] args, String [] envp, AsyncSink outSink,
+ AsyncSink errSink) throws IOException {
// launch the process.
Process p = Runtime.getRuntime().exec(args, envp);
- // dispatch its stdout and stderr to stream handlers if available.
- if (null != outHandler) {
- outHandler.processStream(p.getInputStream());
+ // dispatch its stdout and stderr to stream sinks if available.
+ if (null != outSink) {
+ outSink.processStream(p.getInputStream());
}
- if (null != errHandler) {
- errHandler.processStream(p.getErrorStream());
+ if (null != errSink) {
+ errSink.processStream(p.getErrorStream());
}
// wait for the return value.
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java Wed Nov 4 22:50:23 2009
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.ImportJobContext;
import org.apache.hadoop.sqoop.manager.ManagerFactory;
import junit.framework.TestCase;
@@ -75,14 +76,14 @@
////// mock classes used for test cases above //////
- public static class AlwaysDummyFactory implements ManagerFactory {
+ public static class AlwaysDummyFactory extends ManagerFactory {
public ConnManager accept(ImportOptions opts) {
// Always return a new DummyManager
return new DummyManager();
}
}
- public static class EmptyFactory implements ManagerFactory {
+ public static class EmptyFactory extends ManagerFactory {
public ConnManager accept(ImportOptions opts) {
// Never instantiate a proper ConnManager;
return null;
@@ -92,7 +93,7 @@
/**
* This implementation doesn't do anything special.
*/
- public static class DummyManager implements ConnManager {
+ public static class DummyManager extends ConnManager {
public void close() {
}
@@ -131,7 +132,7 @@
public void execAndPrint(String s) {
}
- public void importTable(String tableName, String jarFile, Configuration conf) {
+ public void importTable(ImportJobContext context) {
}
}
}
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
/hadoop/core/trunk/src/contrib/streaming:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/streaming:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/streaming:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
/hadoop/core/trunk/src/contrib/vaidya:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/vaidya:817878-830225
+/hadoop/mapreduce/trunk/src/contrib/vaidya:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/docs/src/documentation/content/xdocs/distcp.xml Wed Nov 4 22:50:23 2009
@@ -317,6 +317,36 @@
</section>
<section>
+ <title>Copying to S3</title>
+
+ <p>DistCp can be used to copy data between HDFS and other filesystems,
+ including those backed by S3. The <code>s3n</code> FileSystem
+ implementation allows DistCp (and Hadoop in general) to use an S3
+ bucket as a source or target for transfers. To transfer data from
+ HDFS to an S3 bucket, invoke DistCp using arguments like the following:
+ </p>
+<source>
+bash$ hadoop distcp hdfs://nn:8020/foo/bar \
+ s3n://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@&lt;bucket&gt;/foo/bar
+</source>
+
+ <p><code>$AWS_ACCESS_KEY_ID</code> and
+ <code>$AWS_SECRET_ACCESS_KEY</code> are environment variables holding
+ S3 access credentials.</p>
+
+ <p>Some FileSystem operations take longer on S3 than on HDFS. If you
+ are transferring large files to S3 (e.g., 1 GB and up), you may
+ experience timeouts during your job. To prevent this, set the task
+ timeout (<code>mapred.task.timeout</code>, in milliseconds) to a larger interval than is typically used:
+ </p>
+<source>
+bash$ hadoop distcp -D mapred.task.timeout=1800000 \
+ hdfs://nn:8020/foo/bar \
+ s3n://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@&lt;bucket&gt;/foo/bar
+</source>
+ </section>
+
+ <section>
<title>MapReduce and Other Side-effects</title>
<p>As has been mentioned in the preceding, should a map fail to copy
Propchange: hadoop/mapreduce/branches/HDFS-641/src/examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/examples:713112
/hadoop/core/trunk/src/examples:776175-784663
-/hadoop/mapreduce/trunk/src/examples:817878-830225
+/hadoop/mapreduce/trunk/src/examples:817878-832891
Propchange: hadoop/mapreduce/branches/HDFS-641/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/java:713112
/hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:817878-830225
+/hadoop/mapreduce/trunk/src/java:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Nov 4 22:50:23 2009
@@ -1541,7 +1541,6 @@
name = TaskType.JOB_CLEANUP;
} else if (tip.isMapTask()) {
++runningMapTasks;
- metrics.addRunningMaps(jobId, 1);
name = TaskType.MAP;
counter = JobCounter.TOTAL_LAUNCHED_MAPS;
splits = tip.getSplitNodes();
@@ -1553,7 +1552,6 @@
metrics.launchMap(id);
} else {
++runningReduceTasks;
- metrics.addRunningReduces(jobId, 1);
name = TaskType.REDUCE;
counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
if (tip.isSpeculating()) {
@@ -2059,7 +2057,9 @@
String taskTrackerName = tts.getTrackerName();
String taskTrackerHost = tts.getHost();
if (numMapTasks == 0) {
- LOG.info("No maps to schedule for " + profile.getJobID());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("No maps to schedule for " + profile.getJobID());
+ }
return -1;
}
@@ -2249,7 +2249,9 @@
String taskTrackerName = tts.getTrackerName();
String taskTrackerHost = tts.getHost();
if (numReduceTasks == 0) {
- LOG.info("No reduces to schedule for " + profile.getJobID());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("No reduces to schedule for " + profile.getJobID());
+ }
return -1;
}
TaskInProgress tip = null;
@@ -2604,7 +2606,6 @@
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
- metrics.decRunningMaps(jobId, 1);
finishedMapTasks += 1;
metrics.completeMap(taskid);
if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
@@ -2617,7 +2618,6 @@
}
} else {
runningReduceTasks -= 1;
- metrics.decRunningReduces(jobId, 1);
finishedReduceTasks += 1;
metrics.completeReduce(taskid);
if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
@@ -2966,7 +2966,6 @@
launchedSetup = false;
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
- metrics.decRunningMaps(jobId, 1);
metrics.failedMap(taskid);
// remove from the running queue and put it in the non-running cache
// if the tip is not complete i.e if the tip still needs to be run
@@ -2976,7 +2975,6 @@
}
} else {
runningReduceTasks -= 1;
- metrics.decRunningReduces(jobId, 1);
metrics.failedReduce(taskid);
// remove from the running queue and put in the failed queue if the tip
// is not complete
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Nov 4 22:50:23 2009
@@ -420,15 +420,7 @@
if ((now - newProfile.getLastSeen()) >
tasktrackerExpiryInterval) {
// Remove completely after marking the tasks as 'KILLED'
- lostTaskTracker(current);
- // tracker is lost, and if it is blacklisted, remove
- // it from the count of blacklisted trackers in the cluster
- if (isBlacklisted(trackerName)) {
- faultyTrackers.decrBlackListedTrackers(1);
- }
- updateTaskTrackerStatus(trackerName, null);
- statistics.taskTrackerRemoved(trackerName);
- getInstrumentation().decTrackers(1);
+ removeTracker(current);
// remove the mapping from the hosts list
String hostname = newProfile.getHost();
hostnameToTaskTracker.get(hostname).remove(trackerName);
@@ -443,6 +435,19 @@
}
}
+ private void removeTracker(TaskTracker tracker) {
+ lostTaskTracker(tracker);
+ String trackerName = tracker.getStatus().getTrackerName();
+ // tracker is lost, and if it is blacklisted, remove
+ // it from the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.decrBlackListedTrackers(1);
+ }
+ updateTaskTrackerStatus(trackerName, null);
+ statistics.taskTrackerRemoved(trackerName);
+ getInstrumentation().decTrackers(1);
+ }
+
public synchronized void retireJob(JobID jobid, String historyFile) {
synchronized (jobs) {
JobInProgress job = jobs.get(jobid);
@@ -2439,6 +2444,8 @@
totalReduces -= oldStatus.countReduceTasks();
occupiedMapSlots -= oldStatus.countOccupiedMapSlots();
occupiedReduceSlots -= oldStatus.countOccupiedReduceSlots();
+ getInstrumentation().decRunningMaps(oldStatus.countMapTasks());
+ getInstrumentation().decRunningReduces(oldStatus.countReduceTasks());
getInstrumentation().decOccupiedMapSlots(oldStatus.countOccupiedMapSlots());
getInstrumentation().decOccupiedReduceSlots(oldStatus.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
@@ -2465,6 +2472,8 @@
totalReduces += status.countReduceTasks();
occupiedMapSlots += status.countOccupiedMapSlots();
occupiedReduceSlots += status.countOccupiedReduceSlots();
+ getInstrumentation().addRunningMaps(status.countMapTasks());
+ getInstrumentation().addRunningReduces(status.countReduceTasks());
getInstrumentation().addOccupiedMapSlots(status.countOccupiedMapSlots());
getInstrumentation().addOccupiedReduceSlots(status.countOccupiedReduceSlots());
if (!faultyTrackers.isBlacklisted(status.getHost())) {
@@ -3824,9 +3833,7 @@
for (TaskTracker tracker : trackers) {
LOG.info("Decommission: Losing tracker " + tracker +
" on host " + host);
- lostTaskTracker(tracker); // lose the tracker
- updateTaskTrackerStatus(
- tracker.getStatus().getTrackerName(), null);
+ removeTracker(tracker);
}
trackersDecommissioned += trackers.size();
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Wed Nov 4 22:50:23 2009
@@ -127,16 +127,16 @@
public void decRunningJob(JobConf conf, JobID id)
{ }
- public void addRunningMaps(JobID id, int task)
+ public void addRunningMaps(int tasks)
{ }
- public void decRunningMaps(JobID id, int task)
+ public void decRunningMaps(int tasks)
{ }
- public void addRunningReduces(JobID id, int task)
+ public void addRunningReduces(int tasks)
{ }
- public void decRunningReduces(JobID id, int task)
+ public void decRunningReduces(int tasks)
{ }
public void killedMap(TaskAttemptID taskAttemptID)
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Wed Nov 4 22:50:23 2009
@@ -341,25 +341,25 @@
}
@Override
- public synchronized void addRunningMaps(JobID id, int task)
+ public synchronized void addRunningMaps(int task)
{
numRunningMaps += task;
}
@Override
- public synchronized void decRunningMaps(JobID id, int task)
+ public synchronized void decRunningMaps(int task)
{
numRunningMaps -= task;
}
@Override
- public synchronized void addRunningReduces(JobID id, int task)
+ public synchronized void addRunningReduces(int task)
{
numRunningReduces += task;
}
@Override
- public synchronized void decRunningReduces(JobID id, int task)
+ public synchronized void decRunningReduces(int task)
{
numRunningReduces -= task;
}
Modified: hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/java/org/apache/hadoop/mapred/ResourceEstimator.java Wed Nov 4 22:50:23 2009
@@ -56,9 +56,11 @@
completedMapsInputSize+=(tip.getMapInputSize()+1);
completedMapsOutputSize+=ts.getOutputSize();
- LOG.info("completedMapsUpdates:"+completedMapsUpdates+" "+
- "completedMapsInputSize:"+completedMapsInputSize+" " +
- "completedMapsOutputSize:"+completedMapsOutputSize);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("completedMapsUpdates:"+completedMapsUpdates+" "+
+ "completedMapsInputSize:"+completedMapsInputSize+" " +
+ "completedMapsOutputSize:"+completedMapsOutputSize);
+ }
}
}
@@ -73,7 +75,9 @@
//add desiredMaps() so that randomwriter case doesn't blow up
long estimate = Math.round((inputSize *
completedMapsOutputSize * 2.0)/completedMapsInputSize);
- LOG.debug("estimate total map output will be " + estimate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("estimate total map output will be " + estimate);
+ }
return estimate;
}
}
Propchange: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
/hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:817878-830225
+/hadoop/mapreduce/trunk/src/test/mapred:817878-832891
Modified: hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java?rev=832892&r1=832891&r2=832892&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerInstrumentation.java Wed Nov 4 22:50:23 2009
@@ -129,11 +129,6 @@
taskAttemptID[1] = job.findMapTask(trackers[1]);
taskAttemptID[2] = job.findReduceTask(trackers[2]);
- assertTrue("Mismatch in num running maps",
- mi.numRunningMaps == numMaps);
- assertTrue("Mismatch in num running reduces",
- mi.numRunningReduces == numReds);
-
job.finishTask(taskAttemptID[0]);
job.finishTask(taskAttemptID[1]);
job.finishTask(taskAttemptID[2]);
@@ -254,6 +249,10 @@
mapSlotsPerTask+mapSlotsPerTask1, mi.numOccupiedMapSlots);
assertEquals("Mismatch in reduce slots occupied",
reduceSlotsPerTask+reduceSlotsPerTask1, mi.numOccupiedReduceSlots);
+ assertEquals("Mismatch in num running maps",
+ 2, mi.numRunningMaps);
+ assertEquals("Mismatch in num running reduces",
+ 2, mi.numRunningReduces);
//now send heartbeat with no running tasks
status = new TaskTrackerStatus[1];
@@ -265,6 +264,10 @@
0, mi.numOccupiedMapSlots);
assertEquals("Mismatch in reduce slots occupied",
0, mi.numOccupiedReduceSlots);
+ assertEquals("Mismatch in num running maps",
+ 0, mi.numRunningMaps);
+ assertEquals("Mismatch in num running reduces",
+ 0, mi.numRunningReduces);
}
public void testReservedSlots() throws IOException {
@@ -295,6 +298,18 @@
}
public void testDecomissionedTrackers() throws IOException {
+ // create TaskTrackerStatus and send heartbeats
+ TaskTrackerStatus[] status = new TaskTrackerStatus[trackers.length];
+ status[0] = getTTStatus(trackers[0], new ArrayList<TaskStatus>());
+ status[1] = getTTStatus(trackers[1], new ArrayList<TaskStatus>());
+ status[2] = getTTStatus(trackers[2], new ArrayList<TaskStatus>());
+ for (int i = 0; i< trackers.length; i++) {
+ FakeObjectUtilities.sendHeartBeat(jobTracker, status[i], false,
+ false, trackers[i], responseId);
+ }
+
+ assertEquals("Mismatch in number of trackers",
+ trackers.length, mi.numTrackers);
Set<String> dHosts = new HashSet<String>();
dHosts.add(hosts[1]);
assertEquals("Mismatch in number of decommissioned trackers",
@@ -302,6 +317,8 @@
jobTracker.decommissionNodes(dHosts);
assertEquals("Mismatch in number of decommissioned trackers",
1, mi.numTrackersDecommissioned);
+ assertEquals("Mismatch in number of trackers",
+ trackers.length - 1, mi.numTrackers);
}
static class FakeTaskScheduler extends JobQueueTaskScheduler {
@@ -555,25 +572,25 @@
}
@Override
- public synchronized void addRunningMaps(JobID id, int task)
+ public synchronized void addRunningMaps(int task)
{
numRunningMaps += task;
}
@Override
- public synchronized void decRunningMaps(JobID id, int task)
+ public synchronized void decRunningMaps(int task)
{
numRunningMaps -= task;
}
@Override
- public synchronized void addRunningReduces(JobID id, int task)
+ public synchronized void addRunningReduces(int task)
{
numRunningReduces += task;
}
@Override
- public synchronized void decRunningReduces(JobID id, int task)
+ public synchronized void decRunningReduces(int task)
{
numRunningReduces -= task;
}
Propchange: hadoop/mapreduce/branches/HDFS-641/src/webapps/job/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 4 22:50:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112
/hadoop/core/trunk/src/webapps/job:776175-785643
-/hadoop/mapreduce/trunk/src/webapps/job:817878-830225
+/hadoop/mapreduce/trunk/src/webapps/job:817878-832891
|