spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-12390] Clean up unused serializer parameter in BlockManager
Date Thu, 17 Dec 2015 20:03:00 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 881f2544e -> 88bbb5429


[SPARK-12390] Clean up unused serializer parameter in BlockManager

No change in functionality is intended; this only changes an internal API.
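
To see the pattern in isolation, here is a minimal, self-contained Scala sketch with hypothetical names (Ser and BlockCodec are illustrative, not Spark classes): a default parameter that no caller ever overrides can be dropped without changing behavior.

    // Hypothetical sketch of the cleanup pattern; Ser and BlockCodec are
    // illustrative names, not part of Spark's API.
    trait Ser { def write(v: Any): Array[Byte] }

    class BlockCodec(defaultSer: Ser) {
      // Before the cleanup, the signature carried an unused knob:
      //   def encode(v: Any, ser: Ser = defaultSer): Array[Byte] = ser.write(v)
      // Every call site left `ser` at its default, so the parameter is
      // removed and defaultSer is used directly.
      def encode(v: Any): Array[Byte] = defaultSer.write(v)
    }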

Author: Andrew Or <andrew@databricks.com>

Closes #10343 from andrewor14/clean-bm-serializer.

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbb542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbb542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbb542

Branch: refs/heads/branch-1.6
Commit: 88bbb5429dd3efcff6b2835a70143247b08ae6b2
Parents: 881f254
Author: Andrew Or <andrew@databricks.com>
Authored: Wed Dec 16 20:01:47 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Dec 17 12:01:13 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 29 ++++++++------------
 .../org/apache/spark/storage/DiskStore.scala    | 10 -------
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ab0007f..2cc2fd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteArrayOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     ByteBuffer.wrap(byteStream.toByteArray)
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
   * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
   * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
-  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {

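For context on what the simplified methods do, the following is a hedged, self-contained sketch of the same serialize/deserialize round trip using Spark's JavaSerializer directly; the wrapForCompression and BufferedOutputStream steps from BlockManager are omitted for brevity.

    // Round-trip sketch mirroring dataSerializeStream / dataDeserializeStream,
    // minus the compression wrapper. Runs with Spark on the classpath.
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer

    object RoundTrip {
      def main(args: Array[String]): Unit = {
        val ser = new JavaSerializer(new SparkConf()).newInstance()

        // Serialize an iterator of values into an in-memory buffer,
        // using the same writeAll(...).close() chain as the diff above.
        val out = new ByteArrayOutputStream(4096)
        ser.serializeStream(out).writeAll(Iterator("a", "b", "c")).close()

        // Deserialize the bytes back into an iterator of values.
        val in = new ByteArrayInputStream(out.toByteArray)
        ser.deserializeStream(in).asIterator.foreach(println) // a, b, c
      }
    }
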
http://git-wip-us.apache.org/repos/asf/spark/blob/88bbb542/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9d..6c44771 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {



