spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: Revert "[SPARK-24277][SQL] Code clean up in SQL module: HadoopMapReduceCommitProtocol"
Date Fri, 18 May 2018 19:52:03 GMT
Repository: spark
Updated Branches:
  refs/heads/master ed7ba7db8 -> 1c4553d67


Revert "[SPARK-24277][SQL] Code clean up in SQL module: HadoopMapReduceCommitProtocol"

This reverts commit 7b2dca5b12164b787ec4e8e7e9f92c60a3f9563e.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c4553d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c4553d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c4553d6

Branch: refs/heads/master
Commit: 1c4553d67de8089e8aa84bc736faa11f21615a6a
Parents: ed7ba7d
Author: gatorsmile <gatorsmile@gmail.com>
Authored: Fri May 18 12:51:09 2018 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Fri May 18 12:51:09 2018 -0700

----------------------------------------------------------------------
 .../internal/io/HadoopMapReduceCommitProtocol.scala  | 15 ++++++++++++---
 .../execution/datasources/orc/OrcColumnVector.java   |  6 +++++-
 .../parquet/VectorizedRleValuesReader.java           |  4 ++--
 .../scala/org/apache/spark/sql/api/r/SQLUtils.scala  |  2 +-
 .../apache/spark/sql/execution/command/views.scala   | 10 ++++++----
 .../sql/execution/datasources/FileFormatWriter.scala | 11 ++++++-----
 .../sql/execution/ui/SQLAppStatusListener.scala      |  2 +-
 7 files changed, 33 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 163511b..3e60c50 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -145,9 +145,18 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
-    // Create a dummy [[TaskAttemptContextImpl]] with configuration to get [[OutputCommitter]]
-    // instance on Spark driver. Note that the job/task/attampt id doesn't matter here.
-    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    // Setup IDs
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+    // Set up the configuration object
+    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
+    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
+    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
+
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
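
For readers skimming the archive, the hunk above restores the driver-side setup that the clean-up had removed: the mapreduce.* identifiers are written back into the job configuration before the dummy TaskAttemptContextImpl is built. A minimal, self-contained Scala sketch of that Hadoop pattern follows; the "illustration" job identifier is a placeholder standing in for SparkHadoopWriterUtils.createJobID and is not part of the commit.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

    // Build dummy job/task/attempt IDs; "illustration" is a placeholder for
    // the ID that SparkHadoopWriterUtils.createJobID(new Date, 0) would give.
    val jobId = new JobID("illustration", 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 0)
    val taskAttemptId = new TaskAttemptID(taskId, 0)

    // Mirror the IDs into the Hadoop configuration, as the restored code does.
    val conf = new Configuration()
    conf.set("mapreduce.job.id", jobId.toString)
    conf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
    conf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    conf.setBoolean("mapreduce.task.ismap", true)
    conf.setInt("mapreduce.task.partition", 0)

    // A driver-side attempt context can now be created from the configuration.
    val taskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId)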

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index fcf73e8..12f4d65 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -47,7 +47,11 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   OrcColumnVector(DataType type, ColumnVector vector) {
     super(type);
 
-    isTimestamp = type instanceof TimestampType;
+    if (type instanceof TimestampType) {
+      isTimestamp = true;
+    } else {
+      isTimestamp = false;
+    }
 
     baseData = vector;
     if (vector instanceof LongColumnVector) {
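
The OrcColumnVector hunk is purely stylistic: it swaps a direct boolean assignment for an explicit if/else. A tiny Scala sketch of the two equivalent spellings, using a made-up value purely for illustration:

    // Hypothetical value; only the shape of the assignment matters here.
    val value: Any = java.time.Instant.now()

    // Direct assignment of the test result (the form the clean-up introduced)...
    val isTimestampDirect = value.isInstanceOf[java.time.Instant]

    // ...and the explicit if/else form the revert restores in the constructor.
    val isTimestampExplicit = if (value.isInstanceOf[java.time.Instant]) true else false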

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index de0d65a..fe3d31a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -571,7 +571,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
     int ch3 = in.read();
     int ch2 = in.read();
     int ch1 = in.read();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
 
   /**
@@ -592,7 +592,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
         int ch3 = in.read();
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3);
+        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
       }
       case 4: {
         return readIntLittleEndian();
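
The two VectorizedRleValuesReader changes restore a `<< 0` on the lowest-order byte. Shifting by zero bits is a no-op, so the arithmetic is unchanged; the shift is kept only for visual symmetry with the other bytes. A small Scala sketch of the 4-byte assembly (names here are illustrative):

    // Combine four bytes (0-255 each) into one 32-bit int.
    def combine(ch1: Int, ch2: Int, ch3: Int, ch4: Int): Int =
      (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)  // << 0 is a no-op

    // (ch4 << 0) == ch4, so both spellings produce the same value:
    assert(combine(0x12, 0x34, 0x56, 0x78) == 0x12345678)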

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 265a84b..af20764 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -188,7 +188,7 @@ private[sql] object SQLUtils extends Logging {
     dataType match {
       case 's' =>
         // Read StructType for DataFrame
-        val fields = SerDe.readList(dis, jvmObjectTracker = null)
+        val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
         Row.fromSeq(fields)
       case _ => null
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 6373584..5172f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -410,10 +410,12 @@ object ViewHelper {
     }
 
     // Detect cyclic references from subqueries.
-    plan.expressions.foreach {
-      case s: SubqueryExpression =>
-        checkCyclicViewReference(s.plan, path, viewIdent)
-      case _ => // Do nothing.
+    plan.expressions.foreach { expr =>
+      expr match {
+        case s: SubqueryExpression =>
+          checkCyclicViewReference(s.plan, path, viewIdent)
+        case _ => // Do nothing.
+      }
     }
   }
 }
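
The views.scala hunk trades a partial-function literal passed to foreach for an explicit match on a named parameter; both traverse the same expressions. A self-contained Scala sketch of the two equivalent styles:

    val values: Seq[Any] = Seq(1, "two", 3.0)

    // Partial-function literal (the style the clean-up used).
    values.foreach {
      case s: String => println(s)
      case _ => // do nothing
    }

    // Explicit match on a named parameter (the style the revert restores).
    values.foreach { v =>
      v match {
        case s: String => println(s)
        case _ => // do nothing
      }
    }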

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 681bb1d..401597f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -244,17 +244,18 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
     // Set up the attempt context required to use in the output committer.
     val taskAttemptContext: TaskAttemptContext = {
-      val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-      val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
       hadoopConf.set("mapreduce.job.id", jobId.toString)
       hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
       hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
       hadoopConf.setBoolean("mapreduce.task.ismap", true)
+      hadoopConf.setInt("mapreduce.task.partition", 0)
 
       new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
     }
@@ -377,7 +378,7 @@ object FileFormatWriter extends Logging {
         dataSchema = description.dataColumns.toStructType,
         context = taskAttemptContext)
 
-      statsTrackers.foreach(_.newFile(currentPath))
+      statsTrackers.map(_.newFile(currentPath))
     }
 
     override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
@@ -428,10 +429,10 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     /** Flag saying whether or not the data to be written out is partitioned. */
-    private val isPartitioned = desc.partitionColumns.nonEmpty
+    val isPartitioned = desc.partitionColumns.nonEmpty
 
     /** Flag saying whether or not the data to be written out is bucketed. */
-    private val isBucketed = desc.bucketIdExpression.isDefined
+    val isBucketed = desc.bucketIdExpression.isDefined
 
     assert(isPartitioned || isBucketed,
       s"""DynamicPartitionWriteTask should be used for writing out data that's either

http://git-wip-us.apache.org/repos/asf/spark/blob/1c4553d6/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 2c4d0bc..d254af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -170,7 +170,7 @@ class SQLAppStatusListener(
       .filter { case (id, _) => metricIds.contains(id) }
       .groupBy(_._1)
       .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
       }
 
     // Check the execution again for whether the aggregated metrics data has been calculated.
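
The SQLAppStatusListener hunk restores an explicit .toSeq on the mapped metric values before they reach SQLMetrics.stringValue. A generic Scala sketch of that kind of conversion; the types and the stringValue stand-in below are illustrative, not the listener's actual ones:

    // An Iterable produced by some intermediate transformation...
    val metricValues: Iterable[Long] = Map(1 -> 10L, 2 -> 20L).values

    // ...materialized as a Seq where the callee's signature asks for one.
    // stringValue here is a hypothetical stand-in for SQLMetrics.stringValue.
    def stringValue(values: Seq[Long]): String = values.mkString(", ")
    val rendered = stringValue(metricValues.toSeq)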


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

