spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: Sliding Window Memory use
Date Tue, 27 Sep 2016 04:49:22 GMT
I ran it on Databricks Community Edition, which was a local[8] cluster with
6 GB of RAM. It ran fine.

That said, looking at the plan, we can definitely simplify this quite a
bit. The plan has a separate Window physical execution node for each window
expression, when all of them could have been collapsed into a single one.
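One way to see the extra Window nodes described above is to count the logical Window operators in a DataFrame's analyzed plan. This is a minimal sketch, not Spark's own tooling: `PlanInspection` and `countWindowNodes` are hypothetical names, and the helper relies on `Dataset.queryExecution`, which is a developer-facing API. Later Spark releases may merge adjacent Window operators during optimization, so the analyzed plan (before optimizer rules run) is where the per-expression nodes are most visible.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}

// Hypothetical helper: counts logical Window operators in a DataFrame's plan.
object PlanInspection {
  // Walks the analyzed plan (before any optimizer rule runs), so every
  // Window operator the analyzer introduced is still present.
  def countWindowNodes(df: DataFrame): Int =
    df.queryExecution.analyzed.collect { case w: LogicalWindow => w }.size
}
```

Under these assumptions, calling it on the test case's `resultFrame` should report one Window operator per `withColumn` call, while putting window columns with the same `partitionBy`/`orderBy` into a single `select` should report one.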


On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <jerdavis@speakeasy.net>
wrote:

>
> Hi, I posted this to the user list but didn’t get any responses.
> I wanted to highlight what seems like excessive memory use with
> sliding windows.
> I have attached a test case in which, starting with certainly less than
> 1 MB of data, I can OOM a 10 GB heap.
>
> Regards,
> -JD
>
>
>
> --------------
>
>
> import java.sql.Timestamp
>
> import org.apache.spark.sql.SparkSession
> import org.junit.Test
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> import scala.collection.mutable.ArrayBuffer
>
>
> /**
>  * A small unit test that demonstrates an OOM with Spark window functions
>  */
> class SparkTest {
>
>
>   @Test
>   def testWindows(): Unit = {
>     val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
>     import sparkSession.implicits._
>
>     println("Init Dataset")
>
>     val partitions = 0 until 4      // 4 keys
>     val entries = 0 until 6500      // 6500 rows per key, one per minute
>
>     //val windows = 5 to 15 by 5    // Works: 3 window sizes
>     val windows = 5 to 65 by 5      // OOMs a 10G heap: 13 window sizes
>
>     val testData = new ArrayBuffer[(String, Timestamp, Double)]
>
>     // Rows are appended as a side effect, so `yield` is not needed here.
>     for (p <- partitions; e <- entries) {
>       testData += (("Key" + p, new Timestamp(60000L * e), e * 2.0))
>     }
>
>     val ds = testData.toDF("key","datetime","value")
>     ds.show()
>
>
>     var resultFrame = ds
>     resultFrame.schema.fields.foreach(println)
>
>
>     val baseWin = Window.partitionBy("key").orderBy("datetime")
>     for (win <- windows) {
>       // Trailing frame: the current row plus the `win` rows before it
>       val frame = baseWin.rowsBetween(-win, 0)
>       resultFrame = resultFrame
>         .withColumn("avg" + (-win), avg("value").over(frame))
>         .withColumn("stddev" + (-win), stddev("value").over(frame))
>         .withColumn("min" + (-win), min("value").over(frame))
>         .withColumn("max" + (-win), max("value").over(frame))
>     }
>     resultFrame.show()
>
>     sparkSession.stop()
>   }
>
> }
>
>
>
>
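If the chained `withColumn` calls, each adding its own Window node, are what drives the memory use, one possible workaround is to build every rolling statistic from the test case in a single `select`, so the analyzer sees all window expressions sharing `partitionBy("key").orderBy("datetime")` at once. This is a sketch under that assumption, not a confirmed fix: `RollingStats` and `withRollingStats` are hypothetical names, and whether it avoids the OOM should be checked by running it and inspecting `explain()`. Column names follow the test case's convention (`avg-5`, `stddev-5`, and so on).

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical helper: adds avg/stddev/min/max over trailing row frames of
// each size in `windows`, all in one select instead of chained withColumn.
object RollingStats {
  def withRollingStats(df: DataFrame, windows: Seq[Int]): DataFrame = {
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    val statCols: Seq[Column] = windows.flatMap { win =>
      // Frame covering the current row and the `win` rows before it.
      val frame = baseWin.rowsBetween(-win, 0)
      Seq(
        avg("value").over(frame).as(s"avg-$win"),
        stddev("value").over(frame).as(s"stddev-$win"),
        min("value").over(frame).as(s"min-$win"),
        max("value").over(frame).as(s"max-$win")
      )
    }
    // Keep the original columns and append every statistic in one projection.
    df.select((col("*") +: statCols): _*)
  }
}
```

With the test case's data this would be called as `RollingStats.withRollingStats(ds, 5 to 65 by 5).show()`, since a `Range` is a `Seq[Int]`.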
