spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-8057][Core]Call TaskAttemptContext.getTaskAttemptID using Reflection
Date Fri, 07 Aug 2015 04:42:58 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5491dfb9a -> e902c4f26


[SPARK-8057][Core]Call TaskAttemptContext.getTaskAttemptID using Reflection

Someone may use the Spark core jar from the Maven repo with Hadoop 1. SPARK-2075 already
resolved the compatibility issue to support this, but `SparkHadoopMapRedUtil.commitTask`
recently broke it.

This PR uses reflection to call `TaskAttemptContext.getTaskAttemptID`, fixing the
compatibility issue.
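
As background: a direct call site for `TaskAttemptContext.getTaskAttemptID` compiles to
`invokeinterface` against Hadoop 2.+ (where TaskAttemptContext is an interface) but to
`invokevirtual` against Hadoop 1.+ (where it is a class), so a jar built against one
lineage throws `IncompatibleClassChangeError` on the other. A minimal sketch of the
reflective workaround, mirroring the helper added in the diff below:

    import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
    import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}

    // Resolving the method at runtime avoids baking a version-specific
    // call instruction into the compiled class file.
    def getTaskAttemptID(context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
      val method = context.getClass.getMethod("getTaskAttemptID")
      method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
    }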

Author: zsxwing <zsxwing@gmail.com>

Closes #6599 from zsxwing/SPARK-8057 and squashes the following commits:

f7a343c [zsxwing] Remove the redundant import
6b7f1af [zsxwing] Call TaskAttemptContext.getTaskAttemptID using Reflection

(cherry picked from commit 672f467668da1cf20895ee57652489c306120288)
Signed-off-by: Reynold Xin <rxin@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e902c4f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e902c4f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e902c4f2

Branch: refs/heads/branch-1.5
Commit: e902c4f26fdaa505ea2c57c701b72b1f5d2d8b70
Parents: 5491dfb
Author: zsxwing <zsxwing@gmail.com>
Authored: Thu Aug 6 21:42:42 2015 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Aug 6 21:42:55 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkHadoopUtil.scala     | 14 ++++++++++++++
 .../apache/spark/mapred/SparkHadoopMapRedUtil.scala   |  3 ++-
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e902c4f2/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e06b06e..7e9dba4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
+import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.annotation.DeveloperApi
@@ -195,6 +197,18 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * Uses reflection to call `getTaskAttemptID` on a TaskAttemptContext. Calling
+   * `TaskAttemptContext.getTaskAttemptID` directly would generate different byte code
+   * for Hadoop 1.+ and Hadoop 2.+, because TaskAttemptContext is a class in Hadoop 1.+
+   * but an interface in Hadoop 2.+.
+   */
+  def getTaskAttemptIDFromTaskAttemptContext(
+      context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    val method = context.getClass.getMethod("getTaskAttemptID")
+    method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
+  }
+
+  /**
    * Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
    * given path points to a file, return a single-element collection containing [[FileStatus]] of
    * that file.

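For comparison, the SPARK-2075 fix mentioned above handled the same class-vs-interface
split for `JobContext.getConfiguration`. A plausible sketch of that sibling helper (an
assumption that it takes the same reflective approach; it is not part of this diff):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.JobContext

    // Same pattern: look the method up at runtime so a single compiled
    // artifact runs against both Hadoop lineages.
    def getConfigurationFromJobContext(context: JobContext): Configuration = {
      val method = context.getClass.getMethod("getConfiguration")
      method.invoke(context).asInstanceOf[Configuration]
    }
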
http://git-wip-us.apache.org/repos/asf/spark/blob/e902c4f2/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 87df427..f405b73 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
 import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.{Logging, SparkEnv, TaskContext}
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -93,7 +94,7 @@ object SparkHadoopMapRedUtil extends Logging {
       splitId: Int,
       attemptId: Int): Unit = {
 
-    val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
+    val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
     // Called after we have decided to commit
     def performCommit(): Unit = {

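A quick sanity check of the new helper (illustrative only; assumes Hadoop 2.x, whose
TaskAttemptContextImpl lives in hadoop-mapreduce-client-core):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
    import org.apache.spark.deploy.SparkHadoopUtil

    val expected = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0)
    val ctx = new TaskAttemptContextImpl(new Configuration(), expected)
    // The reflective lookup should hand back exactly the ID the context was built with.
    assert(SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(ctx) == expected)

Routing call sites through SparkHadoopUtil.get keeps the reflective lookup in one place
rather than scattering getMethod calls across the codebase.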

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

