spark-reviews mailing list archives

From andrewor14 <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...
Date Thu, 15 Oct 2015 21:24:29 GMT
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9124#discussion_r42182568
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---
    @@ -18,535 +18,91 @@
     package org.apache.spark.util.collection
     
     import scala.collection.mutable.ArrayBuffer
    -
     import scala.util.Random
     
     import org.apache.spark._
     import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
     
    -// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
     
     class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
    -  private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
    -    val conf = new SparkConf(loadDefaults)
    -    if (kryo) {
    -      conf.set("spark.serializer", classOf[KryoSerializer].getName)
    -    } else {
     -      // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    -      // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    -      conf.set("spark.serializer.objectStreamReset", "1")
    -      conf.set("spark.serializer", classOf[JavaSerializer].getName)
    -    }
    -    conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
    -    // Ensure that we actually have multiple batches per spill file
    -    conf.set("spark.shuffle.spill.batchSize", "10")
    -    conf.set("spark.testing.memory", "2000000")
    -    conf
    -  }
    -
    -  test("empty data stream with kryo ser") {
    -    emptyDataStream(createSparkConf(false, true))
    -  }
    -
    -  test("empty data stream with java ser") {
    -    emptyDataStream(createSparkConf(false, false))
    -  }
    -
    -  def emptyDataStream(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter.iterator.toSeq === Seq())
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), None, None)
    -    assert(sorter2.iterator.toSeq === Seq())
    -    sorter2.stop()
    -
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter3.iterator.toSeq === Seq())
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), None, None)
    -    assert(sorter4.iterator.toSeq === Seq())
    -    sorter4.stop()
    -  }
    +  import TestUtils.{assertNotSpilled, assertSpilled}
     
    -  test("few elements per partition with kryo ser") {
    -    fewElementsPerPartition(createSparkConf(false, true))
    -  }
    +  testWithMultipleSer("empty data stream")(emptyDataStream)
     
    -  test("few elements per partition with java ser") {
    -    fewElementsPerPartition(createSparkConf(false, false))
    -  }
    +  testWithMultipleSer("few elements per partition")(fewElementsPerPartition)
     
    -  def fewElementsPerPartition(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -    val elements = Set((1, 1), (2, 2), (5, 5))
    -    val expected = Set(
    -      (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()),
    -      (5, Set((5, 5))), (6, Set()))
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements.iterator)
    -    assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), None, None)
    -    sorter2.insertAll(elements.iterator)
    -    assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter2.stop()
    +  testWithMultipleSer("empty partitions with spilling")(emptyPartitionsWithSpilling)
     
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter3.insertAll(elements.iterator)
    -    assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), None, None)
    -    sorter4.insertAll(elements.iterator)
    -    assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter4.stop()
    -  }
    -
    -  test("empty partitions with spilling with kryo ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, true))
    +  // Load defaults, otherwise SPARK_HOME is not found
    +  testWithMultipleSer("spilling in local cluster", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 2)
       }
     
    -  test("empty partitions with spilling with java ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, false))
    -  }
    -
    -  def emptyPartitionsWithSpilling(conf: SparkConf) {
    -    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
     -    val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements)
     -    assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
    -    val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
    -    assert(iter.next() === (0, Nil))
    -    assert(iter.next() === (1, List((1, 1))))
    -    assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList))
    -    assert(iter.next() === (3, Nil))
    -    assert(iter.next() === (4, Nil))
    -    assert(iter.next() === (5, List((5, 5))))
    -    assert(iter.next() === (6, Nil))
    -    sorter.stop()
    -  }
    -
    -  test("spilling in local cluster with kryo ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with java ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, false))
    -  }
    -
    -  def testSpillingInLocalCluster(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~8 times
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~17 times
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey().collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~7 times
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
     -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
     -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~7 times
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~17 times
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with kryo ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with java ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, false))
    -  }
    -
    -  def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~4 times per executor
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max _, 100).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~8 times per executor
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey(100).collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~4 times per executor
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2, 100).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
     -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
     -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~4 times per executor
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~8 times per executor
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
     +  testWithMultipleSer("spilling in local cluster with many reduce tasks", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 100)
       }
     
       test("cleanup of intermediate files in sorter") {
     -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    -
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
    -    sorter2.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +    cleanupIntermediateFilesInSorter(withFailures = false)
       }
     
    -  test("cleanup of intermediate files in sorter if there are errors") {
     -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    intercept[SparkException] {
    -      sorter.insertAll((0 until 120000).iterator.map(i => {
    -        if (i == 119990) {
    -          throw new SparkException("Intentional failure")
    -        }
    -        (i, i)
    -      }))
    -    }
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +  test("cleanup of intermediate files in sorter with failures") {
    +    cleanupIntermediateFilesInSorter(withFailures = true)
       }
     
       test("cleanup of intermediate files in shuffle") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
    -    assert(data.reduceByKey(_ + _).count() === 100000)
    -
     -    // After the shuffle, there should be only 4 files on disk: our two map output files and
    -    // their index files. All other intermediate files should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 4)
    -  }
    -
    -  test("cleanup of intermediate files in shuffle with errors") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => {
    -      if (i == 99990) {
    -        throw new Exception("Intentional failure")
    -      }
    -      (i, i)
    -    })
    -    intercept[SparkException] {
    -      data.reduceByKey(_ + _).count()
    -    }
    -
     -    // After the shuffle, there should be only 2 files on disk: the output of task 1 and its index.
    -    // All other files (map 2's output and intermediate merge files) should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 2)
    -  }
    -
    -  test("no partial aggregation or sorting with kryo ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, true))
    -  }
    -
    -  test("no partial aggregation or sorting with java ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, false))
    -  }
    -
    -  def noPartialAggregationOrSorting(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    -  }
    -
    -  test("partial aggregation without spill with kryo ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, true))
    -  }
    -
    -  test("partial aggregation without spill with java ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, false))
    -  }
    -
    -  def partialAggregationWithoutSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +    cleanupIntermediateFilesInShuffle(withFailures = false)
       }
     
    -  test("partial aggregation with spill, no ordering with kryo ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, true))
    +  test("cleanup of intermediate files in shuffle with failures") {
    +    cleanupIntermediateFilesInShuffle(withFailures = true)
       }
     
    -  test("partial aggregation with spill, no ordering with java ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, false))
    +  testWithMultipleSer("no sorting or partial aggregation") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = false)
       }
     
    -  def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
     +  testWithMultipleSer("no sorting or partial aggregation with spilling") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = true)
       }
     
    -  test("partial aggregation with spill, with ordering with kryo ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, true))
    +  testWithMultipleSer("sorting, no partial aggregation") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = false)
       }
     
    -
    -  test("partial aggregation with spill, with ordering with java ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, false))
     +  testWithMultipleSer("sorting, no partial aggregation with spilling") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = true)
       }
     
    -  def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
     -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
     -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -
    -    // avoid combine before spill
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i)))
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +  testWithMultipleSer("partial aggregation, no sorting") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = false)
       }
     
    -  test("sorting without aggregation, no spill with kryo ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, true))
     +  testWithMultipleSer("partial aggregation, no sorting with spilling") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = true)
       }
     
    -  test("sorting without aggregation, no spill with java ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, false))
    +  testWithMultipleSer("partial aggregation and sorting") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = false)
       }
     
    -  def sortingWithoutAggregationNoSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    -
    -  test("sorting without aggregation, with spill with kryo ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, true))
    -  }
    -
    -  test("sorting without aggregation, with spill with java ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, false))
     +  testWithMultipleSer("partial aggregation and sorting with spilling") { (conf: SparkConf) =>
     +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = true)
       }
     
    -  def sortingWithoutAggregationWithSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    +  testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
    +    sortWithoutBreakingSortingContracts)
     
       test("spilling with hash collisions") {
    -    val conf = createSparkConf(true, false)
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true, kryo = false)
    +    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
         sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     
         def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
          def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
         def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String])
    -      : ArrayBuffer[String] = buffer1 ++= buffer2
    +    : ArrayBuffer[String] = buffer1 ++= buffer2
    --- End diff --
    
    oops I screwed that up
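
    For context, the call sites in the new code above suggest a helper roughly like the
    following (a sketch only; the real definitions of testWithMultipleSer and
    TestUtils.assertSpilled live outside this hunk, so the signatures shown here are
    assumptions based on how they are used in the diff):

        // Hypothetical sketch: run the same test body once per serializer,
        // mirroring the old "... with kryo ser" / "... with java ser" test pairs.
        private def testWithMultipleSer(
            name: String,
            loadDefaults: Boolean = false)(body: SparkConf => Unit): Unit = {
          test(name + " with kryo ser") {
            body(createSparkConf(loadDefaults, kryo = true))
          }
          test(name + " with java ser") {
            body(createSparkConf(loadDefaults, kryo = false))
          }
        }

        // Likewise, the imported assertSpilled/assertNotSpilled presumably wrap a job
        // and check its spill metrics, e.g. (assumed usage):
        //   TestUtils.assertSpilled(sc, "reduceByKey") {
        //     rdd.reduceByKey(math.max).collect()
        //   }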



