spark-commits mailing list archives

From: joshro...@apache.org
Subject: git commit: [SPARK-4080] Only throw IOException from [write|read][Object|External]
Date: Fri, 24 Oct 2014 22:22:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 59297e951 -> 6c10c2770


[SPARK-4080] Only throw IOException from [write|read][Object|External]

If classes implementing the Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, this results in an unhelpful
"IOException: unexpected exception type" rather than the actual exception that
produced the (de)serialization error.

This patch fixes the problem by adding a utility method that re-wraps any uncaught
exception in an IOException (unless it is already an instance of IOException).
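
For readers skimming the patch, here is a minimal, self-contained Scala sketch of the
pattern being applied. The tryOrIOException helper mirrors the method added to
Utils.scala in the diff below; the SerializationUtilsSketch object and the CachedValue
class are illustrative names only and are not part of this patch.

    import java.io.{IOException, ObjectInputStream}
    import scala.util.control.NonFatal

    object SerializationUtilsSketch {
      // Re-throw non-fatal, non-IO exceptions as IOException so that Java
      // serialization reports the real cause instead of the generic
      // "IOException: unexpected exception type".
      def tryOrIOException(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: IOException => throw e
          case NonFatal(t) => throw new IOException(t)
        }
      }
    }

    // Illustrative Serializable class (not from the patch) that wraps its custom
    // deserialization hook, as the classes touched by this patch now do.
    class CachedValue(@transient var cached: Seq[Int]) extends Serializable {
      private def readObject(in: ObjectInputStream): Unit =
        SerializationUtilsSketch.tryOrIOException {
          in.defaultReadObject()
          cached = Seq.empty // reinitialize transient state; any failure here is wrapped
        }
    }

Because readObject and writeObject are only permitted to throw IOException (or
ClassNotFoundException), wrapping the original exception as the IOException's cause
preserves the real stack trace while keeping the serialization contract.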

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits:

cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].

(cherry picked from commit 6c98c29ae0033556fd4424f41d1de005c509e511)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
	core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
	streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c10c277
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c10c277
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c10c277

Branch: refs/heads/branch-1.1
Commit: 6c10c2770c718287f9cc2af4109b701fa1057b70
Parents: 59297e9
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Fri Oct 24 15:06:15 2014 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Fri Oct 24 15:21:08 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Accumulators.scala   |  3 ++-
 .../main/scala/org/apache/spark/Partitioner.scala    |  4 ++--
 .../org/apache/spark/SerializableWritable.scala      |  5 +++--
 .../org/apache/spark/broadcast/HttpBroadcast.scala   |  4 ++--
 .../apache/spark/broadcast/TorrentBroadcast.scala    |  6 +++---
 .../apache/spark/deploy/master/ApplicationInfo.scala |  3 ++-
 .../org/apache/spark/deploy/master/DriverInfo.scala  |  3 ++-
 .../org/apache/spark/deploy/master/WorkerInfo.scala  |  2 +-
 .../scala/org/apache/spark/rdd/CartesianRDD.scala    |  3 ++-
 .../scala/org/apache/spark/rdd/CoGroupedRDD.scala    |  3 ++-
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala    |  3 ++-
 .../org/apache/spark/rdd/ParallelCollectionRDD.scala |  4 ++--
 .../apache/spark/rdd/PartitionerAwareUnionRDD.scala  |  3 ++-
 .../main/scala/org/apache/spark/rdd/UnionRDD.scala   |  3 ++-
 .../org/apache/spark/rdd/ZippedPartitionsRDD.scala   |  3 ++-
 .../scala/org/apache/spark/scheduler/MapStatus.scala |  3 ++-
 .../org/apache/spark/scheduler/TaskResult.scala      |  4 ++--
 .../org/apache/spark/serializer/JavaSerializer.scala |  4 ++--
 .../org/apache/spark/storage/BlockManagerId.scala    |  4 ++--
 .../apache/spark/storage/BlockManagerMessages.scala  |  6 ++++--
 .../org/apache/spark/storage/StorageLevel.scala      |  5 +++--
 .../org/apache/spark/util/SerializableBuffer.scala   |  4 ++--
 .../src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++
 .../spark/streaming/flume/FlumeInputDStream.scala    |  4 ++--
 .../org/apache/spark/streaming/DStreamGraph.scala    |  5 +++--
 .../org/apache/spark/streaming/dstream/DStream.scala |  6 +++---
 .../streaming/dstream/DStreamCheckpointData.scala    |  5 +++--
 .../spark/streaming/dstream/FileInputDStream.scala   |  4 ++--
 .../org/apache/spark/streaming/TestSuiteBase.scala   |  4 ++--
 29 files changed, 78 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 12f2fe0..2301caa 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
 
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
   }
 
   // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     value_ = zero
     deserialized = true

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 37053bb..e53a78e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/SerializableWritable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index e50b9ac..55cb259 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
   def value = t
   override def toString = t.toString
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
     ow.setConf(new Configuration())

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4cd4f4f..7dade04 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     HttpBroadcast.synchronized {
       SparkEnv.get.blockManager.getSingle(blockId) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 6173fd3..afa3d93 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -27,7 +27,7 @@ import scala.util.Random
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 
 /**
  * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -146,13 +146,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     TorrentBroadcast.synchronized {
       SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index c3ca43f..6ba395b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import akka.actor.ActorRef
 
 import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.util.Utils
 
 private[spark] class ApplicationInfo(
     val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 80b570a..2ac2118 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.util.Utils
 
 private[spark] class DriverInfo(
     val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c5fa9cf..d221b0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
-  private def readObject(in: java.io.ObjectInputStream) : Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 4908711..1cbd684 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 private[spark]
 class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
   override val index: Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     s1 = rdd1.partitions(s1Index)
     s2 = rdd2.partitions(s2Index)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index fabb882..ffc0a8a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
   ) extends CoGroupSplitDep {
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     split = rdd.partitions(splitIndex)
     oos.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 11ebafb..9fab1d7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 /**
  * Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition(
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent partition at the time of task serialization
     parents = parentsIndices.map(rdd.partitions(_))
     oos.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 66c71bf..87b22de 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   override def index: Int = slice
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream): Unit = {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
 
     val sfactory = SparkEnv.get.serializer
 
@@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
 
     val sfactory = SparkEnv.get.serializer
     sfactory match {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 0c2cd7a..92b0641 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
 
 /**
  * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
@@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition(
   override def hashCode(): Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent partition at the time of task serialization
     parents = rdds.map(_.partitions(index)).toArray
     oos.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 197167e..3298cbc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 /**
  * Partition for UnionRDD.
@@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag](
   override val index: Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     parentPartition = rdd.partitions(parentRddPartitionIndex)
     oos.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6..996f2cd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
 
 private[spark] class ZippedPartitionsPartition(
     idx: Int,
@@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition(
   def partitions = partitionValues
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     partitionValues = rdds.map(rdd => rdd.partitions(idx))
     oos.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff..cb9492a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
 
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -31,7 +32,7 @@ private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes
 
   def this() = this(null, null)  // For deserialization only
 
-  def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     location.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index d49d8fb..11c19ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
 
   def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 
     out.writeInt(valueBytes.remaining);
     Utils.writeByteBuffer(valueBytes, out)
@@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
     out.writeObject(metrics)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 554a33c..662a7b9 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
     new JavaSerializerInstance(counterReset, classLoader)
   }
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeInt(counterReset)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     counterReset = in.readInt()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b1585bd..ffd2a4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -62,14 +62,14 @@ class BlockManagerId private (
 
   def nettyPort: Int = nettyPort_
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)
     out.writeInt(nettyPort_)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     executorId_ = in.readUTF()
     host_ = in.readUTF()
     port_ = in.readInt()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 2ba16b8..03ba898 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import akka.actor.ActorRef
 
+import org.apache.spark.util.Utils
+
 private[spark] object BlockManagerMessages {
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
@@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages {
 
     def this() = this(null, null, null, 0, 0, 0)  // For deserialization only
 
-    override def writeExternal(out: ObjectOutput) {
+    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
       blockManagerId.writeExternal(out)
       out.writeUTF(blockId.name)
       storageLevel.writeExternal(out)
@@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages {
       out.writeLong(tachyonSize)
     }
 
-    override def readExternal(in: ObjectInput) {
+    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
       blockManagerId = BlockManagerId(in)
       blockId = BlockId(in.readUTF())
       storageLevel = StorageLevel(in)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 1e35aba..56edc4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -97,12 +98,12 @@ class StorageLevel private(
     ret
   }
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeByte(toInt)
     out.writeByte(_replication)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     val flags = in.readByte()
     _useDisk = (flags & 8) != 0
     _useMemory = (flags & 4) != 0

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
index 2b452ad..770ff9d 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
@@ -29,7 +29,7 @@ private[spark]
 class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
   def value = buffer
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val length = in.readInt()
     buffer = ByteBuffer.allocate(length)
     var amountRead = 0
@@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
     buffer.rewind() // Allow us to read it later
   }
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.writeInt(buffer.limit())
     if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
       throw new IOException("Could not fully write buffer to output stream")

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0b52d72..aefbb24 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -916,6 +916,21 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
+   * exceptions as IOException.  This is used when implementing Externalizable and Serializable's
+   * read and write methods, since Java's serializer will not report non-IOExceptions properly;
+   * see SPARK-4080 for more context.
+   */
+  def tryOrIOException(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: IOException => throw e
+      case NonFatal(t) => throw new IOException(t)
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 4b2ea45..2de2a79 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -66,7 +66,7 @@ class SparkFlumeEvent() extends Externalizable {
   var event : AvroFlumeEvent = new AvroFlumeEvent()
 
   /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput) {
+  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
     in.readFully(bodyBuff)
@@ -93,7 +93,7 @@ class SparkFlumeEvent() extends Externalizable {
   }
 
   /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput) {
+  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     val body = event.getBody.array()
     out.writeInt(body.length)
     out.write(body)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b4adf0e..e59c24a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
+import org.apache.spark.util.Utils
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
 
@@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
@@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug("DStreamGraph.readObject used")
     this.synchronized {
       checkpointInProgress = true

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 65f7ccd..eabd61d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.{CallSite, MetadataCleaner}
+import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".writeObject used")
     if (graph != null) {
       graph.synchronized {
@@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[T]] ()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index f33c0ce..0dc7279 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
 
 private[streaming]
 class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
@@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".writeObject used")
     if (dstream.context.graph != null) {
       dstream.context.graph.synchronized {
@@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 9eecbfa..8152b75 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.TimeStampedHashMap
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
 
 private[streaming]
@@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c10c277/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 759baac..c65e83c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     ois.defaultReadObject()
     output.clear()
   }
@@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     ois.defaultReadObject()
     output.clear()
   }



