spark-issues mailing list archives

From "SuYan (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-6606) Accumulator deserialized twice because the NarrowCoGroupSplitDep contains rdd object.
Date Mon, 30 Mar 2015 08:06:53 GMT
SuYan created SPARK-6606:
----------------------------

             Summary: Accumulator deserialized twice because the NarrowCoGroupSplitDep contains
rdd object.
                 Key: SPARK-6606
                 URL: https://issues.apache.org/jira/browse/SPARK-6606
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.3.0, 1.2.0
            Reporter: SuYan


Running code like the example below, the accumulator is found to be deserialized twice.
first: 
{code}
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
{code}
second:
{code}
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
{code}
The first deserialization is not what is expected.

This happens because a ResultTask or ShuffleMapTask carries a partition object.
In class
{code}
CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
{code}
the CoGroupPartition may contain a CoGroupSplitDep:
{code}
NarrowCoGroupSplitDep(
    rdd: RDD[_],
    splitIndex: Int,
    var split: Partition
  ) extends CoGroupSplitDep {
{code}

Because that *NarrowCoGroupSplitDep* holds a reference to the rdd object, serializing the partition pulls the whole rdd into the task bytes, which causes the first (unexpected) deserialization.
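As a minimal sketch of the mechanism (using hypothetical Rdd and SplitDep classes standing in for Spark's real ones), any split that keeps a reference to its rdd drags a second, independent copy of that rdd through Java serialization:

{code}
import java.io._

// Hypothetical stand-ins for the real classes: a "split dep" that holds a
// reference back to its "rdd". Serializing the dep also serializes the rdd.
class Rdd(val name: String) extends Serializable
class SplitDep(val rdd: Rdd, val splitIndex: Int) extends Serializable

object Demo {
  // Serialize and immediately deserialize an object graph.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val rdd = new Rdd("cogrouped")
    val restored = roundTrip(new SplitDep(rdd, 0))
    // The rdd came back as a *new* instance, distinct from the original --
    // i.e. the rdd graph was deserialized a second time alongside the dep.
    println(restored.rdd ne rdd) // true
  }
}
{code}

In the real code path, the broadcast taskBinary already carries the rdd, so the copy dragged in through the partition is the redundant one.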

example:
{code}
val acc1 = sc.accumulator(0, "test1")
val acc2 = sc.accumulator(0, "test2")
val rdd1 = sc.parallelize((1 to 10).toSeq, 3)
val rdd2 = sc.parallelize((1 to 10).toSeq, 3)

val combine1 = rdd1.map(a => (a, 1)).combineByKey(
  a => {
    acc1 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3), mapSideCombine = false)

val combine2 = rdd2.map(a => (a, 1)).combineByKey(
  a => {
    acc2 += 1
    a
  },
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a + b,
  new HashPartitioner(3), mapSideCombine = false)

combine1.cogroup(combine2, new HashPartitioner(3)).count()
{code}
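One possible direction for a fix (a sketch only, not a committed patch, using simplified stand-in Rdd and Split types): mark the rdd reference in NarrowCoGroupSplitDep as @transient so the task serializer does not pull it in, and refresh the split reference in a custom writeObject at serialization time:

{code}
import java.io._

// Simplified stand-ins for Spark's RDD and Partition, enough for the sketch.
case class Split(index: Int)
class Rdd(n: Int) extends Serializable {
  def partitions: Array[Split] = Array.tabulate(n)(Split(_))
}

// Sketch of a fix: the rdd reference is @transient, so serializing the dep
// no longer drags the parent rdd (and its accumulators) along; the split
// reference is refreshed just before the default serialization runs.
class NarrowCoGroupSplitDep(
    @transient val rdd: Rdd,
    val splitIndex: Int,
    var split: Split) extends Serializable {

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = {
    // Update the reference to the parent split at serialization time.
    split = rdd.partitions(splitIndex)
    oos.defaultWriteObject()
  }
}

object FixDemo {
  def roundTrip[T <: Serializable](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(new NarrowCoGroupSplitDep(new Rdd(3), 1, null))
    println(restored.rdd == null)      // transient field was not serialized
    println(restored.split.index == 1) // split was refreshed, then serialized
  }
}
{code}

The accumulator would then only be deserialized once, from the broadcast taskBinary.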



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

