spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [1/4] git commit: Handle ConcurrentModificationExceptions in SparkContext init.
Date Thu, 31 Oct 2013 03:11:58 GMT
Updated Branches:
  refs/heads/master dc9ce16f6 -> 8f1098a3f


Handle ConcurrentModificationExceptions in SparkContext init.

System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during it's initialization.

Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hastable.entrySet)
have undefined behavior under concurrent modification.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a6ae2b48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a6ae2b48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a6ae2b48

Branch: refs/heads/master
Commit: a6ae2b48320d367be5fede60687331ce0d563d00
Parents: 1dc776b
Author: Stephen Haberman <stephen@exigencecorp.com>
Authored: Sun Oct 27 13:35:04 2013 -0500
Committer: Stephen Haberman <stephen@exigencecorp.com>
Committed: Sun Oct 27 14:08:32 2013 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala |  6 +++---
 core/src/main/scala/org/apache/spark/util/Utils.scala   | 10 ++++++++++
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6ae2b48/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 564466c..d694dfe 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
@@ -248,8 +248,8 @@ class SparkContext(
       conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
     }
     // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop."))
{
-      conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+    Utils.getSystemProperties.foreach { case (key, value) if key.startsWith("spark.hadoop.")
=>
+      conf.set(key.substring("spark.hadoop.".length), value)
     }
     val bufferSize = System.getProperty("spark.buffer.size", "65536")
     conf.set("io.file.buffer.size", bufferSize)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a6ae2b48/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3b3968..d637a0a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
 import org.apache.spark.{SparkEnv, SparkException, Logging}
+import java.util.ConcurrentModificationException
 
 
 /**
@@ -819,4 +820,13 @@ private[spark] object Utils extends Logging {
     // Nothing else to guard against ?
     hashAbs
   }
+
+  /** Returns a copy of the system properties that is thread-safe to iterator over. */
+  def getSystemProperties(): Map[String, String] = {
+    try {
+      return System.getProperties().toMap[String, String]
+    } catch {
+      case e: ConcurrentModificationException => getSystemProperties()
+    }
+  }
 }


Mime
View raw message