Date: Tue, 2 Feb 2016 01:18:40 +0000 (UTC)
From: "Apache Spark (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Assigned] (SPARK-13122) Race condition in MemoryStore.unrollSafely() causes memory leak

[ https://issues.apache.org/jira/browse/SPARK-13122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-13122:
------------------------------------

    Assignee: Apache Spark

> Race condition in MemoryStore.unrollSafely() causes memory leak
> ---------------------------------------------------------------
>
> Key: SPARK-13122
> URL: https://issues.apache.org/jira/browse/SPARK-13122
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Streaming
> Affects Versions: 1.6.0
> Reporter: Adam Budde
> Assignee: Apache Spark
>
> The [unrollSafely()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L249] method in MemoryStore progressively unrolls the contents of a block iterator into memory. It works by reserving an initial chunk of unroll memory and periodically checking whether more memory must be reserved as it unrolls the iterator. Once the unroll is done, the memory reserved for it is considered "pending" memory and is tracked per task attempt ID in a map named pendingUnrollMemoryMap. When the unrolled block is committed to storage memory in the [tryToPut()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L362] method, a method named [releasePendingUnrollMemoryForThisTask()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L521] is invoked and this pending memory is released. tryToPut() then proceeds to allocate the storage memory required for the block.
> The unrollSafely() method determines how much unroll memory to transfer to pending memory as follows. At the start of the method, it saves the amount of unroll memory already reserved for the current task attempt ID in a variable named previousMemoryReserved. At the end of the method, it subtracts previousMemoryReserved from the unroll memory currently held by the task and stores the result in a variable named amountToTransferToPending. This amount is then subtracted from the task's entry in unrollMemoryMap and added to its entry in pendingUnrollMemoryMap.
> The amount of unroll memory currently held by the task is read from unrollMemoryMap via the currentUnrollMemoryForThisTask method.
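The bookkeeping described above can be modeled in a few lines of Scala. This is a minimal single-threaded sketch, not Spark's MemoryStore: the object UnrollBookkeepingModel, the reserveUnrollMemory() helper, and the chunks parameter (one entry per extra reservation made while unrolling) are hypothetical, and the real method also iterates over actual values and can fail to reserve memory. It only shows that, when nothing else touches the task's entry, the difference taken at the end equals exactly the memory this call reserved.

```scala
import scala.collection.mutable

// Hypothetical single-threaded model of unrollSafely()'s bookkeeping. The map and
// method names follow the issue text; the bodies are illustrative, not Spark's code.
object UnrollBookkeepingModel {
  // Task attempt ID -> bytes of unroll memory currently held by in-progress unrolls.
  val unrollMemoryMap = mutable.HashMap.empty[Long, Long]
  // Task attempt ID -> bytes waiting to be handed over to storage memory by tryToPut().
  val pendingUnrollMemoryMap = mutable.HashMap.empty[Long, Long]

  def currentUnrollMemoryForThisTask(taskAttemptId: Long): Long =
    unrollMemoryMap.getOrElse(taskAttemptId, 0L)

  // Hypothetical stand-in for asking the memory manager for more unroll memory.
  def reserveUnrollMemory(taskAttemptId: Long, bytes: Long): Unit =
    unrollMemoryMap.update(taskAttemptId, currentUnrollMemoryForThisTask(taskAttemptId) + bytes)

  // Each element of `chunks` is one reservation made while unrolling the iterator.
  def unrollSafely(taskAttemptId: Long, chunks: Seq[Long]): Long = {
    val previousMemoryReserved = currentUnrollMemoryForThisTask(taskAttemptId)
    chunks.foreach(reserveUnrollMemory(taskAttemptId, _))
    // The finally { } step: whatever was added since the start is treated as this
    // call's memory and moved from unrollMemoryMap to pendingUnrollMemoryMap.
    val amountToTransferToPending =
      currentUnrollMemoryForThisTask(taskAttemptId) - previousMemoryReserved
    unrollMemoryMap.update(
      taskAttemptId, currentUnrollMemoryForThisTask(taskAttemptId) - amountToTransferToPending)
    pendingUnrollMemoryMap.update(
      taskAttemptId,
      pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending)
    amountToTransferToPending
  }

  // Models releasePendingUnrollMemoryForThisTask(): drop the task's pending entry.
  def releasePendingUnrollMemoryForThisTask(taskAttemptId: Long): Long =
    pendingUnrollMemoryMap.remove(taskAttemptId).getOrElse(0L)
}
```

For example, UnrollBookkeepingModel.unrollSafely(1L, Seq(1048576L, 1048576L)) reserves 2 MiB in two steps, returns 2097152, and leaves task 1 with 0 bytes in unrollMemoryMap and 2097152 bytes in pendingUnrollMemoryMap until the release step removes them.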
> For unrollSafely() to behave correctly, the unrollMemoryMap entry for the task attempt ID returned by [currentTaskAttemptId()|https://github.com/apache/spark/blob/v1.6.0/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L475] must not change between the computation of previousMemoryReserved and the computation of amountToTransferToPending. However, nothing ensures that computing both values and updating the memory maps happens atomically, so a race condition can occur when multiple threads for which currentTaskAttemptId() returns the same value are storing blocks at the same time. This can produce a negative value for amountToTransferToPending, corrupting both unrollMemoryMap and pendingUnrollMemoryMap, which in turn can cause the memory manager to leak unroll memory.
> For example, let's consider how the state of unrollMemoryMap and pendingUnrollMemoryMap might evolve if two threads for which currentTaskAttemptId() returns the same value both execute unrollSafely() concurrently:
> ||Thread 1||Thread 2||unrollMemoryMap||pendingUnrollMemoryMap||
> |Enter unrollSafely()|-|0|0|
> |previousMemoryReserved = 0|-|0|0|
> |(perform unroll)|-|2097152 (2 MiB)|0|
> |-|Enter unrollSafely()|2097152 (2 MiB)|0|
> |-|previousMemoryReserved = 2097152|2097152 (2 MiB)|0|
> |-|(perform unroll)|3145728 (3 MiB)|0|
> |Enter finally { }|-|3145728 (3 MiB)|0|
> |amountToTransferToPending = 3145728|-|3145728 (3 MiB)|0|
> |Update memory maps|-|0|3145728 (3 MiB)|
> |Return|Enter finally { }|0|3145728 (3 MiB)|
> |-|amountToTransferToPending = -3145728|0|3145728 (3 MiB)|
> |-|Update memory maps|-3145728 (-3 MiB)|0|
> In this example, we end up leaking 3 MiB of unroll memory, since both Thread 1 and Thread 2 think that the task has no pending unroll memory allocated to it. The negative value stored in unrollMemoryMap will also propagate to future invocations of unrollSafely().
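To see why the difference can go negative, the model below splits unrollSafely() into its start (enter), its reservations (reserve) and its finally block (finish), so that two threads sharing one task attempt ID can be interleaved by hand in the order used in the table. Everything here is a hypothetical sketch: the names are invented, task attempt ID -1 is used because that is what the reporter observed for receiver tasks, and tryToPut() is left out. Because the model applies the update rule from the description literally, its intermediate figures are its own rather than a copy of the table's, but it reproduces the essential failure: Thread 1's transfer absorbs Thread 2's reservation, and Thread 2's transfer comes out negative.

```scala
import scala.collection.mutable

// Hypothetical model that splits unrollSafely() into its start and its finally { }
// step so that two "threads" sharing one task attempt ID can be interleaved by hand.
object UnrollRaceReplay {
  val unrollMemoryMap = mutable.HashMap.empty[Long, Long]
  val pendingUnrollMemoryMap = mutable.HashMap.empty[Long, Long]

  def current(task: Long): Long = unrollMemoryMap.getOrElse(task, 0L)

  // Start of unrollSafely(): remember how much unroll memory the task already holds.
  def enter(task: Long): Long = current(task)

  // A reservation made while unrolling.
  def reserve(task: Long, bytes: Long): Unit = unrollMemoryMap.update(task, current(task) + bytes)

  // finally { }: "this call's" memory is computed as a difference and moved to pending.
  def finish(task: Long, previousMemoryReserved: Long): Long = {
    val amountToTransferToPending = current(task) - previousMemoryReserved
    unrollMemoryMap.update(task, current(task) - amountToTransferToPending)
    pendingUnrollMemoryMap.update(
      task, pendingUnrollMemoryMap.getOrElse(task, 0L) + amountToTransferToPending)
    amountToTransferToPending
  }

  // Replays the table's order of events; returns (Thread 1 transfer, Thread 2 transfer).
  def replay(): (Long, Long) = {
    val task = -1L
    val t1Previous = enter(task)            // Thread 1 enters: 0
    reserve(task, 2L * 1024 * 1024)         // Thread 1 unrolls 2 MiB
    val t2Previous = enter(task)            // Thread 2 enters: 2 MiB (Thread 1's memory)
    reserve(task, 1L * 1024 * 1024)         // Thread 2 unrolls 1 MiB more
    val t1Amount = finish(task, t1Previous) // 3 MiB: includes Thread 2's 1 MiB
    val t2Amount = finish(task, t2Previous) // 0 - 2 MiB: negative
    (t1Amount, t2Amount)
  }
}
```

Running UnrollRaceReplay.replay() returns (3145728, -2097152). Even though both unrolls have finished, the shared task is left with 2097152 bytes in unrollMemoryMap and 1048576 bytes in pendingUnrollMemoryMap, so the per-task accounting no longer matches what either call actually reserved.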
> In our particular case, this behavior manifests because the currentTaskAttemptId() method returns -1 for every Spark receiver task. This may itself be a bug, and it is something I'm going to look into. We noticed that blocks would start to spill over to disk even when more than enough storage memory was available, so we inserted log statements into MemoryManager's acquireUnrollMemory() and releaseUnrollMemory() methods to record the number of unroll bytes acquired and released. Plotting the output makes it apparent that unroll memory is being leaked:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/leaked_unroll_bytes.png!
> Running difference between acquired and released bytes:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/leaked_unroll_bytes_running_diff.png!
> However, if we change the implementation of unrollSafely() so that the entire method runs inside a synchronized block, we no longer see this leak:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/synchronized_unroll_bytes.png!
> Running difference between acquired and released bytes:
> !https://raw.githubusercontent.com/budde/spark_debug/master/plots/UnifiedMemoryManager/synchronized_unroll_bytes_running_diff.png!
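The workaround described above, running the entire method under one lock, can be sketched with the same kind of toy model. This is not the reporter's patch: SynchronizedUnrollModel, its private lock, and the run() helper are hypothetical, and a real unroll would also hold the lock while iterating over the block's values. Because each call reads previousMemoryReserved, makes its reservations, and updates both maps without interruption, no other thread's reservation can land between the two reads, so every call transfers exactly what it reserved even when all threads share task attempt ID -1.

```scala
import scala.collection.mutable

// Hypothetical model of the workaround: the whole unroll, from reading
// previousMemoryReserved to updating both maps, runs while holding one lock.
object SynchronizedUnrollModel {
  private val lock = new Object
  val unrollMemoryMap = mutable.HashMap.empty[Long, Long]
  val pendingUnrollMemoryMap = mutable.HashMap.empty[Long, Long]

  // Only called while `lock` is held.
  private def current(task: Long): Long = unrollMemoryMap.getOrElse(task, 0L)

  def unrollSafely(task: Long, chunks: Seq[Long]): Long = lock.synchronized {
    val previousMemoryReserved = current(task)
    chunks.foreach(bytes => unrollMemoryMap.update(task, current(task) + bytes))
    val amountToTransferToPending = current(task) - previousMemoryReserved
    unrollMemoryMap.update(task, current(task) - amountToTransferToPending)
    pendingUnrollMemoryMap.update(
      task, pendingUnrollMemoryMap.getOrElse(task, 0L) + amountToTransferToPending)
    amountToTransferToPending
  }

  // Starts `threads` threads that all use task attempt ID -1 and each unroll
  // `callsPerThread` blocks; returns (unroll bytes, pending bytes) for that ID.
  def run(threads: Int, callsPerThread: Int, chunks: Seq[Long]): (Long, Long) = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to callsPerThread).foreach(_ => unrollSafely(-1L, chunks)))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    lock.synchronized {
      (unrollMemoryMap.getOrElse(-1L, 0L), pendingUnrollMemoryMap.getOrElse(-1L, 0L))
    }
  }
}
```

For example, SynchronizedUnrollModel.run(4, 1000, Seq(1024L, 2048L)) returns (0, 12288000): the unroll entry returns to zero and the pending entry equals 4 × 1000 × 3072 bytes. The trade-off of a single lock is that it serializes every unroll, so receivers sharing the lock can no longer unroll blocks in parallel.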