spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject spark git commit: [HOTFIX][CORE] fix a concurrence issue in NewAccumulator
Date Fri, 29 Apr 2016 04:58:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9c7c42bc6 -> 6f9a18fe3


[HOTFIX][CORE] fix a concurrence issue in NewAccumulator

## What changes were proposed in this pull request?

`AccumulatorContext` is not thread-safe, that's why all of its methods are synchronized. However,
there is one exception: `AccumulatorContext.originals`. `NewAccumulator` uses it to check
if it's registered, which is wrong as it's not synchronized.

This PR marks `AccumulatorContext.originals` as `private`, and now all access to `AccumulatorContext`
is synchronized.

## How was this patch tested?

I verified it locally. To be safe, we can let Jenkins test it many times to make sure this
problem is gone.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12773 from cloud-fan/debug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f9a18fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f9a18fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f9a18fe

Branch: refs/heads/master
Commit: 6f9a18fe311925056cce83a44f187f122b6591cb
Parents: 9c7c42b
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu Apr 28 21:57:58 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Thu Apr 28 21:57:58 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/NewAccumulator.scala | 10 ++++++++--
 .../test/scala/org/apache/spark/AccumulatorSuite.scala    |  2 +-
 .../scala/org/apache/spark/InternalAccumulatorSuite.scala |  6 +++---
 .../scala/org/apache/spark/sql/CachedTableSuite.scala     |  4 ++--
 4 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f9a18fe/core/src/main/scala/org/apache/spark/NewAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index edb9b74..aa21ccc 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -22,6 +22,8 @@ import java.io.ObjectInputStream
 import java.util.concurrent.atomic.AtomicLong
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.util.Utils
 
@@ -57,7 +59,7 @@ abstract class NewAccumulator[IN, OUT] extends Serializable {
    * registered before ues, or it will throw exception.
    */
   final def isRegistered: Boolean =
-    metadata != null && AccumulatorContext.originals.containsKey(metadata.id)
+    metadata != null && AccumulatorContext.get(metadata.id).isDefined
 
   private def assertMetadataNotNull(): Unit = {
     if (metadata == null) {
@@ -197,7 +199,7 @@ private[spark] object AccumulatorContext {
    * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
    */
   @GuardedBy("AccumulatorContext")
-  val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_, _]]]
+  private val originals = new java.util.HashMap[Long, jl.ref.WeakReference[NewAccumulator[_,
_]]]
 
   private[this] val nextId = new AtomicLong(0L)
 
@@ -207,6 +209,10 @@ private[spark] object AccumulatorContext {
    */
   def newId(): Long = nextId.getAndIncrement
 
+  def numAccums: Int = synchronized(originals.size)
+
+  def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+
   /**
    * Register an [[Accumulator]] created on the driver such that it can be used on the executors.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/6f9a18fe/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 5f97e58..9c90049 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -191,7 +191,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     assert(ref.get.isEmpty)
 
     AccumulatorContext.remove(accId)
-    assert(!AccumulatorContext.originals.containsKey(accId))
+    assert(!AccumulatorContext.get(accId).isDefined)
   }
 
   test("get accum") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6f9a18fe/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index e4474bb..972e31c 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext
{
       private val myCleaner = new SaveAccumContextCleaner(this)
       override def cleaner: Option[ContextCleaner] = Some(myCleaner)
     }
-    assert(AccumulatorContext.originals.isEmpty)
+    assert(AccumulatorContext.numAccums == 0)
     sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
     val numInternalAccums = TaskMetrics.empty.internalAccums.length
     // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
-    assert(AccumulatorContext.originals.size === numInternalAccums * 2)
+    assert(AccumulatorContext.numAccums === numInternalAccums * 2)
     val accumsRegistered = sc.cleaner match {
       case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
       case _ => Seq.empty[Long]
     }
     // Make sure the same set of accumulators is registered for cleanup
     assert(accumsRegistered.size === numInternalAccums * 2)
-    assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
+    assert(accumsRegistered.toSet === AccumulatorContext.accumIds)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6f9a18fe/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0e6356b..1095a73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -334,10 +334,10 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("SELECT * FROM t2").count()
 
     AccumulatorContext.synchronized {
-      val accsSize = AccumulatorContext.originals.size
+      val accsSize = AccumulatorContext.numAccums
       sqlContext.uncacheTable("t1")
       sqlContext.uncacheTable("t2")
-      assert((accsSize - 2) == AccumulatorContext.originals.size)
+      assert((accsSize - 2) == AccumulatorContext.numAccums)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message