spark-commits mailing list archives

From pwend...@apache.org
Subject [1/2] spark git commit: [SPARK-4550] In sort-based shuffle, store map outputs in serialized form
Date Fri, 01 May 2015 06:12:03 GMT
Repository: spark
Updated Branches:
  refs/heads/master a9fc50552 -> 0a2b15ce4
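For context, this patch makes the sort-based shuffle keep map outputs in serialized form, and the test changes below rerun the ExternalSorter suites under both the Java and Kryo serializers. As a minimal sketch (not part of the patch; the object name and job are made up) of the kind of configuration the new Kryo test variants exercise:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoSerializer

    object KryoShuffleSketch {
      def main(args: Array[String]): Unit = {
        // Run the sort-based shuffle with Kryo instead of the default Java serializer.
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("kryo-shuffle-sketch")
          .set("spark.serializer", classOf[KryoSerializer].getName)
          .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
        val sc = new SparkContext(conf)
        // groupByKey forces map outputs through the shuffle write path.
        val grouped = sc.parallelize(0 until 1000).map(i => (i % 10, i)).groupByKey().collect()
        println(grouped.map { case (k, vs) => s"$k -> ${vs.size}" }.mkString(", "))
        sc.stop()
      }
    }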


http://git-wip-us.apache.org/repos/asf/spark/blob/0a2b15ce/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index de26aa3..20fd22b 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -19,19 +19,24 @@ package org.apache.spark.util.collection
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.{PrivateMethodTester, FunSuite}
-
-import org.apache.spark._
+import org.scalatest.{FunSuite, PrivateMethodTester}
 
 import scala.util.Random
 
+import org.apache.spark._
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+
 class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
-  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
+  private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
     val conf = new SparkConf(loadDefaults)
-    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
-    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    if (kryo) {
+      conf.set("spark.serializer", classOf[KryoSerializer].getName)
+    } else {
+      // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+      // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+      conf.set("spark.serializer.objectStreamReset", "1")
+      conf.set("spark.serializer", classOf[JavaSerializer].getName)
+    }
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
     conf
@@ -47,8 +52,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(!sorter.invokePrivate(bypassMergeSort()), "sorter bypassed merge-sort")
   }
 
-  test("empty data stream") {
-    val conf = new SparkConf(false)
+  test("empty data stream with kryo ser") {
+    emptyDataStream(createSparkConf(false, true))
+  }
+
+  test("empty data stream with java ser") {
+    emptyDataStream(createSparkConf(false, false))
+  }
+
+  def emptyDataStream(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -81,8 +93,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     sorter4.stop()
   }
 
-  test("few elements per partition") {
-    val conf = createSparkConf(false)
+  test("few elements per partition with kryo ser") {
+    fewElementsPerPartition(createSparkConf(false, true))
+  }
+
+  test("few elements per partition with java ser") {
+    fewElementsPerPartition(createSparkConf(false, false))
+  }
+
+  def fewElementsPerPartition(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -123,8 +142,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     sorter4.stop()
   }
 
-  test("empty partitions with spilling") {
-    val conf = createSparkConf(false)
+  test("empty partitions with spilling with kryo ser") {
+    emptyPartitionsWithSpilling(createSparkConf(false, true))
+  }
+
+  test("empty partitions with spilling with java ser") {
+    emptyPartitionsWithSpilling(createSparkConf(false, false))
+  }
+
+  def emptyPartitionsWithSpilling(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
@@ -149,8 +175,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     sorter.stop()
   }
 
-  test("empty partitions with spilling, bypass merge-sort") {
-    val conf = createSparkConf(false)
+  test("empty partitions with spilling, bypass merge-sort with kryo ser") {
+    emptyPartitionerWithSpillingBypassMergeSort(createSparkConf(false, true))
+  }
+
+  test("empty partitions with spilling, bypass merge-sort with java ser") {
+    emptyPartitionerWithSpillingBypassMergeSort(createSparkConf(false, false))
+  }
+
+  def emptyPartitionerWithSpillingBypassMergeSort(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
@@ -174,8 +207,17 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     sorter.stop()
   }
 
-  test("spilling in local cluster") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+  test("spilling in local cluster with kryo ser") {
+    // Load defaults, otherwise SPARK_HOME is not found
+    testSpillingInLocalCluster(createSparkConf(true, true))
+  }
+
+  test("spilling in local cluster with java ser") {
+    // Load defaults, otherwise SPARK_HOME is not found
+    testSpillingInLocalCluster(createSparkConf(true, false))
+  }
+
+  def testSpillingInLocalCluster(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
@@ -245,8 +287,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
   }
 
-  test("spilling in local cluster with many reduce tasks") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+  test("spilling in local cluster with many reduce tasks with kryo ser") {
+    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, true))
+  }
+
+  test("spilling in local cluster with many reduce tasks with java ser") {
+    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, false))
+  }
+
+  def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
@@ -317,7 +366,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in sorter") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -344,7 +393,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in sorter, bypass merge-sort") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -367,7 +416,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in sorter if there are errors") {
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -392,7 +441,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in sorter if there are errors, bypass merge-sort")
{
-    val conf = createSparkConf(true)  // Load defaults, otherwise SPARK_HOME is not found
+    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -414,7 +463,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in shuffle") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(false, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -429,7 +478,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("cleanup of intermediate files in shuffle with errors") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(false, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -450,8 +499,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(diskBlockManager.getAllFiles().length === 2)
   }
 
-  test("no partial aggregation or sorting") {
-    val conf = createSparkConf(false)
+  test("no partial aggregation or sorting with kryo ser") {
+    noPartialAggregationOrSorting(createSparkConf(false, true))
+  }
+
+  test("no partial aggregation or sorting with java ser") {
+    noPartialAggregationOrSorting(createSparkConf(false, false))
+  }
+
+  def noPartialAggregationOrSorting(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -465,8 +521,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(results === expected)
   }
 
-  test("partial aggregation without spill") {
-    val conf = createSparkConf(false)
+  test("partial aggregation without spill with kryo ser") {
+    partialAggregationWithoutSpill(createSparkConf(false, true))
+  }
+
+  test("partial aggregation without spill with java ser") {
+    partialAggregationWithoutSpill(createSparkConf(false, false))
+  }
+
+  def partialAggregationWithoutSpill(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -481,8 +544,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(results === expected)
   }
 
-  test("partial aggregation with spill, no ordering") {
-    val conf = createSparkConf(false)
+  test("partial aggregation with spill, no ordering with kryo ser") {
+    partialAggregationWIthSpillNoOrdering(createSparkConf(false, true))
+  }
+
+  test("partial aggregation with spill, no ordering with java ser") {
+    partialAggregationWIthSpillNoOrdering(createSparkConf(false, false))
+  }
+
+  def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -497,8 +567,16 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(results === expected)
   }
 
-  test("partial aggregation with spill, with ordering") {
-    val conf = createSparkConf(false)
+  test("partial aggregation with spill, with ordering with kryo ser") {
+    partialAggregationWithSpillWithOrdering(createSparkConf(false, true))
+  }
+
+
+  test("partial aggregation with spill, with ordering with java ser") {
+    partialAggregationWithSpillWithOrdering(createSparkConf(false, false))
+  }
+
+  def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -517,8 +595,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(results === expected)
   }
 
-  test("sorting without aggregation, no spill") {
-    val conf = createSparkConf(false)
+  test("sorting without aggregation, no spill with kryo ser") {
+    sortingWithoutAggregationNoSpill(createSparkConf(false, true))
+  }
+
+  test("sorting without aggregation, no spill with java ser") {
+    sortingWithoutAggregationNoSpill(createSparkConf(false, false))
+  }
+
+  def sortingWithoutAggregationNoSpill(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -534,8 +619,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assert(results === expected)
   }
 
-  test("sorting without aggregation, with spill") {
-    val conf = createSparkConf(false)
+  test("sorting without aggregation, with spill with kryo ser") {
+    sortingWithoutAggregationWithSpill(createSparkConf(false, true))
+  }
+
+  test("sorting without aggregation, with spill with java ser") {
+    sortingWithoutAggregationWithSpill(createSparkConf(false, false))
+  }
+
+  def sortingWithoutAggregationWithSpill(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -552,7 +644,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("spilling with hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(true, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -609,7 +701,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("spilling with many hash collisions") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(true, false)
     conf.set("spark.shuffle.memoryFraction", "0.0001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -632,7 +724,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(true, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -656,7 +748,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("spilling with null keys and values") {
-    val conf = createSparkConf(true)
+    val conf = createSparkConf(true, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
@@ -685,7 +777,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
   }
 
   test("conditions for bypassing merge-sort") {
-    val conf = createSparkConf(false)
+    val conf = createSparkConf(false, false)
     conf.set("spark.shuffle.memoryFraction", "0.001")
     conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
     sc = new SparkContext("local", "test", conf)
@@ -718,8 +810,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     assertDidNotBypassMergeSort(sorter4)
   }
 
-  test("sort without breaking sorting contracts") {
-    val conf = createSparkConf(true)
+  test("sort without breaking sorting contracts with kryo ser") {
+    sortWithoutBreakingSortingContracts(createSparkConf(true, true))
+  }
+
+  test("sort without breaking sorting contracts with java ser") {
+    sortWithoutBreakingSortingContracts(createSparkConf(true, false))
+  }
+
+  def sortWithoutBreakingSortingContracts(conf: SparkConf) {
     conf.set("spark.shuffle.memoryFraction", "0.01")
     conf.set("spark.shuffle.manager", "sort")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
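The test changes above all follow the same refactoring: each former test body becomes a method that takes a SparkConf, and two thin test cases invoke it, once with Kryo and once with Java serialization. Distilled into a standalone sketch (suite and method names here are illustrative, not from the patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
    import org.scalatest.FunSuite

    class SerializerParameterizedSuite extends FunSuite {
      private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
        val conf = new SparkConf(loadDefaults)
        val serializerClass = if (kryo) classOf[KryoSerializer] else classOf[JavaSerializer]
        conf.set("spark.serializer", serializerClass.getName)
      }

      test("some shuffle behavior with kryo ser") {
        someShuffleBehavior(createSparkConf(loadDefaults = false, kryo = true))
      }

      test("some shuffle behavior with java ser") {
        someShuffleBehavior(createSparkConf(loadDefaults = false, kryo = false))
      }

      // The shared body exercises the behavior under whichever serializer the conf selects.
      def someShuffleBehavior(conf: SparkConf): Unit = {
        assert(conf.get("spark.serializer").nonEmpty)
      }
    }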

http://git-wip-us.apache.org/repos/asf/spark/blob/0a2b15ce/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
new file mode 100644
index 0000000..b5a2d9e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/PartitionedSerializedPairBufferSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+
+import com.google.common.io.ByteStreams
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.storage.{FileSegment, BlockObjectWriter}
+
+class PartitionedSerializedPairBufferSuite extends FunSuite {
+  test("OrderedInputStream single record") {
+    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
+
+    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
+    val struct = SomeStruct("something", 5)
+    buffer.insert(4, 10, struct)
+
+    val bytes = ByteStreams.toByteArray(buffer.orderedInputStream)
+
+    val baos = new ByteArrayOutputStream()
+    val stream = serializerInstance.serializeStream(baos)
+    stream.writeObject(10)
+    stream.writeObject(struct)
+    stream.close()
+
+    baos.toByteArray should be (bytes)
+  }
+
+  test("insert single record") {
+    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
+    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
+    val struct = SomeStruct("something", 5)
+    buffer.insert(4, 10, struct)
+    val elements = buffer.partitionedDestructiveSortedIterator(None).toArray
+    elements.size should be (1)
+    elements.head should be (((4, 10), struct))
+  }
+
+  test("insert multiple records") {
+    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
+    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
+    val struct1 = SomeStruct("something1", 8)
+    buffer.insert(6, 1, struct1)
+    val struct2 = SomeStruct("something2", 9)
+    buffer.insert(4, 2, struct2)
+    val struct3 = SomeStruct("something3", 10)
+    buffer.insert(5, 3, struct3)
+
+    val elements = buffer.partitionedDestructiveSortedIterator(None).toArray
+    elements.size should be (3)
+    elements(0) should be (((4, 2), struct2))
+    elements(1) should be (((5, 3), struct3))
+    elements(2) should be (((6, 1), struct1))
+  }
+
+  test("write single record") {
+    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
+    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
+    val struct = SomeStruct("something", 5)
+    buffer.insert(4, 10, struct)
+    val it = buffer.destructiveSortedWritablePartitionedIterator(None)
+    val writer = new SimpleBlockObjectWriter
+    assert(it.hasNext)
+    it.nextPartition should be (4)
+    it.writeNext(writer)
+    assert(!it.hasNext)
+
+    val stream = serializerInstance.deserializeStream(writer.getInputStream)
+    stream.readObject[AnyRef]() should be (10)
+    stream.readObject[AnyRef]() should be (struct)
+  }
+
+  test("write multiple records") {
+    val serializerInstance = new KryoSerializer(new SparkConf()).newInstance
+    val buffer = new PartitionedSerializedPairBuffer[Int, SomeStruct](4, 32, serializerInstance)
+    val struct1 = SomeStruct("something1", 8)
+    buffer.insert(6, 1, struct1)
+    val struct2 = SomeStruct("something2", 9)
+    buffer.insert(4, 2, struct2)
+    val struct3 = SomeStruct("something3", 10)
+    buffer.insert(5, 3, struct3)
+
+    val it = buffer.destructiveSortedWritablePartitionedIterator(None)
+    val writer = new SimpleBlockObjectWriter
+    assert(it.hasNext)
+    it.nextPartition should be (4)
+    it.writeNext(writer)
+    assert(it.hasNext)
+    it.nextPartition should be (5)
+    it.writeNext(writer)
+    assert(it.hasNext)
+    it.nextPartition should be (6)
+    it.writeNext(writer)
+    assert(!it.hasNext)
+
+    val stream = serializerInstance.deserializeStream(writer.getInputStream)
+    val iter = stream.asIterator
+    iter.next() should be (2)
+    iter.next() should be (struct2)
+    iter.next() should be (3)
+    iter.next() should be (struct3)
+    iter.next() should be (1)
+    iter.next() should be (struct1)
+    assert(!iter.hasNext)
+  }
+}
+
+case class SomeStruct(val str: String, val num: Int)
+
+class SimpleBlockObjectWriter extends BlockObjectWriter(null) {
+  val baos = new ByteArrayOutputStream()
+
+  override def write(bytes: Array[Byte], offs: Int, len: Int): Unit = {
+    baos.write(bytes, offs, len)
+  }
+
+  def getInputStream(): InputStream = new ByteArrayInputStream(baos.toByteArray)
+
+  override def open(): BlockObjectWriter = this
+  override def close(): Unit = { }
+  override def isOpen: Boolean = true
+  override def commitAndClose(): Unit = { }
+  override def revertPartialWritesAndClose(): Unit = { }
+  override def fileSegment(): FileSegment = null
+  override def write(key: Any, value: Any): Unit = { }
+  override def recordWritten(): Unit = { }
+  override def write(b: Int): Unit = { }
+}
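The new suite above pins down the contract of PartitionedSerializedPairBuffer: records go in already serialized, tagged with a partition id, and come back out in partition order without being deserialized. As a rough illustration of that idea only (this toy class is not Spark's implementation), a buffer can keep a single byte stream plus (partition, offset) metadata and sort just the metadata:

    import java.io.ByteArrayOutputStream

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

    class ToySerializedPairBuffer(serializer: SerializerInstance) {
      private val bytes = new ByteArrayOutputStream()
      private val stream = serializer.serializeStream(bytes)
      // One (partitionId, startOffset) entry per record; the payload stays serialized.
      private var meta = Vector.empty[(Int, Int)]

      def insert(partition: Int, key: Any, value: Any): Unit = {
        meta :+= ((partition, bytes.size))
        stream.writeObject(key)
        stream.writeObject(value)
        stream.flush()
      }

      /** (partitionId, startOffset, endOffset) ranges in partition order; no deserialization. */
      def partitionOrderedRanges: Seq[(Int, Int, Int)] = {
        val starts = meta.map(_._2) :+ bytes.size
        meta.zipWithIndex
          .map { case ((partition, start), i) => (partition, start, starts(i + 1)) }
          .sortBy(_._1)
      }
    }

    object ToySerializedPairBufferExample {
      def main(args: Array[String]): Unit = {
        val ser = new KryoSerializer(new SparkConf()).newInstance()
        val buffer = new ToySerializedPairBuffer(ser)
        buffer.insert(2, 10, "b")
        buffer.insert(0, 20, "a")
        println(buffer.partitionOrderedRanges) // the partition-0 record's range comes first
      }
    }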

http://git-wip-us.apache.org/repos/asf/spark/blob/0a2b15ce/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index cec97de..9552f41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -50,10 +50,10 @@ private[sql] class Serializer2SerializationStream(
   extends SerializationStream with Logging {
 
   val rowOut = new DataOutputStream(out)
-  val writeKey = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut)
-  val writeValue = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut)
+  val writeKeyFunc = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut)
+  val writeValueFunc = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut)
 
-  def writeObject[T: ClassTag](t: T): SerializationStream = {
+  override def writeObject[T: ClassTag](t: T): SerializationStream = {
     val kv = t.asInstanceOf[Product2[Row, Row]]
     writeKey(kv._1)
     writeValue(kv._2)
@@ -61,6 +61,16 @@ private[sql] class Serializer2SerializationStream(
     this
   }
 
+  override def writeKey[T: ClassTag](t: T): SerializationStream = {
+    writeKeyFunc(t.asInstanceOf[Row])
+    this
+  }
+
+  override def writeValue[T: ClassTag](t: T): SerializationStream = {
+    writeValueFunc(t.asInstanceOf[Row])
+    this
+  }
+
   def flush(): Unit = {
     rowOut.flush()
   }
@@ -83,17 +93,27 @@ private[sql] class Serializer2DeserializationStream(
 
   val key = if (keySchema != null) new SpecificMutableRow(keySchema) else null
   val value = if (valueSchema != null) new SpecificMutableRow(valueSchema) else null
-  val readKey = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key)
-  val readValue = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value)
+  val readKeyFunc = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key)
+  val readValueFunc = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value)
 
-  def readObject[T: ClassTag](): T = {
-    readKey()
-    readValue()
+  override def readObject[T: ClassTag](): T = {
+    readKeyFunc()
+    readValueFunc()
 
     (key, value).asInstanceOf[T]
   }
 
-  def close(): Unit = {
+  override def readKey[T: ClassTag](): T = {
+    readKeyFunc()
+    key.asInstanceOf[T]
+  }
+
+  override def readValue[T: ClassTag](): T = {
+    readValueFunc()
+    value.asInstanceOf[T]
+  }
+
+  override def close(): Unit = {
     rowIn.close()
   }
 }
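The SparkSqlSerializer2 change above splits the old writeObject/readObject paths into key and value halves, overriding the writeKey/writeValue and readKey/readValue hooks shown in the overrides. A minimal usage sketch of that split, assuming those key/value stream methods as overridden above (example code not from the patch):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    object KeyValueStreamSketch {
      def main(args: Array[String]): Unit = {
        val ser = new KryoSerializer(new SparkConf()).newInstance()

        // Write the key and the value as two separate objects, the way the
        // serialized shuffle path does, instead of one (key, value) pair.
        val out = new ByteArrayOutputStream()
        val writeStream = ser.serializeStream(out)
        writeStream.writeKey(42)
        writeStream.writeValue("payload")
        writeStream.close()

        // Read them back independently on the other side.
        val readStream = ser.deserializeStream(new ByteArrayInputStream(out.toByteArray))
        val key = readStream.readKey[Int]()
        val value = readStream.readValue[String]()
        readStream.close()
        println(s"$key -> $value")
      }
    }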

http://git-wip-us.apache.org/repos/asf/spark/blob/0a2b15ce/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index f2d1353..baa9761 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -46,7 +46,8 @@ object StoragePerfTester {
     val totalRecords = dataSizeMb * 1000
     val recordsPerMap = totalRecords / numMaps
 
-    val writeData = "1" * recordLength
+    val writeKey = "1" * (recordLength / 2)
+    val writeValue = "1" * (recordLength / 2)
     val executor = Executors.newFixedThreadPool(numMaps)
 
     val conf = new SparkConf()
@@ -63,7 +64,7 @@ object StoragePerfTester {
         new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
       val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {
-        writers(i % numOutputSplits).write(writeData)
+        writers(i % numOutputSplits).write(writeKey, writeValue)
       }
       writers.map { w =>
         w.commitAndClose()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

