spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [3/3] git commit: Revert "[SPARK-2468] Netty based block server / client module"
Date Fri, 15 Aug 2014 16:01:14 GMT
Revert "[SPARK-2468] Netty based block server / client module"

This reverts commit 3a8b68b7353fea50245686903b308fa9eb52cb51.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd9fcd25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd9fcd25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd9fcd25

Branch: refs/heads/master
Commit: fd9fcd25e93c727b327909cde0027426204ca6c3
Parents: 7589c39
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Fri Aug 15 09:01:04 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Fri Aug 15 09:01:04 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.scala |   85 ++
 .../netty/FileClientChannelInitializer.scala    |   31 +
 .../spark/network/netty/FileClientHandler.scala |   50 +
 .../apache/spark/network/netty/FileHeader.scala |   71 +
 .../apache/spark/network/netty/FileServer.scala |   91 ++
 .../netty/FileServerChannelInitializer.scala    |   34 +
 .../spark/network/netty/FileServerHandler.scala |   68 +
 .../spark/network/netty/NettyConfig.scala       |   59 -
 .../spark/network/netty/ShuffleCopier.scala     |  118 ++
 .../spark/network/netty/ShuffleSender.scala     |   71 +
 .../netty/client/BlockFetchingClient.scala      |  135 --
 .../client/BlockFetchingClientFactory.scala     |   99 --
 .../client/BlockFetchingClientHandler.scala     |   63 -
 .../network/netty/client/LazyInitIterator.scala |   44 -
 .../netty/client/ReferenceCountedBuffer.scala   |   47 -
 .../network/netty/server/BlockHeader.scala      |   32 -
 .../netty/server/BlockHeaderEncoder.scala       |   47 -
 .../network/netty/server/BlockServer.scala      |  162 --
 .../server/BlockServerChannelInitializer.scala  |   40 -
 .../netty/server/BlockServerHandler.scala       |  140 --
 .../spark/storage/BlockDataProvider.scala       |   32 -
 .../spark/storage/BlockFetcherIterator.scala    |  138 +-
 .../org/apache/spark/storage/BlockManager.scala |   49 +-
 .../spark/storage/BlockNotFoundException.scala  |   21 -
 .../apache/spark/storage/DiskBlockManager.scala |   13 +-
 core/src/test/resources/netty-test-file.txt     | 1379 ------------------
 .../netty/ServerClientIntegrationSuite.scala    |  158 --
 .../BlockFetchingClientHandlerSuite.scala       |   87 --
 .../netty/server/BlockHeaderEncoderSuite.scala  |   64 -
 .../netty/server/BlockServerHandlerSuite.scala  |  101 --
 pom.xml                                         |    2 +-
 31 files changed, 714 insertions(+), 2817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
new file mode 100644
index 0000000..c6d35f7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.util.concurrent.TimeUnit
+
+import io.netty.bootstrap.Bootstrap
+import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioSocketChannel
+
+import org.apache.spark.Logging
+
+class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging {
+
+  private var channel: Channel = _
+  private var bootstrap: Bootstrap = _
+  private var group: EventLoopGroup = _
+  private val sendTimeout = 60
+
+  def init(): Unit = {
+    group = new OioEventLoopGroup
+    bootstrap = new Bootstrap
+    bootstrap.group(group)
+      .channel(classOf[OioSocketChannel])
+      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
+      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout))
+      .handler(new FileClientChannelInitializer(handler))
+  }
+
+  def connect(host: String, port: Int) {
+    try {
+      channel = bootstrap.connect(host, port).sync().channel()
+    } catch {
+      case e: InterruptedException =>
+        logWarning("FileClient interrupted while trying to connect", e)
+        close()
+    }
+  }
+
+  def waitForClose(): Unit = {
+    try {
+      channel.closeFuture.sync()
+    } catch {
+      case e: InterruptedException =>
+        logWarning("FileClient interrupted", e)
+    }
+  }
+
+  def sendRequest(file: String): Unit = {
+    try {
+      val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS)
+      if (!bSent) {
+        throw new RuntimeException("Failed to send")
+      }
+    } catch {
+      case e: InterruptedException =>
+        logError("Error", e)
+    }
+  }
+
+  def close(): Unit = {
+    if (group != null) {
+      group.shutdownGracefully()
+      group = null
+      bootstrap = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
new file mode 100644
index 0000000..f4261c1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.string.StringEncoder
+
+
+class FileClientChannelInitializer(handler: FileClientHandler)
+  extends ChannelInitializer[SocketChannel] {
+
+  def initChannel(channel: SocketChannel) {
+    channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", handler)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
new file mode 100644
index 0000000..017302e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.storage.BlockId
+
+
+abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {
+
+  private var currentHeader: FileHeader = null
+
+  @volatile
+  private var handlerCalled: Boolean = false
+
+  def isComplete: Boolean = handlerCalled
+
+  def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader)
+
+  def handleError(blockId: BlockId)
+
+  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
+    if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
+      currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE))
+    }
+    if (in.readableBytes >= currentHeader.fileLen) {
+      handle(ctx, in, currentHeader)
+      handlerCalled = true
+      currentHeader = null
+      ctx.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
new file mode 100644
index 0000000..607e560
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.buffer._
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, TestBlockId}
+
+private[spark] class FileHeader (
+  val fileLen: Int,
+  val blockId: BlockId) extends Logging {
+
+  lazy val buffer: ByteBuf = {
+    val buf = Unpooled.buffer()
+    buf.capacity(FileHeader.HEADER_SIZE)
+    buf.writeInt(fileLen)
+    buf.writeInt(blockId.name.length)
+    blockId.name.foreach((x: Char) => buf.writeByte(x))
+    // padding the rest of header
+    if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
+      buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
+    } else {
+      throw new Exception("too long header " + buf.readableBytes)
+      logInfo("too long header")
+    }
+    buf
+  }
+
+}
+
+private[spark] object FileHeader {
+
+  val HEADER_SIZE = 40
+
+  def getFileLenOffset = 0
+  def getFileLenSize = Integer.SIZE/8
+
+  def create(buf: ByteBuf): FileHeader = {
+    val length = buf.readInt
+    val idLength = buf.readInt
+    val idBuilder = new StringBuilder(idLength)
+    for (i <- 1 to idLength) {
+      idBuilder += buf.readByte().asInstanceOf[Char]
+    }
+    val blockId = BlockId(idBuilder.toString())
+    new FileHeader(length, blockId)
+  }
+
+  def main(args:Array[String]) {
+    val header = new FileHeader(25, TestBlockId("my_block"))
+    val buf = header.buffer
+    val newHeader = FileHeader.create(buf)
+    System.out.println("id=" + newHeader.blockId + ",size=" + newHeader.fileLen)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
new file mode 100644
index 0000000..dff7795
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.net.InetSocketAddress
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.oio.OioServerSocketChannel
+
+import org.apache.spark.Logging
+
+/**
+ * Server that accepts the path of a file and echoes back its content.
+ */
+class FileServer(pResolver: PathResolver, private var port: Int) extends Logging {
+
+  private val addr: InetSocketAddress = new InetSocketAddress(port)
+  private var bossGroup: EventLoopGroup = new OioEventLoopGroup
+  private var workerGroup: EventLoopGroup = new OioEventLoopGroup
+
+  private var channelFuture: ChannelFuture = {
+    val bootstrap = new ServerBootstrap
+    bootstrap.group(bossGroup, workerGroup)
+      .channel(classOf[OioServerSocketChannel])
+      .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
+      .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
+      .childHandler(new FileServerChannelInitializer(pResolver))
+    bootstrap.bind(addr)
+  }
+
+  try {
+    val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
+    port = boundAddress.getPort
+  } catch {
+    case ie: InterruptedException =>
+      port = 0
+  }
+
+  /** Start the file server asynchronously in a new thread. */
+  def start(): Unit = {
+    val blockingThread: Thread = new Thread {
+      override def run(): Unit = {
+        try {
+          channelFuture.channel.closeFuture.sync
+          logInfo("FileServer exiting")
+        } catch {
+          case e: InterruptedException =>
+            logError("File server start got interrupted", e)
+        }
+        // NOTE: bootstrap is shutdown in stop()
+      }
+    }
+    blockingThread.setDaemon(true)
+    blockingThread.start()
+  }
+
+  def getPort: Int = port
+
+  def stop(): Unit = {
+    if (channelFuture != null) {
+      channelFuture.channel().close().awaitUninterruptibly()
+      channelFuture = null
+    }
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully()
+      bossGroup = null
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully()
+      workerGroup = null
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
new file mode 100644
index 0000000..aaa2f91
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
+import io.netty.handler.codec.string.StringDecoder
+
+class FileServerChannelInitializer(pResolver: PathResolver)
+  extends ChannelInitializer[SocketChannel] {
+
+  override def initChannel(channel: SocketChannel): Unit = {
+    channel.pipeline
+      .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter : _*))
+      .addLast("stringDecoder", new StringDecoder)
+      .addLast("handler", new FileServerHandler(pResolver))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
new file mode 100644
index 0000000..96f60b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.io.FileInputStream
+
+import io.netty.channel.{DefaultFileRegion, ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+
+class FileServerHandler(pResolver: PathResolver)
+  extends SimpleChannelInboundHandler[String] with Logging {
+
+  override def channelRead0(ctx: ChannelHandlerContext, blockIdString: String): Unit = {
+    val blockId: BlockId = BlockId(blockIdString)
+    val fileSegment: FileSegment = pResolver.getBlockLocation(blockId)
+    if (fileSegment == null) {
+      return
+    }
+    val file = fileSegment.file
+    if (file.exists) {
+      if (!file.isFile) {
+        ctx.write(new FileHeader(0, blockId).buffer)
+        ctx.flush()
+        return
+      }
+      val length: Long = fileSegment.length
+      if (length > Integer.MAX_VALUE || length <= 0) {
+        ctx.write(new FileHeader(0, blockId).buffer)
+        ctx.flush()
+        return
+      }
+      ctx.write(new FileHeader(length.toInt, blockId).buffer)
+      try {
+        val channel = new FileInputStream(file).getChannel
+        ctx.write(new DefaultFileRegion(channel, fileSegment.offset, fileSegment.length))
+      } catch {
+        case e: Exception =>
+          logError("Exception: ", e)
+      }
+    } else {
+      ctx.write(new FileHeader(0, blockId).buffer)
+    }
+    ctx.flush()
+  }
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
+    logError("Exception: ", cause)
+    ctx.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
deleted file mode 100644
index b587015..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import org.apache.spark.SparkConf
-
-/**
- * A central location that tracks all the settings we exposed to users.
- */
-private[spark]
-class NettyConfig(conf: SparkConf) {
-
-  /** Port the server listens on. Default to a random port. */
-  private[netty] val serverPort = conf.getInt("spark.shuffle.io.port", 0)
-
-  /** IO mode: nio, oio, epoll, or auto (try epoll first and then nio). */
-  private[netty] val ioMode = conf.get("spark.shuffle.io.mode", "nio").toLowerCase
-
-  /** Connect timeout in secs. Default 60 secs. */
-  private[netty] val connectTimeoutMs = conf.getInt("spark.shuffle.io.connectionTimeout", 60) * 1000
-
-  /**
-   * Percentage of the desired amount of time spent for I/O in the child event loops.
-   * Only applicable in nio and epoll.
-   */
-  private[netty] val ioRatio = conf.getInt("spark.shuffle.io.netty.ioRatio", 80)
-
-  /** Requested maximum length of the queue of incoming connections. */
-  private[netty] val backLog: Option[Int] = conf.getOption("spark.shuffle.io.backLog").map(_.toInt)
-
-  /**
-   * Receive buffer size (SO_RCVBUF).
-   * Note: the optimal size for receive buffer and send buffer should be
-   *  latency * network_bandwidth.
-   * Assuming latency = 1ms, network_bandwidth = 10Gbps
-   *  buffer size should be ~ 1.25MB
-   */
-  private[netty] val receiveBuf: Option[Int] =
-    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
-
-  /** Send buffer size (SO_SNDBUF). */
-  private[netty] val sendBuf: Option[Int] =
-    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
new file mode 100644
index 0000000..e7b2855
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.util.concurrent.Executors
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.ChannelHandlerContext
+import io.netty.util.CharsetUtil
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.network.ConnectionManagerId
+import org.apache.spark.storage.BlockId
+
+private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
+
+  def getBlock(host: String, port: Int, blockId: BlockId,
+      resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
+
+    val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
+    val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000)
+    val fc = new FileClient(handler, connectTimeout)
+
+    try {
+      fc.init()
+      fc.connect(host, port)
+      fc.sendRequest(blockId.name)
+      fc.waitForClose()
+      fc.close()
+    } catch {
+      // Handle any socket-related exceptions in FileClient
+      case e: Exception => {
+        logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + " failed", e)
+        handler.handleError(blockId)
+      }
+    }
+  }
+
+  def getBlock(cmId: ConnectionManagerId, blockId: BlockId,
+      resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
+    getBlock(cmId.host, cmId.port, blockId, resultCollectCallback)
+  }
+
+  def getBlocks(cmId: ConnectionManagerId,
+    blocks: Seq[(BlockId, Long)],
+    resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
+
+    for ((blockId, size) <- blocks) {
+      getBlock(cmId, blockId, resultCollectCallback)
+    }
+  }
+}
+
+
+private[spark] object ShuffleCopier extends Logging {
+
+  private class ShuffleClientHandler(resultCollectCallBack: (BlockId, Long, ByteBuf) => Unit)
+    extends FileClientHandler with Logging {
+
+    override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
+      logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
+      resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
+    }
+
+    override def handleError(blockId: BlockId) {
+      if (!isComplete) {
+        resultCollectCallBack(blockId, -1, null)
+      }
+    }
+  }
+
+  def echoResultCollectCallBack(blockId: BlockId, size: Long, content: ByteBuf) {
+    if (size != -1) {
+      logInfo("File: " + blockId + " content is : \" " + content.toString(CharsetUtil.UTF_8) + "\"")
+    }
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: ShuffleCopier <host> <port> <shuffle_block_id> <threads>")
+      System.exit(1)
+    }
+    val host = args(0)
+    val port = args(1).toInt
+    val blockId = BlockId(args(2))
+    val threads = if (args.length > 3) args(3).toInt else 10
+
+    val copiers = Executors.newFixedThreadPool(80)
+    val tasks = (for (i <- Range(0, threads)) yield {
+      Executors.callable(new Runnable() {
+        def run() {
+          val copier = new ShuffleCopier(new SparkConf)
+          copier.getBlock(host, port, blockId, echoResultCollectCallBack)
+        }
+      })
+    }).asJava
+    copiers.invokeAll(tasks)
+    copiers.shutdown()
+    System.exit(0)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
new file mode 100644
index 0000000..95958e3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import java.io.File
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.storage.{BlockId, FileSegment}
+
+private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
+
+  val server = new FileServer(pResolver, portIn)
+  server.start()
+
+  def stop() {
+    server.stop()
+  }
+
+  def port: Int = server.getPort
+}
+
+
+/**
+ * An application for testing the shuffle sender as a standalone program.
+ */
+private[spark] object ShuffleSender {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println(
+        "Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of shuffle_block_directories>")
+      System.exit(1)
+    }
+
+    val port = args(0).toInt
+    val subDirsPerLocalDir = args(1).toInt
+    val localDirs = args.drop(2).map(new File(_))
+
+    val pResovler = new PathResolver {
+      override def getBlockLocation(blockId: BlockId): FileSegment = {
+        if (!blockId.isShuffle) {
+          throw new Exception("Block " + blockId + " is not a shuffle block")
+        }
+        // Figure out which local directory it hashes to, and which subdirectory in that
+        val hash = Utils.nonNegativeHash(blockId)
+        val dirId = hash % localDirs.length
+        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
+        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        val file = new File(subDir, blockId.name)
+        new FileSegment(file, 0, file.length())
+      }
+    }
+    val sender = new ShuffleSender(port, pResovler)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
deleted file mode 100644
index 9fed11b..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.util.concurrent.TimeoutException
-
-import io.netty.bootstrap.Bootstrap
-import io.netty.buffer.PooledByteBufAllocator
-import io.netty.channel.socket.SocketChannel
-import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder
-import io.netty.handler.codec.string.StringEncoder
-import io.netty.util.CharsetUtil
-
-import org.apache.spark.Logging
-
-/**
- * Client for fetching data blocks from [[org.apache.spark.network.netty.server.BlockServer]].
- * Use [[BlockFetchingClientFactory]] to instantiate this client.
- *
- * The constructor blocks until a connection is successfully established.
- *
- * See [[org.apache.spark.network.netty.server.BlockServer]] for client/server protocol.
- *
- * Concurrency: [[BlockFetchingClient]] is not thread safe and should not be shared.
- */
-@throws[TimeoutException]
-private[spark]
-class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String, port: Int)
-  extends Logging {
-
-  val handler = new BlockFetchingClientHandler
-
-  /** Netty Bootstrap for creating the TCP connection. */
-  private val bootstrap: Bootstrap = {
-    val b = new Bootstrap
-    b.group(factory.workerGroup)
-      .channel(factory.socketChannelClass)
-      // Use pooled buffers to reduce temporary buffer allocation
-      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      // Disable Nagle's Algorithm since we don't want packets to wait
-      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
-      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
-      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, factory.conf.connectTimeoutMs)
-
-    b.handler(new ChannelInitializer[SocketChannel] {
-      override def initChannel(ch: SocketChannel): Unit = {
-        ch.pipeline
-          .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
-          // maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
-          .addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
-          .addLast("handler", handler)
-      }
-    })
-    b
-  }
-
-  /** Netty ChannelFuture for the connection. */
-  private val cf: ChannelFuture = bootstrap.connect(hostname, port)
-  if (!cf.awaitUninterruptibly(factory.conf.connectTimeoutMs)) {
-    throw new TimeoutException(
-      s"Connecting to $hostname:$port timed out (${factory.conf.connectTimeoutMs} ms)")
-  }
-
-  /**
-   * Ask the remote server for a sequence of blocks, and execute the callback.
-   *
-   * Note that this is asynchronous and returns immediately. Upstream caller should throttle the
-   * rate of fetching; otherwise we could run out of memory.
-   *
-   * @param blockIds sequence of block ids to fetch.
-   * @param blockFetchSuccessCallback callback function when a block is successfully fetched.
-   *                                  First argument is the block id, and second argument is the
-   *                                  raw data in a ByteBuffer.
-   * @param blockFetchFailureCallback callback function when we failed to fetch any of the blocks.
-   *                                  First argument is the block id, and second argument is the
-   *                                  error message.
-   */
-  def fetchBlocks(
-      blockIds: Seq[String],
-      blockFetchSuccessCallback: (String, ReferenceCountedBuffer) => Unit,
-      blockFetchFailureCallback: (String, String) => Unit): Unit = {
-    // It's best to limit the number of "write" calls since it needs to traverse the whole pipeline.
-    // It's also best to limit the number of "flush" calls since it requires system calls.
-    // Let's concatenate the string and then call writeAndFlush once.
-    // This is also why this implementation might be more efficient than multiple, separate
-    // fetch block calls.
-    var startTime: Long = 0
-    logTrace {
-      startTime = System.nanoTime
-      s"Sending request $blockIds to $hostname:$port"
-    }
-
-    // TODO: This is not the most elegant way to handle this ...
-    handler.blockFetchSuccessCallback = blockFetchSuccessCallback
-    handler.blockFetchFailureCallback = blockFetchFailureCallback
-
-    val writeFuture = cf.channel().writeAndFlush(blockIds.mkString("\n") + "\n")
-    writeFuture.addListener(new ChannelFutureListener {
-      override def operationComplete(future: ChannelFuture): Unit = {
-        if (future.isSuccess) {
-          logTrace {
-            val timeTaken = (System.nanoTime - startTime).toDouble / 1000000
-            s"Sending request $blockIds to $hostname:$port took $timeTaken ms"
-          }
-        } else {
-          // Fail all blocks.
-          logError(s"Failed to send request $blockIds to $hostname:$port", future.cause)
-          blockIds.foreach(blockFetchFailureCallback(_, future.cause.getMessage))
-        }
-      }
-    })
-  }
-
-  def waitForClose(): Unit = {
-    cf.channel().closeFuture().sync()
-  }
-
-  def close(): Unit = cf.channel().close()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
deleted file mode 100644
index 2b28402..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import io.netty.channel.epoll.{EpollEventLoopGroup, EpollSocketChannel}
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.nio.NioSocketChannel
-import io.netty.channel.socket.oio.OioSocketChannel
-import io.netty.channel.{EventLoopGroup, Channel}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.network.netty.NettyConfig
-import org.apache.spark.util.Utils
-
-/**
- * Factory for creating [[BlockFetchingClient]] by using createClient. This factory reuses
- * the worker thread pool for Netty.
- *
- * Concurrency: createClient is safe to be called from multiple threads concurrently.
- */
-private[spark]
-class BlockFetchingClientFactory(val conf: NettyConfig) {
-
-  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
-
-  /** A thread factory so the threads are named (for debugging). */
-  val threadFactory = Utils.namedThreadFactory("spark-shuffle-client")
-
-  /** The following two are instantiated by the [[init]] method, depending ioMode. */
-  var socketChannelClass: Class[_ <: Channel] = _
-  var workerGroup: EventLoopGroup = _
-
-  init()
-
-  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
-  private def init(): Unit = {
-    def initOio(): Unit = {
-      socketChannelClass = classOf[OioSocketChannel]
-      workerGroup = new OioEventLoopGroup(0, threadFactory)
-    }
-    def initNio(): Unit = {
-      socketChannelClass = classOf[NioSocketChannel]
-      workerGroup = new NioEventLoopGroup(0, threadFactory)
-    }
-    def initEpoll(): Unit = {
-      socketChannelClass = classOf[EpollSocketChannel]
-      workerGroup = new EpollEventLoopGroup(0, threadFactory)
-    }
-
-    conf.ioMode match {
-      case "nio" => initNio()
-      case "oio" => initOio()
-      case "epoll" => initEpoll()
-      case "auto" =>
-        // For auto mode, first try epoll (only available on Linux), then nio.
-        try {
-          initEpoll()
-        } catch {
-          // TODO: Should we log the throwable? But that always happen on non-Linux systems.
-          // Perhaps the right thing to do is to check whether the system is Linux, and then only
-          // call initEpoll on Linux.
-          case e: Throwable => initNio()
-        }
-    }
-  }
-
-  /**
-   * Create a new BlockFetchingClient connecting to the given remote host / port.
-   *
-   * This blocks until a connection is successfully established.
-   *
-   * Concurrency: This method is safe to call from multiple threads.
-   */
-  def createClient(remoteHost: String, remotePort: Int): BlockFetchingClient = {
-    new BlockFetchingClient(this, remoteHost, remotePort)
-  }
-
-  def stop(): Unit = {
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
deleted file mode 100644
index a1dbf61..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import io.netty.buffer.ByteBuf
-import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
-
-import org.apache.spark.Logging
-
-
-/**
- * Handler that processes server responses. It uses the protocol documented in
- * [[org.apache.spark.network.netty.server.BlockServer]].
- */
-private[client]
-class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] with Logging {
-
-  var blockFetchSuccessCallback: (String, ReferenceCountedBuffer) => Unit = _
-  var blockFetchFailureCallback: (String, String) => Unit = _
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
-    logError(s"Exception in connection from ${ctx.channel.remoteAddress}", cause)
-    ctx.close()
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
-    val totalLen = in.readInt()
-    val blockIdLen = in.readInt()
-    val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
-    in.readBytes(blockIdBytes)
-    val blockId = new String(blockIdBytes)
-    val blockSize = totalLen - math.abs(blockIdLen) - 4
-
-    def server = ctx.channel.remoteAddress.toString
-
-    // blockIdLen is negative when it is an error message.
-    if (blockIdLen < 0) {
-      val errorMessageBytes = new Array[Byte](blockSize)
-      in.readBytes(errorMessageBytes)
-      val errorMsg = new String(errorMessageBytes)
-      logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg from $server")
-      blockFetchFailureCallback(blockId, errorMsg)
-    } else {
-      logTrace(s"Received block $blockId ($blockSize B) from $server")
-      blockFetchSuccessCallback(blockId, new ReferenceCountedBuffer(in))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala b/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
deleted file mode 100644
index 9740ee6..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-/**
- * A simple iterator that lazily initializes the underlying iterator.
- *
- * The use case is that sometimes we might have many iterators open at the same time, and each of
- * the iterator might initialize its own buffer (e.g. decompression buffer, deserialization buffer).
- * This could lead to too many buffers open. If this iterator is used, we lazily initialize those
- * buffers.
- */
-private[spark]
-class LazyInitIterator(createIterator: => Iterator[Any]) extends Iterator[Any] {
-
-  lazy val proxy = createIterator
-
-  override def hasNext: Boolean = {
-    val gotNext = proxy.hasNext
-    if (!gotNext) {
-      close()
-    }
-    gotNext
-  }
-
-  override def next(): Any = proxy.next()
-
-  def close(): Unit = Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala b/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
deleted file mode 100644
index ea1abf5..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.client
-
-import java.io.InputStream
-import java.nio.ByteBuffer
-
-import io.netty.buffer.{ByteBuf, ByteBufInputStream}
-
-
-/**
- * A buffer abstraction based on Netty's ByteBuf so we don't expose Netty.
- * This is a Scala value class.
- *
- * The buffer's life cycle is NOT managed by the JVM, and thus requiring explicit declaration of
- * reference by the retain method and release method.
- */
-private[spark]
-class ReferenceCountedBuffer(val underlying: ByteBuf) extends AnyVal {
-
-  /** Return the nio ByteBuffer view of the underlying buffer. */
-  def byteBuffer(): ByteBuffer = underlying.nioBuffer
-
-  /** Creates a new input stream that starts from the current position of the buffer. */
-  def inputStream(): InputStream = new ByteBufInputStream(underlying)
-
-  /** Increment the reference counter by one. */
-  def retain(): Unit = underlying.retain()
-
-  /** Decrement the reference counter by one and release the buffer if the ref count is 0. */
-  def release(): Unit = underlying.release()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
deleted file mode 100644
index 162e9cc..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-/**
- * Header describing a block. This is used only in the server pipeline.
- *
- * [[BlockServerHandler]] creates this, and [[BlockHeaderEncoder]] encodes it.
- *
- * @param blockSize length of the block content, excluding the length itself.
- *                 If positive, this is the header for a block (not part of the header).
- *                 If negative, this is the header and content for an error message.
- * @param blockId block id
- * @param error some error message from reading the block
- */
-private[server]
-class BlockHeader(val blockSize: Int, val blockId: String, val error: Option[String] = None)

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
deleted file mode 100644
index 8e4dda4..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
-import io.netty.handler.codec.MessageToByteEncoder
-
-/**
- * A simple encoder for BlockHeader. See [[BlockServer]] for the server to client protocol.
- */
-private[server]
-class BlockHeaderEncoder extends MessageToByteEncoder[BlockHeader] {
-  override def encode(ctx: ChannelHandlerContext, msg: BlockHeader, out: ByteBuf): Unit = {
-    // message = message length (4 bytes) + block id length (4 bytes) + block id + block data
-    // message length = block id length (4 bytes) + size of block id + size of block data
-    val blockIdBytes = msg.blockId.getBytes
-    msg.error match {
-      case Some(errorMsg) =>
-        val errorBytes = errorMsg.getBytes
-        out.writeInt(4 + blockIdBytes.length + errorBytes.size)
-        out.writeInt(-blockIdBytes.length)  // use negative block id length to represent errors
-        out.writeBytes(blockIdBytes)  // next is blockId itself
-        out.writeBytes(errorBytes)  // error message
-      case None =>
-        out.writeInt(4 + blockIdBytes.length + msg.blockSize)
-        out.writeInt(blockIdBytes.length)  // First 4 bytes is blockId length
-        out.writeBytes(blockIdBytes)  // next is blockId itself
-        // msg of size blockSize will be written by ServerHandler
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
deleted file mode 100644
index 7b2f9a8..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import java.net.InetSocketAddress
-
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.buffer.PooledByteBufAllocator
-import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
-import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.SocketChannel
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.channel.socket.oio.OioServerSocketChannel
-import io.netty.handler.codec.LineBasedFrameDecoder
-import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.network.netty.NettyConfig
-import org.apache.spark.storage.BlockDataProvider
-import org.apache.spark.util.Utils
-
-
-/**
- * Server for serving Spark data blocks.
- * This should be used together with [[org.apache.spark.network.netty.client.BlockFetchingClient]].
- *
- * Protocol for requesting blocks (client to server):
- *   One block id per line, e.g. to request 3 blocks: "block1\nblock2\nblock3\n"
- *
- * Protocol for sending blocks (server to client):
- *   frame-length (4 bytes), block-id-length (4 bytes), block-id, block-data.
- *
- *   frame-length should not include the length of itself.
- *   If block-id-length is negative, then this is an error message rather than block-data. The real
- *   length is the absolute value of the frame-length.
- *
- */
-private[spark]
-class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Logging {
-
-  def this(sparkConf: SparkConf, dataProvider: BlockDataProvider) = {
-    this(new NettyConfig(sparkConf), dataProvider)
-  }
-
-  def port: Int = _port
-
-  def hostName: String = _hostName
-
-  private var _port: Int = conf.serverPort
-  private var _hostName: String = ""
-  private var bootstrap: ServerBootstrap = _
-  private var channelFuture: ChannelFuture = _
-
-  init()
-
-  /** Initialize the server. */
-  private def init(): Unit = {
-    bootstrap = new ServerBootstrap
-    val bossThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-boss")
-    val workerThreadFactory = Utils.namedThreadFactory("spark-shuffle-server-worker")
-
-    // Use only one thread to accept connections, and 2 * num_cores for worker.
-    def initNio(): Unit = {
-      val bossGroup = new NioEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new NioEventLoopGroup(0, workerThreadFactory)
-      workerGroup.setIoRatio(conf.ioRatio)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
-    }
-    def initOio(): Unit = {
-      val bossGroup = new OioEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new OioEventLoopGroup(0, workerThreadFactory)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[OioServerSocketChannel])
-    }
-    def initEpoll(): Unit = {
-      val bossGroup = new EpollEventLoopGroup(1, bossThreadFactory)
-      val workerGroup = new EpollEventLoopGroup(0, workerThreadFactory)
-      workerGroup.setIoRatio(conf.ioRatio)
-      bootstrap.group(bossGroup, workerGroup).channel(classOf[EpollServerSocketChannel])
-    }
-
-    conf.ioMode match {
-      case "nio" => initNio()
-      case "oio" => initOio()
-      case "epoll" => initEpoll()
-      case "auto" =>
-        // For auto mode, first try epoll (only available on Linux), then nio.
-        try {
-          initEpoll()
-        } catch {
-          // TODO: Should we log the throwable? But that always happen on non-Linux systems.
-          // Perhaps the right thing to do is to check whether the system is Linux, and then only
-          // call initEpoll on Linux.
-          case e: Throwable => initNio()
-        }
-    }
-
-    // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-    bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-
-    // Various (advanced) user-configured settings.
-    conf.backLog.foreach { backLog =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
-    }
-    conf.receiveBuf.foreach { receiveBuf =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
-    }
-    conf.sendBuf.foreach { sendBuf =>
-      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
-    }
-
-    bootstrap.childHandler(new ChannelInitializer[SocketChannel] {
-      override def initChannel(ch: SocketChannel): Unit = {
-        ch.pipeline
-          .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-          .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
-          .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
-          .addLast("handler", new BlockServerHandler(dataProvider))
-      }
-    })
-
-    channelFuture = bootstrap.bind(new InetSocketAddress(_port))
-    channelFuture.sync()
-
-    val addr = channelFuture.channel.localAddress.asInstanceOf[InetSocketAddress]
-    _port = addr.getPort
-    _hostName = addr.getHostName
-  }
-
-  /** Shutdown the server. */
-  def stop(): Unit = {
-    if (channelFuture != null) {
-      channelFuture.channel().close().awaitUninterruptibly()
-      channelFuture = null
-    }
-    if (bootstrap != null && bootstrap.group() != null) {
-      bootstrap.group().shutdownGracefully()
-    }
-    if (bootstrap != null && bootstrap.childGroup() != null) {
-      bootstrap.childGroup().shutdownGracefully()
-    }
-    bootstrap = null
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
deleted file mode 100644
index cc70bd0..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import io.netty.channel.ChannelInitializer
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.LineBasedFrameDecoder
-import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-import org.apache.spark.storage.BlockDataProvider
-
-
-/** Channel initializer that sets up the pipeline for the BlockServer. */
-private[netty]
-class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
-  extends ChannelInitializer[SocketChannel] {
-
-  override def initChannel(ch: SocketChannel): Unit = {
-    ch.pipeline
-      .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block id length 1024
-      .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
-      .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
-      .addLast("handler", new BlockServerHandler(dataProvider))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
deleted file mode 100644
index 40dd5e5..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty.server
-
-import java.io.FileInputStream
-import java.nio.ByteBuffer
-import java.nio.channels.FileChannel
-
-import io.netty.buffer.Unpooled
-import io.netty.channel._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.{FileSegment, BlockDataProvider}
-
-
-/**
- * A handler that processes requests from clients and writes block data back.
- *
- * The messages should have been processed by a LineBasedFrameDecoder and a StringDecoder first
- * so channelRead0 is called once per line (i.e. per block id).
- */
-private[server]
-class BlockServerHandler(dataProvider: BlockDataProvider)
-  extends SimpleChannelInboundHandler[String] with Logging {
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
-    logError(s"Exception in connection from ${ctx.channel.remoteAddress}", cause)
-    ctx.close()
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, blockId: String): Unit = {
-    def client = ctx.channel.remoteAddress.toString
-
-    // A helper function to send error message back to the client.
-    def respondWithError(error: String): Unit = {
-      ctx.writeAndFlush(new BlockHeader(-1, blockId, Some(error))).addListener(
-        new ChannelFutureListener {
-          override def operationComplete(future: ChannelFuture) {
-            if (!future.isSuccess) {
-              // TODO: Maybe log the success case as well.
-              logError(s"Error sending error back to $client", future.cause)
-              ctx.close()
-            }
-          }
-        }
-      )
-    }
-
-    def writeFileSegment(segment: FileSegment): Unit = {
-      // Send error message back if the block is too large. Even though we are capable of sending
-      // large (2G+) blocks, the receiving end cannot handle it so let's fail fast.
-      // Once we fixed the receiving end to be able to process large blocks, this should be removed.
-      // Also make sure we update BlockHeaderEncoder to support length > 2G.
-
-      // See [[BlockHeaderEncoder]] for the way length is encoded.
-      if (segment.length + blockId.length + 4 > Int.MaxValue) {
-        respondWithError(s"Block $blockId size ($segment.length) greater than 2G")
-        return
-      }
-
-      var fileChannel: FileChannel = null
-      try {
-        fileChannel = new FileInputStream(segment.file).getChannel
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Error opening channel for $blockId in ${segment.file} for request from $client", e)
-          respondWithError(e.getMessage)
-      }
-
-      // Found the block. Send it back.
-      if (fileChannel != null) {
-        // Write the header and block data. In the case of failures, the listener on the block data
-        // write should close the connection.
-        ctx.write(new BlockHeader(segment.length.toInt, blockId))
-
-        val region = new DefaultFileRegion(fileChannel, segment.offset, segment.length)
-        ctx.writeAndFlush(region).addListener(new ChannelFutureListener {
-          override def operationComplete(future: ChannelFuture) {
-            if (future.isSuccess) {
-              logTrace(s"Sent block $blockId (${segment.length} B) back to $client")
-            } else {
-              logError(s"Error sending block $blockId to $client; closing connection", future.cause)
-              ctx.close()
-            }
-          }
-        })
-      }
-    }
-
-    def writeByteBuffer(buf: ByteBuffer): Unit = {
-      ctx.write(new BlockHeader(buf.remaining, blockId))
-      ctx.writeAndFlush(Unpooled.wrappedBuffer(buf)).addListener(new ChannelFutureListener {
-        override def operationComplete(future: ChannelFuture) {
-          if (future.isSuccess) {
-            logTrace(s"Sent block $blockId (${buf.remaining} B) back to $client")
-          } else {
-            logError(s"Error sending block $blockId to $client; closing connection", future.cause)
-            ctx.close()
-          }
-        }
-      })
-    }
-
-    logTrace(s"Received request from $client to fetch block $blockId")
-
-    var blockData: Either[FileSegment, ByteBuffer] = null
-
-    // First make sure we can find the block. If not, send error back to the user.
-    try {
-      blockData = dataProvider.getBlockData(blockId)
-    } catch {
-      case e: Exception =>
-        logError(s"Error opening block $blockId for request from $client", e)
-        respondWithError(e.getMessage)
-        return
-    }
-
-    blockData match {
-      case Left(segment) => writeFileSegment(segment)
-      case Right(buf) => writeByteBuffer(buf)
-    }
-
-  }  // end of channelRead0
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala b/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
deleted file mode 100644
index 5b6d086..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-
-/**
- * An interface for providing data for blocks.
- *
- * getBlockData returns either a FileSegment (for zero-copy send), or a ByteBuffer.
- *
- * Aside from unit tests, [[BlockManager]] is the main class that implements this.
- */
-private[spark] trait BlockDataProvider {
-  def getBlockData(blockId: String): Either[FileSegment, ByteBuffer]
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 91c0f47..5f44f5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -18,17 +18,19 @@
 package org.apache.spark.storage
 
 import java.util.concurrent.LinkedBlockingQueue
-import org.apache.spark.network.netty.client.{LazyInitIterator, ReferenceCountedBuffer}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 import scala.util.{Failure, Success}
 
+import io.netty.buffer.ByteBuf
+
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.executor.ShuffleReadMetrics
 import org.apache.spark.network.BufferMessage
 import org.apache.spark.network.ConnectionManagerId
+import org.apache.spark.network.netty.ShuffleCopier
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
 
@@ -52,28 +54,18 @@ trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] wi
 private[storage]
 object BlockFetcherIterator {
 
-  /**
-   * A request to fetch blocks from a remote BlockManager.
-   * @param address remote BlockManager to fetch from.
-   * @param blocks Sequence of tuple, where the first element is the block id,
-   *               and the second element is the estimated size, used to calculate bytesInFlight.
-   */
+  // A request to fetch one or more blocks, complete with their sizes
   class FetchRequest(val address: BlockManagerId, val blocks: Seq[(BlockId, Long)]) {
     val size = blocks.map(_._2).sum
   }
 
-  /**
-   * Result of a fetch from a remote block. A failure is represented as size == -1.
-   * @param blockId block id
-   * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes.
-   * @param deserialize closure to return the result in the form of an Iterator.
-   */
+  // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
+  // the block (since we want all deserialization to happen in the calling thread); can also
+  // represent a fetch failure if size == -1.
   class FetchResult(val blockId: BlockId, val size: Long, val deserialize: () => Iterator[Any]) {
     def failed: Boolean = size == -1
   }
 
-  // TODO: Refactor this whole thing to make code more reusable.
   class BasicBlockFetcherIterator(
       private val blockManager: BlockManager,
       val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
@@ -103,10 +95,10 @@ object BlockFetcherIterator {
 
     // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
     // the number of bytes in flight is limited to maxBytesInFlight
-    protected val fetchRequests = new Queue[FetchRequest]
+    private val fetchRequests = new Queue[FetchRequest]
 
     // Current bytes in flight from our requests
-    protected var bytesInFlight = 0L
+    private var bytesInFlight = 0L
 
     protected def sendRequest(req: FetchRequest) {
       logDebug("Sending request for %d blocks (%s) from %s".format(
@@ -270,55 +262,77 @@ object BlockFetcherIterator {
       readMetrics: ShuffleReadMetrics)
     extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) {
 
-    override protected def sendRequest(req: FetchRequest) {
-      logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
-      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
+    import blockManager._
 
-      bytesInFlight += req.size
-      val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
-
-      // This could throw a TimeoutException. In that case we will just retry the task.
-      val client = blockManager.nettyBlockClientFactory.createClient(
-        cmId.host, req.address.nettyPort)
-      val blocks = req.blocks.map(_._1.toString)
-
-      client.fetchBlocks(
-        blocks,
-        (blockId: String, refBuf: ReferenceCountedBuffer) => {
-          // Increment the reference count so the buffer won't be recycled.
-          // TODO: This could result in memory leaks when the task is stopped due to exception
-          // before the iterator is exhausted.
-          refBuf.retain()
-          val buf = refBuf.byteBuffer()
-          val blockSize = buf.remaining()
-          val bid = BlockId(blockId)
-
-          // TODO: remove code duplication between here and BlockManager.dataDeserialization.
-          results.put(new FetchResult(bid, sizeMap(bid), () => {
-            def createIterator: Iterator[Any] = {
-              val stream = blockManager.wrapForCompression(bid, refBuf.inputStream())
-              serializer.newInstance().deserializeStream(stream).asIterator
-            }
-            new LazyInitIterator(createIterator) {
-              // Release the buffer when we are done traversing it.
-              override def close(): Unit = refBuf.release()
+    val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest]
+
+    private def startCopiers(numCopiers: Int): List[_ <: Thread] = {
+      (for ( i <- Range(0,numCopiers) ) yield {
+        val copier = new Thread {
+          override def run(){
+            try {
+              while(!isInterrupted && !fetchRequestsSync.isEmpty) {
+                sendRequest(fetchRequestsSync.take())
+              }
+            } catch {
+              case x: InterruptedException => logInfo("Copier Interrupted")
+              // case _ => throw new SparkException("Exception Throw in Shuffle Copier")
             }
-          }))
-
-          readMetrics.synchronized {
-            readMetrics.remoteBytesRead += blockSize
-            readMetrics.remoteBlocksFetched += 1
-          }
-          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
-        },
-        (blockId: String, errorMsg: String) => {
-          logError(s"Could not get block(s) from $cmId with error: $errorMsg")
-          for ((blockId, size) <- req.blocks) {
-            results.put(new FetchResult(blockId, -1, null))
           }
         }
-      )
+        copier.start
+        copier
+      }).toList
+    }
+
+    // keep this to interrupt the threads when necessary
+    private def stopCopiers() {
+      for (copier <- copiers) {
+        copier.interrupt()
+      }
+    }
+
+    override protected def sendRequest(req: FetchRequest) {
+
+      def putResult(blockId: BlockId, blockSize: Long, blockData: ByteBuf) {
+        val fetchResult = new FetchResult(blockId, blockSize,
+          () => dataDeserialize(blockId, blockData.nioBuffer, serializer))
+        results.put(fetchResult)
+      }
+
+      logDebug("Sending request for %d blocks (%s) from %s".format(
+        req.blocks.size, Utils.bytesToString(req.size), req.address.host))
+      val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
+      val cpier = new ShuffleCopier(blockManager.conf)
+      cpier.getBlocks(cmId, req.blocks, putResult)
+      logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
+    }
+
+    private var copiers: List[_ <: Thread] = null
+
+    override def initialize() {
+      // Split Local Remote Blocks and set numBlocksToFetch
+      val remoteRequests = splitLocalRemoteBlocks()
+      // Add the remote requests into our queue in a random order
+      for (request <- Utils.randomize(remoteRequests)) {
+        fetchRequestsSync.put(request)
+      }
+
+      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
+      logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
+        Utils.getUsedTimeMs(startTime))
+
+      // Get Local Blocks
+      startTime = System.currentTimeMillis
+      getLocalBlocks()
+      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+    }
+
+    override def next(): (BlockId, Option[Iterator[Any]]) = {
+      resultsGotten += 1
+      val result = results.take()
+      // If all the results have been retrieved, copiers will exit automatically
+      (result.blockId, if (result.failed) None else Some(result.deserialize()))
     }
   }
   // End of NettyBlockFetcherIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e676769..e8bbd29 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -25,19 +25,16 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.util.Random
 
-import akka.actor.{ActorSystem, Props}
+import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
-import org.apache.spark.network.netty.client.BlockFetchingClientFactory
-import org.apache.spark.network.netty.server.BlockServer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
 
-
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
@@ -61,7 +58,7 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker)
-  extends BlockDataProvider with Logging {
+  extends Logging {
 
   private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -89,25 +86,13 @@ private[spark] class BlockManager(
     new TachyonStore(this, tachyonBlockManager)
   }
 
-  private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
-  private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
-    if (useNetty) new BlockFetchingClientFactory(conf) else null
+  private val nettyPort: Int = {
+    val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+    val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
+    if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
-  private val nettyBlockServer: BlockServer = {
-    if (useNetty) {
-      val server = new BlockServer(conf, this)
-      logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
-      server
-    } else {
-      null
-    }
-  }
-
-  private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
-
   val blockManagerId = BlockManagerId(
     executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
 
@@ -231,20 +216,6 @@ private[spark] class BlockManager(
     }
   }
 
-  override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] = {
-    val bid = BlockId(blockId)
-    if (bid.isShuffle) {
-      Left(diskBlockManager.getBlockLocation(bid))
-    } else {
-      val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
-      if (blockBytesOpt.isDefined) {
-        Right(blockBytesOpt.get)
-      } else {
-        throw new BlockNotFoundException(blockId)
-      }
-    }
-  }
-
   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
    * NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
@@ -1090,14 +1061,6 @@ private[spark] class BlockManager(
     connectionManager.stop()
     shuffleBlockManager.stop()
     diskBlockManager.stop()
-
-    if (nettyBlockClientFactory != null) {
-      nettyBlockClientFactory.stop()
-    }
-    if (nettyBlockServer != null) {
-      nettyBlockServer.stop()
-    }
-
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala b/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
deleted file mode 100644
index 9ef4536..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockNotFoundException.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-
-class BlockNotFoundException(blockId: String) extends Exception(s"Block $blockId not found")

http://git-wip-us.apache.org/repos/asf/spark/blob/fd9fcd25/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f3da816..4d66cce 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -23,7 +23,7 @@ import java.util.{Date, Random, UUID}
 
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.PathResolver
+import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
 import org.apache.spark.util.Utils
 import org.apache.spark.shuffle.sort.SortShuffleManager
 
@@ -52,6 +52,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
+  private var shuffleSender : ShuffleSender = null
 
   addShutdownHook()
 
@@ -185,5 +186,15 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
         }
       }
     }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
+  }
+
+  private[storage] def startShuffleBlockSender(port: Int): Int = {
+    shuffleSender = new ShuffleSender(port, this)
+    logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}")
+    shuffleSender.port
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message