spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-19960][CORE] Move `SparkHadoopWriter` to `internal/io/`
Date Wed, 15 Mar 2017 21:58:22 GMT
Repository: spark
Updated Branches:
  refs/heads/master 02c274eab -> 97cc5e5a5


[SPARK-19960][CORE] Move `SparkHadoopWriter` to `internal/io/`

## What changes were proposed in this pull request?

This PR introduces the following changes:
1. Move `SparkHadoopWriter` to `core/internal/io/`, so that it lives in the same directory as
`SparkHadoopMapReduceWriter` (the resulting import path is sketched below);
2. Move `SparkHadoopWriterUtils` into its own file.

After this PR is merged, we may consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`,
and make the new commit protocol support the old `mapred` package's committer.
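
Because the writer classes and the shared utils are `private[spark]`, only code inside an `org.apache.spark` package can reference them. The minimal sketch below (the package and object names are hypothetical, and it assumes `spark-core` is on the classpath) illustrates the post-move import path and one of the helpers that now lives in `SparkHadoopWriterUtils.scala`:

```scala
// Hypothetical illustration only; not part of this commit's diff.
package org.apache.spark.example

import java.util.Date

// After this change the writer helpers are imported from internal.io,
// mirroring the PairRDDFunctions import update further down in the diff.
import org.apache.spark.internal.io.SparkHadoopWriterUtils

object WriterImportExample {
  def main(args: Array[String]): Unit = {
    // createJobID builds a Hadoop JobID from a timestamp-based jobtracker ID
    // plus a numeric job id; it is one of the utils split out into its own file.
    val jobId = SparkHadoopWriterUtils.createJobID(new Date(), 0)
    println(jobId)
  }
}
```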

## How was this patch tested?

Tested by existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17304 from jiangxb1987/writer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97cc5e5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97cc5e5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97cc5e5a

Branch: refs/heads/master
Commit: 97cc5e5a5555519d221d0ca78645dde9bb8ea40b
Parents: 02c274e
Author: jiangxingbo <jiangxb1987@gmail.com>
Authored: Wed Mar 15 14:58:19 2017 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Wed Mar 15 14:58:19 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    | 160 -------------------
 .../io/SparkHadoopMapReduceWriter.scala         |  59 -------
 .../spark/internal/io/SparkHadoopWriter.scala   | 159 ++++++++++++++++++
 .../internal/io/SparkHadoopWriterUtils.scala    |  93 +++++++++++
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   3 +-
 .../OutputCommitCoordinatorSuite.scala          |   1 +
 6 files changed, 255 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
deleted file mode 100644
index 46e22b2..0000000
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
-
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.TaskType
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.rdd.HadoopRDD
-import org.apache.spark.util.SerializableJobConf
-
-/**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat.
- *
- * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
- * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
- */
-private[spark]
-class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
-
-  private val now = new Date()
-  private val conf = new SerializableJobConf(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: RecordWriter[AnyRef, AnyRef] = null
-  @transient private var format: OutputFormat[AnyRef, AnyRef] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
-
-  def preSetup() {
-    setIDs(0, 0, 0)
-    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
-
-    val jCtxt = getJobContext()
-    getOutputCommitter().setupJob(jCtxt)
-  }
-
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
-    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(now),
-      jobid, splitID, attemptID, conf.value)
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance(Locale.US)
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-
-    val outputName = "part-"  + numfmt.format(splitID)
-    val path = FileOutputFormat.getOutputPath(conf.value)
-    val fs: FileSystem = {
-      if (path != null) {
-        path.getFileSystem(conf.value)
-      } else {
-        FileSystem.get(conf.value)
-      }
-    }
-
-    getOutputCommitter().setupTask(getTaskContext())
-    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
-  }
-
-  def write(key: AnyRef, value: AnyRef) {
-    if (writer != null) {
-      writer.write(key, value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
-  }
-
-  def close() {
-    writer.close(Reporter.NULL)
-  }
-
-  def commit() {
-    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
-  }
-
-  def commitJob() {
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
-  }
-
-  // ********* Private Functions *********
-
-  private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[OutputFormat[AnyRef, AnyRef]]
-    }
-    format
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) {
-      jobContext = new JobContextImpl(conf.value, jID.value)
-    }
-    jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext = newTaskAttemptContext(conf.value, taID.value)
-    }
-    taskContext
-  }
-
-  protected def newTaskAttemptContext(
-      conf: JobConf,
-      attemptId: TaskAttemptID): TaskAttemptContext = {
-    new TaskAttemptContextImpl(conf, attemptId)
-  }
-
-  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
-    jobID = jobid
-    splitID = splitid
-    attemptID = attemptid
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid))
-    taID = new SerializableWritable[TaskAttemptID](
-        new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 659ad5d..376ff9b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -179,62 +179,3 @@ object SparkHadoopMapReduceWriter extends Logging {
     }
   }
 }
-
-private[spark]
-object SparkHadoopWriterUtils {
-
-  private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
-
-  def createJobID(time: Date, id: Int): JobID = {
-    val jobtrackerID = createJobTrackerID(time)
-    new JobID(jobtrackerID, id)
-  }
-
-  def createJobTrackerID(time: Date): String = {
-    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
-  }
-
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    val outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-  }
-
-  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
-  // setting can take effect:
-  def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
-    val validationDisabled = disableOutputSpecValidation.value
-    val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
-    enabledInConf && !validationDisabled
-  }
-
-  // TODO: these don't seem like the right abstractions.
-  // We should abstract the duplicate code in a less awkward way.
-
-  def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
-    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
-    (context.taskMetrics().outputMetrics, bytesWrittenCallback)
-  }
-
-  def maybeUpdateOutputMetrics(
-      outputMetrics: OutputMetrics,
-      callback: () => Long,
-      recordsWritten: Long): Unit = {
-    if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
-      outputMetrics.setBytesWritten(callback())
-      outputMetrics.setRecordsWritten(recordsWritten)
-    }
-  }
-
-  /**
-   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
-   * basis; see SPARK-4835 for more details.
-   */
-  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
new file mode 100644
index 0000000..acc9c38
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.io.IOException
+import java.text.{NumberFormat, SimpleDateFormat}
+import java.util.{Date, Locale}
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.mapreduce.TaskType
+
+import org.apache.spark.SerializableWritable
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
+ *
+ * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
+ * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
+ */
+private[spark]
+class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
+
+  private val now = new Date()
+  private val conf = new SerializableJobConf(jobConf)
+
+  private var jobID = 0
+  private var splitID = 0
+  private var attemptID = 0
+  private var jID: SerializableWritable[JobID] = null
+  private var taID: SerializableWritable[TaskAttemptID] = null
+
+  @transient private var writer: RecordWriter[AnyRef, AnyRef] = null
+  @transient private var format: OutputFormat[AnyRef, AnyRef] = null
+  @transient private var committer: OutputCommitter = null
+  @transient private var jobContext: JobContext = null
+  @transient private var taskContext: TaskAttemptContext = null
+
+  def preSetup() {
+    setIDs(0, 0, 0)
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
+
+    val jCtxt = getJobContext()
+    getOutputCommitter().setupJob(jCtxt)
+  }
+
+
+  def setup(jobid: Int, splitid: Int, attemptid: Int) {
+    setIDs(jobid, splitid, attemptid)
+    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(now),
+      jobid, splitID, attemptID, conf.value)
+  }
+
+  def open() {
+    val numfmt = NumberFormat.getInstance(Locale.US)
+    numfmt.setMinimumIntegerDigits(5)
+    numfmt.setGroupingUsed(false)
+
+    val outputName = "part-"  + numfmt.format(splitID)
+    val path = FileOutputFormat.getOutputPath(conf.value)
+    val fs: FileSystem = {
+      if (path != null) {
+        path.getFileSystem(conf.value)
+      } else {
+        FileSystem.get(conf.value)
+      }
+    }
+
+    getOutputCommitter().setupTask(getTaskContext())
+    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
+  }
+
+  def write(key: AnyRef, value: AnyRef) {
+    if (writer != null) {
+      writer.write(key, value)
+    } else {
+      throw new IOException("Writer is null, open() has not been called")
+    }
+  }
+
+  def close() {
+    writer.close(Reporter.NULL)
+  }
+
+  def commit() {
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
+  }
+
+  def commitJob() {
+    val cmtr = getOutputCommitter()
+    cmtr.commitJob(getJobContext())
+  }
+
+  // ********* Private Functions *********
+
+  private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = {
+    if (format == null) {
+      format = conf.value.getOutputFormat()
+        .asInstanceOf[OutputFormat[AnyRef, AnyRef]]
+    }
+    format
+  }
+
+  private def getOutputCommitter(): OutputCommitter = {
+    if (committer == null) {
+      committer = conf.value.getOutputCommitter
+    }
+    committer
+  }
+
+  private def getJobContext(): JobContext = {
+    if (jobContext == null) {
+      jobContext = new JobContextImpl(conf.value, jID.value)
+    }
+    jobContext
+  }
+
+  private def getTaskContext(): TaskAttemptContext = {
+    if (taskContext == null) {
+      taskContext = newTaskAttemptContext(conf.value, taID.value)
+    }
+    taskContext
+  }
+
+  protected def newTaskAttemptContext(
+      conf: JobConf,
+      attemptId: TaskAttemptID): TaskAttemptContext = {
+    new TaskAttemptContextImpl(conf, attemptId)
+  }
+
+  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+    jobID = jobid
+    splitID = splitid
+    attemptID = attemptid
+
+    jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobid))
+    taID = new SerializableWritable[TaskAttemptID](
+        new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
new file mode 100644
index 0000000..de828a6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.util.DynamicVariable
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{JobConf, JobID}
+
+import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.OutputMetrics
+
+/**
+ * A helper object that provide common utils used during saving an RDD using a Hadoop OutputFormat
+ * (both from the old mapred API and the new mapreduce API)
+ */
+private[spark]
+object SparkHadoopWriterUtils {
+
+  private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+  def createJobID(time: Date, id: Int): JobID = {
+    val jobtrackerID = createJobTrackerID(time)
+    new JobID(jobtrackerID, id)
+  }
+
+  def createJobTrackerID(time: Date): String = {
+    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+  }
+
+  def createPathFromString(path: String, conf: JobConf): Path = {
+    if (path == null) {
+      throw new IllegalArgumentException("Output path is null")
+    }
+    val outputPath = new Path(path)
+    val fs = outputPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException("Incorrectly formatted output path")
+    }
+    outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+
+  // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+  // setting can take effect:
+  def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
+    val validationDisabled = disableOutputSpecValidation.value
+    val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+    enabledInConf && !validationDisabled
+  }
+
+  // TODO: these don't seem like the right abstractions.
+  // We should abstract the duplicate code in a less awkward way.
+
+  def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
+    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
+    (context.taskMetrics().outputMetrics, bytesWrittenCallback)
+  }
+
+  def maybeUpdateOutputMetrics(
+      outputMetrics: OutputMetrics,
+      callback: () => Long,
+      recordsWritten: Long): Unit = {
+    if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+      outputMetrics.setBytesWritten(callback())
+      outputMetrics.setRecordsWritten(recordsWritten)
+    }
+  }
+
+  /**
+   * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+   * basis; see SPARK-4835 for more details.
+   */
+  val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 52ce03f..58762cc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,7 +37,8 @@ import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriter,
+  SparkHadoopWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer

http://git-wip-us.apache.org/repos/asf/spark/blob/97cc5e5a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 83ed127..38b9d40 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -31,6 +31,7 @@ import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
+import org.apache.spark.internal.io.SparkHadoopWriter
 import org.apache.spark.rdd.{FakeOutputCommitter, RDD}
 import org.apache.spark.util.{ThreadUtils, Utils}
 


