Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A76A3114C7 for ; Sat, 16 Aug 2014 21:16:09 +0000 (UTC) Received: (qmail 2979 invoked by uid 500); 16 Aug 2014 21:16:09 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 2938 invoked by uid 500); 16 Aug 2014 21:16:09 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 2928 invoked by uid 99); 16 Aug 2014 21:16:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Aug 2014 21:16:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CECDA929071; Sat, 16 Aug 2014 21:16:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joshrosen@apache.org To: commits@spark.apache.org Message-Id: <74b491116cb84168a0602135c0b3fdf0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: [SPARK-2677] BasicBlockFetchIterator#next can wait forever Date: Sat, 16 Aug 2014 21:16:08 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master 4bdfaa16f -> 76fa0eaf5 [SPARK-2677] BasicBlockFetchIterator#next can wait forever Author: Kousuke Saruta Closes #1632 from sarutak/SPARK-2677 and squashes the following commits: cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout e85f88b [Kousuke Saruta] Removed useless synchronized blocks 7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide 9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala 7cbb8ca [Kousuke Saruta] Modified to match with scalastyle 8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76fa0eaf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76fa0eaf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76fa0eaf Branch: refs/heads/master Commit: 76fa0eaf515fd6771cdd69422b1259485debcae5 Parents: 4bdfaa1 Author: Kousuke Saruta Authored: Sat Aug 16 14:15:58 2014 -0700 Committer: Josh Rosen Committed: Sat Aug 16 14:15:58 2014 -0700 ---------------------------------------------------------------------- .../spark/network/ConnectionManager.scala | 45 +++++++++++++++----- .../spark/network/ConnectionManagerSuite.scala | 44 ++++++++++++++++++- docs/configuration.md | 9 ++++ 3 files changed, 87 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/76fa0eaf/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 95f96b8..37d69a9 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -22,6 +22,7 @@ import java.nio._ import java.nio.channels._ import java.nio.channels.spi._ import java.net._ +import java.util.{Timer, TimerTask} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor} @@ -61,17 +62,17 @@ private[spark] class ConnectionManager( var ackMessage: Option[Message] = None def markDone(ackMessage: Option[Message]) { - this.synchronized { - this.ackMessage = ackMessage - completionHandler(this) - } + this.ackMessage = ackMessage + completionHandler(this) } } private val selector = SelectorProvider.provider.openSelector() + private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) // default to 30 second timeout waiting for authentication private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) + private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) private val handleMessageExecutor = new ThreadPoolExecutor( conf.getInt("spark.core.connection.handler.threads.min", 20), @@ -652,19 +653,27 @@ private[spark] class ConnectionManager( } } if (bufferMessage.hasAckId()) { - val sentMessageStatus = messageStatuses.synchronized { + messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { messageStatuses -= bufferMessage.ackId - status + status.markDone(Some(message)) } case None => { - throw new Exception("Could not find reference for received ack message " + - message.id) + /** + * We can fall down on this code because of following 2 cases + * + * (1) Invalid ack sent due to buggy code. + * + * (2) Late-arriving ack for a SendMessageStatus + * To avoid unwilling late-arriving ack + * caused by long pause like GC, you can set + * larger value than default to spark.core.connection.ack.wait.timeout + */ + logWarning(s"Could not find reference for received ack Message ${message.id}") } } } - sentMessageStatus.markDone(Some(message)) } else { var ackMessage : Option[Message] = None try { @@ -836,9 +845,23 @@ private[spark] class ConnectionManager( def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) : Future[Message] = { val promise = Promise[Message]() + + val timeoutTask = new TimerTask { + override def run(): Unit = { + messageStatuses.synchronized { + messageStatuses.remove(message.id).foreach ( s => { + promise.failure( + new IOException(s"sendMessageReliably failed because ack " + + "was not received within ${ackTimeout} sec")) + }) + } + } + } + val status = new MessageStatus(message, connectionManagerId, s => { + timeoutTask.cancel() s.ackMessage match { - case None => // Indicates a failure where we either never sent or never got ACK'd + case None => // Indicates a failure where we either never sent or never got ACK'd promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) case Some(ackMessage) => if (ackMessage.hasError) { @@ -852,6 +875,8 @@ private[spark] class ConnectionManager( messageStatuses.synchronized { messageStatuses += ((message.id, status)) } + + ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000) sendMessage(connectionManagerId, message) promise.future } http://git-wip-us.apache.org/repos/asf/spark/blob/76fa0eaf/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala index 846537d..e2f4d4c 100644 --- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala @@ -19,14 +19,19 @@ package org.apache.spark.network import java.io.IOException import java.nio._ +import java.util.concurrent.TimeoutException import org.apache.spark.{SecurityManager, SparkConf} import org.scalatest.FunSuite +import org.mockito.Mockito._ +import org.mockito.Matchers._ + +import scala.concurrent.TimeoutException import scala.concurrent.{Await, TimeoutException} import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.Try +import scala.util.{Failure, Success, Try} /** * Test the ConnectionManager with various security settings. @@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite { } + test("sendMessageReliably timeout") { + val clientConf = new SparkConf + clientConf.set("spark.authenticate", "false") + val ackTimeout = 30 + clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}") + + val clientSecurityManager = new SecurityManager(clientConf) + val manager = new ConnectionManager(0, clientConf, clientSecurityManager) + + val serverConf = new SparkConf + serverConf.set("spark.authenticate", "false") + val serverSecurityManager = new SecurityManager(serverConf) + val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) + managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { + // sleep 60 sec > ack timeout for simulating server slow down or hang up + Thread.sleep(ackTimeout * 3 * 1000) + None + }) + + val size = 10 * 1024 * 1024 + val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) + buffer.flip + val bufferMessage = Message.createBufferMessage(buffer.duplicate) + + val future = manager.sendMessageReliably(managerServer.id, bufferMessage) + + // Future should throw IOException in 30 sec. + // Otherwise TimeoutExcepton is thrown from Await.result. + // We expect TimeoutException is not thrown. + intercept[IOException] { + Await.result(future, (ackTimeout * 2) second) + } + + manager.stop() + managerServer.stop() + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/76fa0eaf/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index c408c46..981170d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -885,6 +885,15 @@ Apart from these, the following properties are also available, and may be useful + spark.core.connection.ack.wait.timeout + 60 + + Number of seconds for the connection to wait for ack to occur before timing + out and giving up. To avoid unwilling timeout caused by long pause like GC, + you can set larger value. + + + spark.ui.filters None --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org