spark-issues mailing list archives

From "holdenk (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-14654) New accumulator API
Date Mon, 25 Apr 2016 03:32:13 GMT

    [ https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255840#comment-15255840 ]

holdenk commented on SPARK-14654:
---------------------------------

Ah yes, in DAGScheduler right now we cast it to [Any, Any] when we get it from the map, so
the type safety would be lost anyway; we would just be hiding the cast from the developer,
which probably isn't worth it.
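A minimal, self-contained sketch of the problem being discussed (the names `Acc`, `LongAcc`, and `Registry` are illustrative stand-ins, not the actual DAGScheduler code): once accumulators with different type parameters share one registry map, the element type becomes existential, and the scheduler has to cast on the way out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the proposed Accumulator[IN, OUT].
trait Acc[IN, OUT] extends Serializable {
  def add(v: IN): Unit
  def value: OUT
}

class LongAcc extends Acc[Long, Long] {
  private var sum = 0L
  def add(v: Long): Unit = sum += v
  def value: Long = sum
}

// Accumulators of mixed type parameters go in as Acc[_, _]; the only way
// to call add/value afterwards is an unchecked cast. Wrapping this in a
// typed getter hides the cast from the caller but does not remove it.
object Registry {
  private val accums = mutable.Map[Long, Acc[_, _]]()
  def register(id: Long, a: Acc[_, _]): Unit = accums(id) = a
  def get(id: Long): Acc[Any, Any] = accums(id).asInstanceOf[Acc[Any, Any]]
}
```

The `asInstanceOf` compiles without warning but provides no safety: passing a value of the wrong runtime type to `get(id).add` only fails (or silently misbehaves) inside the accumulator itself.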

> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators bad for metrics that change for each record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single class (Accumulator) that is user facing
> 2. The intermediate value is stored in the accumulator itself and can be different from the output type.
> 3. Concrete implementations can provide their own specialized methods.
> 4. Designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with the task closure.
>     // This is for external accumulators and internal ones that do not represent task-level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulatorOnExecutor()
>   }
> }
> {code}
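The `registerAccumulatorOnExecutor` hook above rides on standard Java serialization: a private `readObject(ObjectInputStream)` method, if present, is invoked automatically when an object is deserialized with the task closure. A self-contained sketch of that mechanism (`SelfRegistering` and its `registered` flag are illustrative names, not part of the proposal):

```scala
import java.io._

class SelfRegistering extends Serializable {
  // transient: not serialized, so it defaults to false after deserialization
  @transient var registered = false

  // Java serialization calls this automatically after reading the fields;
  // the real accumulator would call taskContext.registerAccumulator(this) here.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    registered = true
  }
}
```

Because the hook fires on deserialization rather than on construction, an accumulator created on the driver stays unregistered there, while every copy shipped to an executor registers itself on arrival.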
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
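Point 2 of the proposal (intermediate buffer distinct from the output type) is what makes an average accumulator expressible. A self-contained sketch in the same shape (an editor's illustration: `AverageAccumulator` is not part of the proposal, and the `Accumulator` plumbing is omitted):

```scala
// Sketch only: mirrors the proposed shape without extending the real
// Accumulator class. The internal state is (sum, count); the output is
// their ratio -- impossible with the old API, where the buffer type had
// to equal the output type.
class AverageAccumulator extends Serializable {
  private var _sum = 0.0
  private var _count = 0L

  def reset(): Unit = { _sum = 0.0; _count = 0L }

  def add(v: Double): Unit = { _sum += v; _count += 1 }

  def merge(other: AverageAccumulator): Unit = {
    _sum += other.sum
    _count += other.count
  }

  def sum: Double = _sum
  def count: Long = _count

  // The output type (an average) differs from the internal buffer.
  def value: Double = if (_count == 0) Double.NaN else _sum / _count
}
```

Note that `merge` combines the raw buffers (sums and counts), not the averages themselves; averaging two averages would give the wrong result when the partitions have different sizes.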
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def newLongAccumulator(name: String): LongAccumulator
>   def newLongAccumulator(name: String, dedup: Boolean): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1)
>   i
> }.count() // map is lazy; an action is needed before acc is updated
> {code}
> A work-in-progress prototype is available here: https://github.com/rxin/spark/tree/accumulator-refactor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
