From: andrewor14@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-4006] In long running contexts, we encountered the situation of d...
Date: Thu, 18 Dec 2014 00:31:14 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-1.0 b9b6762f1 -> f0eed6e30


[SPARK-4006] In long running contexts, we encountered the situation of d...

...ouble registe...

...r without a remove in between. The cause is unknown and is assumed to be a temporary network issue.

However, since the second registration uses a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught by a conditional that calls System.exit(1), which is a huge robustness issue for us.
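To make the inconsistency concrete, here is a minimal Scala sketch (written as a script) of the pre-fix register() logic. It assumes a simplified, hypothetical BlockManagerId with only executorId, host and port, and a Long timestamp standing in for BlockManagerInfo; neither is Spark's actual class. It returns a Left where the real code called System.exit(1).

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's BlockManagerId: two ids are
// equal only if executor, host AND port all match (case-class equality).
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Models the pre-fix register(). The two maps mirror blockManagerInfo and
// blockManagerIdByExecutor; the Long value stands in for BlockManagerInfo.
class PreFixMaster {
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, Long]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]

  // Returns Left where the original code called System.exit(1).
  def register(id: BlockManagerId): Either[String, Unit] =
    if (blockManagerInfo.contains(id)) Right(())
    else blockManagerIdByExecutor.get(id.executorId) match {
      case Some(_) =>
        Left(s"Got two different block manager registrations on ${id.executorId}")
      case None =>
        blockManagerIdByExecutor(id.executorId) = id
        blockManagerInfo(id) = System.currentTimeMillis()
        Right(())
    }
}

val master = new PreFixMaster
val first  = BlockManagerId("exec-1", "host-a", 50001)
val second = BlockManagerId("exec-1", "host-a", 50002) // same executor, new port

master.register(first)
println(master.blockManagerInfo.contains(second))                // false
println(master.blockManagerIdByExecutor.get("exec-1").isDefined) // true
println(master.register(second)) // Left(Got two different block manager registrations on exec-1)
```

Because the re-registering id differs only by port, the first lookup misses while the per-executor lookup hits, which is exactly the branch that used to exit the process.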
The fix: simply remove the old id from both maps during registration when this happens. This mimics the behavior of expireDeadHosts() by doing local cleanup of the maps before trying to add new entries.

Also, added some logging for register and unregister.

This is just like https://github.com/apache/spark/pull/2886, except it's on branch-1.0.

Author: Tal Sliwowicz

Closes #2914 from tsliwowicz/branch-1.0-block-mgr-removal and squashes the following commits:

1014493 [Tal Sliwowicz] [SPARK-4006] In long running contexts, we encountered the situation of double registe...


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0eed6e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0eed6e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0eed6e3

Branch: refs/heads/branch-1.0
Commit: f0eed6e30467d8db640303848f41053c433949a6
Parents: b9b6762
Author: Tal Sliwowicz
Authored: Wed Dec 17 14:14:22 2014 -0800
Committer: Andrew Or
Committed: Wed Dec 17 14:14:22 2014 -0800

----------------------------------------------------------------------
 .../spark/storage/BlockManagerMasterActor.scala | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0eed6e3/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 6aed322..7b90858 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -208,6 +208,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       }
     }
     listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+    logInfo(s"Removing block manager $blockManagerId")
   }
 
   private def expireDeadHosts() {
@@ -328,16 +329,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
-        case Some(manager) =>
-          // A block manager of the same executor already exists.
-          // This should never happen. Let's just quit.
-          logError("Got two different block manager registrations on " + id.executorId)
-          System.exit(1)
+        case Some(oldId) =>
+          // A block manager of the same executor already exists, so remove it (assumed dead)
+          logError("Got two different block manager registrations on same executor - "
+              + s" will replace old one $oldId with new one $id")
+          removeExecutor(id.executorId)
         case None =>
-          blockManagerIdByExecutor(id.executorId) = id
       }
-      blockManagerInfo(id) =
-        new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+      logInfo("Registering block manager %s with %s RAM, %s".format(
+        id.hostPort, Utils.bytesToString(maxMemSize), id))
+
+      blockManagerIdByExecutor(id.executorId) = id
+
+      blockManagerInfo(id) = new BlockManagerInfo(
+        id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
     listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
   }
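For comparison, here is a sketch of the patched register() under the same simplifying assumptions (a hypothetical three-field BlockManagerId and a Long standing in for BlockManagerInfo). The removeExecutor helper below is a hypothetical simplification that only drops the old id from both maps; the real cleanup path in Spark does more bookkeeping than this. println stands in for logError.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's BlockManagerId.
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Models the post-fix register(): on a second registration for the same
// executor, the old id is removed from BOTH maps before the new one is added,
// so the two maps never disagree.
class PostFixMaster {
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, Long]
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]

  // Simplified cleanup: forget the executor's id in both maps.
  private def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.remove(execId).foreach(oldId => blockManagerInfo.remove(oldId))

  def register(id: BlockManagerId): Unit =
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
        // Old manager is assumed dead: replace it instead of exiting.
        println(s"Replacing old block manager $oldId with new one $id")
        removeExecutor(id.executorId)
      }
      blockManagerIdByExecutor(id.executorId) = id
      blockManagerInfo(id) = System.currentTimeMillis()
    }
}

val master = new PostFixMaster
val first  = BlockManagerId("exec-1", "host-a", 50001)
val second = BlockManagerId("exec-1", "host-a", 50002)
master.register(first)
master.register(second)
println(master.blockManagerIdByExecutor("exec-1") == second) // true
println(master.blockManagerInfo.keySet == Set(second))       // true
```

Moving the blockManagerIdByExecutor update out of the `case None` branch, as the diff does, means the new id is recorded whether or not an old one had to be evicted first.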