spark-issues mailing list archives

From "Yan Chen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15716) Memory usage keeps growing in Spark Streaming
Date Thu, 02 Jun 2016 20:46:59 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yan Chen updated SPARK-15716:
-----------------------------
    Description: 
Code:
{code:java}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class App {

  public static void main(String[] args) {
    final String input = args[0];
    final String check = args[1];
    final long interval = Long.parseLong(args[2]);

    final SparkConf conf = new SparkConf();
    conf.set("spark.streaming.minRememberDuration", "180s");
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
    conf.set("spark.streaming.unpersist", "true");
    conf.set("spark.streaming.ui.retainedBatches", "10");
    conf.set("spark.ui.retainedJobs", "10");
    conf.set("spark.ui.retainedStages", "10");
    conf.set("spark.worker.ui.retainedExecutors", "10");
    conf.set("spark.worker.ui.retainedDrivers", "10");
    conf.set("spark.sql.ui.retainedExecutions", "10");

    JavaStreamingContextFactory jscf = () -> {
      SparkContext sc = new SparkContext(conf);
      sc.setCheckpointDir(check);

      StreamingContext ssc = new StreamingContext(sc, Durations.milliseconds(interval));
      JavaStreamingContext jssc = new JavaStreamingContext(ssc);

      jssc.checkpoint(check);

      // setup pipeline here: a file-based input stream whose filter accepts
      // every path and which also processes pre-existing files (newFilesOnly = false)
      JavaPairDStream<LongWritable, Text> inputStream =
          jssc.fileStream(
              input,
              LongWritable.class,
              Text.class,
              TextInputFormat.class,
              (filepath) -> Boolean.TRUE,
              false
          );

      // Trivial stateful transformation: the update function just returns the
      // existing state unchanged.
      JavaPairDStream<LongWritable, Text> usbk = inputStream
          .updateStateByKey((current, state) -> state);

      // Checkpoint the stateful stream every 10 seconds so its lineage gets truncated.
      usbk.checkpoint(Durations.seconds(10));

      // Force evaluation of each batch and print the number of lines in the
      // RDD's debug string, a rough measure of lineage depth.
      usbk.foreachRDD(rdd -> {
        rdd.count();
        System.out.println("usbk: " + rdd.toDebugString().split("\n").length);
        return null;
      });

      return jssc;
    };

    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(check, jscf);

    jssc.start();
    jssc.awaitTermination();


  }

}
{code}
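
One way to make the growth visible without external tooling would be to log the driver's own heap usage once per batch. This is a hypothetical variant of the foreachRDD block above, not part of the reported code; foreachRDD's function runs on the driver, so Runtime here reports the driver JVM's heap:

{code:java}
// Sketch only: the heap-logging lines are an illustrative addition.
usbk.foreachRDD(rdd -> {
  rdd.count(); // force evaluation of the batch, as in the original
  Runtime rt = Runtime.getRuntime();
  long usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
  System.out.println("driver heap used (MB): " + usedMb);
  return null; // Spark 1.4's Java foreachRDD takes Function<R, Void>
});
{code}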

The command used to run the code:

{code:none}
spark-submit --keytab [keytab] --principal [principal] --class [package].App --master yarn \
  --driver-memory 1g --executor-memory 1G \
  --conf "spark.driver.maxResultSize=0" \
  --conf "spark.logConf=true" \
  --conf "spark.executor.instances=2" \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
  --conf "spark.driver.extraJavaOptions=-Xloggc:/[dir]/memory-gc.log -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions" \
  [jar-file-path] file:///[dir-on-nas-drive] [dir-on-hdfs] 200
{code}

It's a very simple piece of code, but when I run it, the driver's memory usage keeps growing.
There is no file input in our runs. The batch interval is set to 200 milliseconds; processing
time for each batch stays below 150 milliseconds, and most batches finish in under 70 milliseconds.
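
For anyone reproducing the measurement, the driver's heap can be sampled with the stock JDK jstat tool; a minimal sketch, not part of the original report, where [driver-pid] is a placeholder for the driver JVM's process id:

{code:none}
# Print GC/heap utilization of the driver JVM every 5 seconds.
# The O column is old-generation occupancy (%); a steady upward
# trend across collections is the growth described above.
jstat -gcutil [driver-pid] 5000
{code}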

!http://i.imgur.com/uSzUui6.png!

The rightmost four red triangles are full GCs, which were triggered manually with the
"jcmd [pid] GC.run" command.
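
As a follow-up check (again not from the original report), jcmd can also dump a class histogram right after a forced full GC, which quickly shows which objects survive collection; a minimal sketch:

{code:none}
# Force a full GC on the driver, then list the classes that still
# dominate the heap afterwards. Both subcommands ship with the JDK.
jcmd [driver-pid] GC.run
jcmd [driver-pid] GC.class_histogram | head -n 25
{code}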


  was: (identical to the description above, except that the spark-submit command
additionally passed -XX:+G1SummarizeConcMark in both spark.executor.extraJavaOptions
and spark.driver.extraJavaOptions)


> Memory usage keeps growing in Spark Streaming
> ----------------------------------------------
>
>                 Key: SPARK-15716
>                 URL: https://issues.apache.org/jira/browse/SPARK-15716
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.1
>         Environment: Oracle Java 1.8.0_51, SUSE Linux
>            Reporter: Yan Chen
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
