spark-issues mailing list archives

From "Andrew Milkowski (JIRA)" <>
Subject [jira] [Commented] (SPARK-19364) Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are enabled and an exception is thrown
Date Thu, 09 Mar 2017 20:17:38 GMT


Andrew Milkowski commented on SPARK-19364:

Thanks @Takeshi Yamamuro, I will try to see if I can make this error reproducible on demand (we see it
non-stop in prod, and it is consistent). I will try throwing the exception from within the
Kinesis receiver (Java lib) and watch the stream blocks grow in Spark, and will provide the line change
needed to reproduce the problem. It is tied to the Kinesis Java lib faulting on checkpoint by throwing an exception,
and to Spark persisting stream blocks and never releasing them from memory until an eventual OOM (OutOfMemoryError).
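One way to make the checkpoint failure deterministic, as planned above, is to wrap the checkpoint call in a fault-injecting decorator that fails every Nth call. The sketch below is a stdlib-only toy under stated assumptions: `ShardCheckpointer` and `FailingCheckpointer` are hypothetical names, not the Kinesis Client Library or Spark API, and `IllegalStateException` merely stands in for the library's real exception type. Wiring it into the actual receiver would mean adapting it to whatever KCL checkpointer interface the app uses.

```java
import java.util.ArrayList;
import java.util.List;

// Demo driver: fails every 3rd checkpoint and counts outcomes.
class FaultInjectionDemo {
    public static void main(String[] args) {
        List<String> committed = new ArrayList<>();
        ShardCheckpointer real = (shard, seq) -> committed.add(shard + "@" + seq);
        ShardCheckpointer faulty = new FailingCheckpointer(real, 3);
        int failures = 0;
        for (int i = 1; i <= 6; i++) {
            try {
                faulty.checkpoint("shardId-000000000000", String.valueOf(i));
            } catch (Exception e) {
                failures++;
            }
        }
        System.out.println(committed.size() + " committed, " + failures + " failed");
    }
}

// HYPOTHETICAL stand-in for the checkpoint call made by the Kinesis receiver.
interface ShardCheckpointer {
    void checkpoint(String shardId, String sequenceNumber) throws Exception;
}

// Decorator that deterministically fails every Nth checkpoint.
final class FailingCheckpointer implements ShardCheckpointer {
    private final ShardCheckpointer delegate;
    private final int failEvery;
    private int calls = 0;

    FailingCheckpointer(ShardCheckpointer delegate, int failEvery) {
        if (failEvery <= 0) {
            throw new IllegalArgumentException("failEvery must be positive");
        }
        this.delegate = delegate;
        this.failEvery = failEvery;
    }

    @Override
    public synchronized void checkpoint(String shardId, String sequenceNumber) throws Exception {
        calls++;
        if (calls % failEvery == 0) {
            // Message copied from the log in this report; the exception type is an assumption.
            throw new IllegalStateException(
                "Can't update checkpoint - instance doesn't hold the lease for this shard");
        }
        delegate.checkpoint(shardId, sequenceNumber);
    }
}
```

Running the demo prints `4 committed, 2 failed`, since calls 3 and 6 are rejected; observing the Storage tab while such failures repeat is what would show whether blocks accumulate.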

> Stream Blocks in Storage Persists Forever when Kinesis Checkpoints are enabled and an exception is thrown
> ----------------------------------------------------------------------------------------------------------
>                 Key: SPARK-19364
>                 URL:
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>         Environment: ubuntu unix
> spark 2.0.2
> application is java
>            Reporter: Andrew Milkowski
>            Priority: Blocker
> -- update -- We found that the situation below occurs when we encounter
> "Can't update checkpoint - instance doesn't hold the lease for this shard".
> We use an S3 directory (and DynamoDB) to store checkpoints. Even when this occurs, blocks should
not get stuck; they should continue to be evicted gracefully from memory. The Kinesis library
race condition is obviously a problem in its own right...
> -- exception leading to a block not being freed up --
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException:  Caught shutdown exception, skipping checkpoint.
> Can't update checkpoint - instance doesn't hold the lease for this shard
> 	at
> 	at
> 	at
> 	at
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
> 	at scala.Option.foreach(Option.scala:257)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
> 	at$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
> 	at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
> 	at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
> 	at$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
> 	at org.apache.spark.streaming.util.RecurringTimer$$anon$
> We are running standard Kinesis stream ingestion with a Java Spark app that creates a DStream.
After running for some time, some stream blocks seem to persist forever and are never cleaned up,
which eventually leads to memory depletion on the workers.
> We even tried cleaning RDDs with the following:
> cleaner = ssc.sparkContext().sc().cleaner().get();
> filtered.foreachRDD(new VoidFunction<JavaRDD<String>>() {
>     @Override
>     public void call(JavaRDD<String> rdd) throws Exception {
>         cleaner.doCleanupRDD(, true);
>     }
> });
> Despite the above, the blocks still persist, as can be seen in the Spark admin UI, for instance:
> input-0-1485362233945	1	ip-<>:34245	Memory Serialized	1442.5 KB
> The block above stays and is never cleaned up.
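The block name in the UI row follows Spark's stream block naming, `input-<streamId>-<uniqueId>`. When tracking which blocks linger, a small stdlib-only helper can turn such names (copied from the Storage tab or from logs) into a stream ID and an approximate creation time. `StreamBlockNames` and `ParsedBlock` are hypothetical names, and reading the unique ID as an epoch-millisecond timestamp is an assumption based on how Spark's receiver block generator assigns IDs, not something stated in this report.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StreamBlockNames {
    // Receiver blocks are named "input-<streamId>-<uniqueId>" (Spark's StreamBlockId).
    private static final Pattern STREAM_BLOCK = Pattern.compile("input-(\\d+)-(\\d+)");

    // HYPOTHETICAL helper type holding the parsed parts of a stream block name.
    static final class ParsedBlock {
        final int streamId;
        final long uniqueId;

        ParsedBlock(int streamId, long uniqueId) {
            this.streamId = streamId;
            this.uniqueId = uniqueId;
        }

        // ASSUMPTION: uniqueId is an epoch-millisecond timestamp set when the block was generated.
        Instant approximateCreationTime() {
            return Instant.ofEpochMilli(uniqueId);
        }
    }

    // Parses a block name, rejecting anything that is not a stream input block.
    static ParsedBlock parse(String blockName) {
        Matcher m = STREAM_BLOCK.matcher(blockName.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a stream block name: " + blockName);
        }
        return new ParsedBlock(Integer.parseInt(m.group(1)), Long.parseLong(m.group(2)));
    }

    public static void main(String[] args) {
        // Block name taken from the Storage UI row quoted in this report.
        ParsedBlock b = parse("input-0-1485362233945");
        System.out.println("stream " + b.streamId + ", created ~" + b.approximateCreationTime());
    }
}
```

For the sample row this prints `stream 0, created ~2017-01-25T16:37:13.945Z`; comparing that instant with the current time gives a rough block age, which could help flag blocks that outlive their batch.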

This message was sent by Atlassian JIRA
