spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-15054] Deprecate old accumulator API
Date Mon, 02 May 2016 21:57:03 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8a1ce4899 -> d5c79f564


[SPARK-15054] Deprecate old accumulator API

## What changes were proposed in this pull request?
This patch deprecates the old accumulator API.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #12832 from rxin/SPARK-15054.
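
For readers tracking the migration, here is a minimal sketch contrasting the deprecated API with its AccumulatorV2-based replacement. The sc.longAccumulator factory and LongAccumulator class are part of the new Spark 2.0 API; the app name, accumulator name, and RDD below are illustrative only.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 100)

    // Old API, deprecated by this commit: implicit AccumulatorParam plus `+=`.
    val oldAcc = sc.accumulator(0L, "records")
    rdd.foreach(_ => oldAcc += 1L)

    // New API: AccumulatorV2-backed LongAccumulator with add()/value.
    val newAcc = sc.longAccumulator("records")
    rdd.foreach(_ => newAcc.add(1L))
    println(newAcc.value)  // only the driver should read `value`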


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5c79f56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5c79f56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5c79f56

Branch: refs/heads/master
Commit: d5c79f564f3557037c5526e2ee5f963dd100fb34
Parents: 8a1ce48
Author: Reynold Xin <rxin@databricks.com>
Authored: Mon May 2 14:57:00 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Mon May 2 14:57:00 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulable.scala     |  6 ++++--
 .../scala/org/apache/spark/Accumulator.scala     | 19 +++++++++++--------
 .../scala/org/apache/spark/SparkContext.scala    |  5 +++++
 3 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d5c79f56/core/src/main/scala/org/apache/spark/Accumulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index c76720c..799c7e4 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark
 
-import java.io.{ObjectInputStream, Serializable}
+import java.io.Serializable
 
 import scala.collection.generic.Growable
 import scala.reflect.ClassTag
 
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.Utils
 
 
 /**
@@ -49,6 +48,7 @@ import org.apache.spark.util.Utils
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulable[R, T] private (
     val id: Long,
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@@ -162,6 +162,7 @@ class Accumulable[R, T] private (
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulableParam[R, T] extends Serializable {
   /**
    * Add additional data to the accumulator value. Is allowed to modify and return `r`
@@ -191,6 +192,7 @@ trait AccumulableParam[R, T] extends Serializable {
 }
 
 
+@deprecated("use AccumulatorV2", "2.0.0")
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
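
Custom AccumulableParam implementations migrate to subclasses of the new AccumulatorV2 base class (org.apache.spark.util.AccumulatorV2 in the released Spark 2.0), which replaces addAccumulator/addInPlace/zero with isZero/copy/reset/add/merge/value. A rough sketch, with a hypothetical StringSetAccumulator standing in for a custom AccumulableParam[Set[String], String]:

    import org.apache.spark.util.AccumulatorV2

    // Accumulates distinct strings; IN = String, OUT = Set[String].
    class StringSetAccumulator extends AccumulatorV2[String, Set[String]] {
      private var set = Set.empty[String]
      override def isZero: Boolean = set.isEmpty
      override def copy(): StringSetAccumulator = {
        val acc = new StringSetAccumulator
        acc.set = set
        acc
      }
      override def reset(): Unit = { set = Set.empty[String] }
      override def add(v: String): Unit = { set += v }
      override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { set ++= other.value }
      override def value: Set[String] = set
    }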

http://git-wip-us.apache.org/repos/asf/spark/blob/d5c79f56/core/src/main/scala/org/apache/spark/Accumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 9b007b9..e52d36b 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,13 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable
-import scala.ref.WeakReference
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
 
@@ -58,7 +51,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @param name human-readable name associated with this accumulator
  * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
- */
+*/
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulator[T] private[spark] (
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
     @transient private val initialValue: T,
@@ -75,6 +69,7 @@ class Accumulator[T] private[spark] (
  *
  * @tparam T type of value to accumulate
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   def addAccumulator(t1: T, t2: T): T = {
     addInPlace(t1, t2)
@@ -82,6 +77,7 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
 }
 
 
+@deprecated("use AccumulatorV2", "2.0.0")
 object AccumulatorParam {
 
   // The following implicit objects were in SparkContext before 1.2 and users had to
@@ -89,21 +85,25 @@ object AccumulatorParam {
   // them automatically. However, as there are duplicate codes in SparkContext for backward
   // compatibility, please update them accordingly if you modify the following implicit objects.
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double): Double = 0.0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int): Int = 0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long): Long = t1 + t2
     def zero(initialValue: Long): Long = 0L
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float): Float = t1 + t2
     def zero(initialValue: Float): Float = 0f
@@ -112,6 +112,7 @@ object AccumulatorParam {
   // Note: when merging values, this param just adopts the newer value. This is used only
   // internally for things that shouldn't really be accumulated across tasks, like input
   // read method, which should be the same across all tasks in the same stage.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
     def addInPlace(t1: String, t2: String): String = t2
     def zero(initialValue: String): String = ""
@@ -119,12 +120,14 @@ object AccumulatorParam {
 
   // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
   // A better way to use this is to first accumulate the values yourself, then add them all at once.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
     def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
     def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
   }
 
   // For the internal metric that records what blocks are updated in a particular task
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object UpdatedBlockStatusesAccumulatorParam
     extends ListAccumulatorParam[(BlockId, BlockStatus)]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d5c79f56/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d0f88d4..302dec2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1219,6 +1219,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
     val acc = new Accumulator(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -1230,6 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
     : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
@@ -1243,6 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
@@ -1257,6 +1260,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
@@ -1270,6 +1274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
       (initialValue: R): Accumulable[R, T] = {
     val param = new GrowableAccumulableParam[R, T]
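
On the SparkContext side there is no direct V2 counterpart to accumulableCollection; sc.collectionAccumulator covers the common case, and custom AccumulatorV2 subclasses are registered explicitly via SparkContext.register, which is part of the 2.0 API. A sketch reusing the hypothetical StringSetAccumulator from above:

    val setAcc = new StringSetAccumulator
    sc.register(setAcc, "distinct-strings")  // must be registered before use in tasks
    sc.parallelize(Seq("a", "b", "a")).foreach(s => setAcc.add(s))
    println(setAcc.value)  // Set(a, b), readable on the driver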



