spark-commits mailing list archives

From r...@apache.org
Subject [5/6] git commit: Address Matei's comments
Date Mon, 14 Oct 2013 21:20:12 GMT
Address Matei's comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4a45019f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4a45019f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4a45019f

Branch: refs/heads/master
Commit: 4a45019fb0458b5f943253c0c16c9e257ef2c129
Parents: da89611
Author: Aaron Davidson <aaron@databricks.com>
Authored: Mon Oct 14 00:24:17 2013 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Mon Oct 14 00:24:17 2013 -0700

----------------------------------------------------------------------
 .../spark/network/netty/FileServerHandler.java  |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  4 +--
 .../spark/network/netty/ShuffleCopier.scala     |  2 +-
 .../spark/network/netty/ShuffleSender.scala     |  2 +-
 .../org/apache/spark/storage/BlockId.scala      | 35 +++++++++++---------
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../org/apache/spark/storage/DiskStore.scala    |  2 +-
 .../org/apache/spark/storage/BlockIdSuite.scala | 13 ++------
 8 files changed, 28 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 097b2ad..cfd8132 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -37,7 +37,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
   @Override
   public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
     BlockId blockId = BlockId.apply(blockIdString);
-    String path = pResolver.getAbsolutePath(blockId.asFilename());
+    String path = pResolver.getAbsolutePath(blockId.name());
     // if getFilePath returns null, close the channel
     if (path == null) {
       //ctx.close();

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 919f976..609464e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -120,7 +120,7 @@ private object HttpBroadcast extends Logging {
   }
 
   def write(id: Long, value: Any) {
-    val file = new File(broadcastDir, BroadcastBlockId(id).asFilename)
+    val file = new File(broadcastDir, BroadcastBlockId(id).name)
     val out: OutputStream = {
       if (compress) {
         compressionCodec.compressedOutputStream(new FileOutputStream(file))
@@ -136,7 +136,7 @@ private object HttpBroadcast extends Logging {
   }
 
   def read[T](id: Long): T = {
-    val url = serverUri + "/" + BroadcastBlockId(id).asFilename
+    val url = serverUri + "/" + BroadcastBlockId(id).name
     val in = {
       if (compress) {
         compressionCodec.compressedInputStream(new URL(url).openStream())
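
Note: both hunks in this file swap asFilename for name, relying on a block's name doubling as its on-disk filename and its URL path segment. A minimal sketch of that usage, inferring the "broadcast_<id>" format from the BROADCAST regex in BlockId.scala further down; these classes are private[spark], so the snippet assumes it runs inside the org.apache.spark package, and the server URI is a placeholder:

  import org.apache.spark.storage.BroadcastBlockId

  val blockId = BroadcastBlockId(42)
  val fileName = blockId.name               // "broadcast_42", per broadcast_([0-9]+)
  val serverUri = "http://host:port"        // hypothetical server URI
  val url = serverUri + "/" + blockId.name  // same string serves as the HTTP path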

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index eac2177..481ff8c 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -42,7 +42,7 @@ private[spark] class ShuffleCopier extends Logging {
     try {
       fc.init()
       fc.connect(host, port)
-      fc.sendRequest(blockId.asFilename)
+      fc.sendRequest(blockId.name)
       fc.waitForClose()
       fc.close()
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index b88fbaa..1586dff 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
         val dirId = hash % localDirs.length
         val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
         val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        val file = new File(subDir, blockId.asFilename)
+        val file = new File(subDir, blockId.name)
         return file.getAbsolutePath
       }
     }
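
Note: the hunk above leaves the two-level directory hashing untouched and only swaps asFilename for name. A worked example of that arithmetic, using hypothetical values for hash, the number of local dirs, and subDirsPerLocalDir (none of which are shown in this hunk):

  val hash = 12345             // hypothetical non-negative hash of the block id
  val numLocalDirs = 4         // stand-in for localDirs.length
  val subDirsPerLocalDir = 64  // stand-in for the configured value

  val dirId = hash % numLocalDirs                            // 12345 % 4 = 1
  val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir  // 3086 % 64 = 14
  val subDirName = "%02x".format(subDirId)                   // "0e"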

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index b477a82..c7efc67 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -24,13 +24,10 @@ package org.apache.spark.storage
  *
 * If your BlockId should be serializable, be sure to add it to the BlockId.fromString() method.
  */
-private[spark] abstract class BlockId {
+private[spark] sealed abstract class BlockId {
   /** A globally unique identifier for this Block. Can be used for ser/de. */
   def name: String
 
-  /** Physical filename for this block. May not be valid for Blocks are not file-backed. */
-  def asFilename = name
-
   // convenience methods
   def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
   def isRDD = isInstanceOf[RDDBlockId]
@@ -73,21 +70,27 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
 
 private[spark] object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
-  val Shuffle = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
-  val Broadcast = "broadcast_([0-9]+)".r
-  val TaskResult = "taskresult_([0-9]+)".r
-  val StreamInput = "input-([0-9]+)-([0-9]+)".r
-  val Test = "test_(.*)".r
+  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
+  val BROADCAST = "broadcast_([0-9]+)".r
+  val TASKRESULT = "taskresult_([0-9]+)".r
+  val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEST = "test_(.*)".r
 
   /** Converts a BlockId "name" String back into a BlockId. */
   def apply(id: String) = id match {
-    case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case Shuffle(shuffleId, mapId, reduceId) =>
+    case RDD(rddId, splitIndex) =>
+      RDDBlockId(rddId.toInt, splitIndex.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId) =>
       ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-    case Broadcast(broadcastId) => BroadcastBlockId(broadcastId.toLong)
-    case TaskResult(taskId) => TaskResultBlockId(taskId.toLong)
-    case StreamInput(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong)
-    case Test(value) => TestBlockId(value)
-    case _ => throw new IllegalStateException("Unrecognized BlockId: " + id)
+    case BROADCAST(broadcastId) =>
+      BroadcastBlockId(broadcastId.toLong)
+    case TASKRESULT(taskId) =>
+      TaskResultBlockId(taskId.toLong)
+    case STREAM(streamId, uniqueId) =>
+      StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEST(value) =>
+      TestBlockId(value)
+    case _ =>
+      throw new IllegalStateException("Unrecognized BlockId: " + id)
   }
 }
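
Note: with asFilename removed and the class sealed, name is the single serialized form of a BlockId and BlockId.apply is its inverse. A quick round-trip sketch, assuming each name mirrors its regex above (again private[spark], so it must run inside the org.apache.spark package):

  import org.apache.spark.storage._

  val id = RDDBlockId(1, 2)
  assert(BlockId(id.name) == id)                               // "rdd_1_2" parses back
  assert(BlockId("shuffle_0_1_2") == ShuffleBlockId(0, 1, 2))
  // BlockId("myblock") would now throw IllegalStateException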

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4ca86b7..801f88a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -970,7 +970,7 @@ private[spark] class BlockManager(
     case ShuffleBlockId(_, _, _) => compressShuffle
     case BroadcastBlockId(_) => compressBroadcast
     case RDDBlockId(_, _) => compressRdds
-    case _ => false // Won't happen in a real cluster, but it can in tests
+    case _ => false
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c0b0076..b7ca61e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -258,7 +258,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
       }
     }
 
-    new File(subDir, blockId.asFilename)
+    new File(subDir, blockId.name)
   }
 
   private def createLocalDirs(): Array[File] = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a45019f/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index 27f0dce..cb76275 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -22,29 +22,20 @@ import org.scalatest.FunSuite
 class BlockIdSuite extends FunSuite {
   def assertSame(id1: BlockId, id2: BlockId) {
     assert(id1.name === id2.name)
-    assert(id1.asFilename === id2.asFilename)
     assert(id1.hashCode === id2.hashCode)
     assert(id1 === id2)
   }
 
   def assertDifferent(id1: BlockId, id2: BlockId) {
     assert(id1.name != id2.name)
-    assert(id1.asFilename != id2.asFilename)
     assert(id1.hashCode != id2.hashCode)
     assert(id1 != id2)
   }
 
-  test("basic-functions") {
-    case class MyBlockId(name: String) extends BlockId
-
-    val id = MyBlockId("a")
-    assertSame(id, MyBlockId("a"))
-    assertDifferent(id, MyBlockId("b"))
-    assert(id.asRDDId === None)
-
+  test("test-bad-deserialization") {
     try {
       // Try to deserialize an invalid block id.
-      BlockId("a")
+      BlockId("myblock")
       fail()
     } catch {
       case e: IllegalStateException => // OK

