flink-issues mailing list archives

From "Narayanan Arunachalam (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
Date Thu, 03 May 2018 08:09:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462080#comment-16462080 ]

Narayanan Arunachalam commented on FLINK-9268:
----------------------------------------------

`Is it possible for a session window to collect duplicate data?` - The checkpoint config
is now set to AT_LEAST_ONCE. I was wondering whether the job can get into a state where a window
contains both events restored from a checkpoint and the same events replayed again from the source.
If the job then fails again, the window's list could grow without bound because of the accumulated duplicates.
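
To make the question concrete, here is a minimal sketch of the scenario I have in mind. It is not Flink code: the class and method names are hypothetical, and it assumes that under AT_LEAST_ONCE a restored window can already hold events that lie past the restored source offset, so replaying from that offset appends them a second time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical model of one window's list state across restore/replay cycles.
public class AtLeastOnceReplaySketch {

    // Returns the window contents after restoring `restoredWindow` from a
    // checkpoint and replaying source events [restoredOffset, endOffset).
    // Assumption: the restored window may already contain events at or past
    // restoredOffset, because barriers are not aligned in at-least-once mode.
    static List<Integer> replay(List<Integer> restoredWindow, int restoredOffset, int endOffset) {
        List<Integer> window = new ArrayList<>(restoredWindow);
        for (int event = restoredOffset; event < endOffset; event++) {
            window.add(event); // appended again even if already present
        }
        return window;
    }

    // Number of list entries that are repeats of an event already in the list.
    static int countDuplicates(List<Integer> window) {
        return window.size() - new HashSet<>(window).size();
    }

    public static void main(String[] args) {
        // Checkpoint 1: window holds events 0..4, but the source offset is 3.
        List<Integer> afterFirstFailure = replay(Arrays.asList(0, 1, 2, 3, 4), 3, 6);
        // Checkpoint 2: window holds the list above, source offset is 4.
        List<Integer> afterSecondFailure = replay(afterFirstFailure, 4, 7);
        System.out.println("duplicates after 1st failure: " + countDuplicates(afterFirstFailure));
        System.out.println("duplicates after 2nd failure: " + countDuplicates(afterSecondFailure));
    }
}
```

In this model the duplicate count only goes up with each failure, and a session window that never sees a 60-second gap would never fire and purge them.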

> RockDB errors from WindowOperator
> ---------------------------------
>
>                 Key: FLINK-9268
>                 URL: https://issues.apache.org/jira/browse/FLINK-9268
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Priority: Major
>
> The job has no sinks and one Kafka source, and it uses session windows based on processing
> time. The job fails with the error given below after running for a few hours. The only way
> to recover from this error is to cancel the job and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
>  .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException{java.lang.NegativeArraySizeException}
>  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
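
One way an ever-growing list could end in the NegativeArraySizeException above is a value size that no longer fits in a Java int. The trace does not confirm this, so the sketch below is only an assumption about the mechanism: it shows how a byte count above Integer.MAX_VALUE turns negative when narrowed to an int and then fails on array allocation. The class and method names are hypothetical, and this is not the RocksDB JNI code.

```java
// Hypothetical illustration of a size overflow; not RocksDB or Flink code.
public class NegativeSizeSketch {

    // Narrows a byte count to the 32-bit int that a Java array length requires.
    // Counts above Integer.MAX_VALUE wrap around and can come out negative.
    static int toJavaArrayLength(long valueSizeInBytes) {
        return (int) valueSizeInBytes;
    }

    // Returns true if allocating a buffer for the value throws
    // NegativeArraySizeException.
    static boolean allocationFails(long valueSizeInBytes) {
        try {
            byte[] buffer = new byte[toJavaArrayLength(valueSizeInBytes)];
            return buffer.length < 0; // never true; reached only on success
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long smallValue = 16L;
        long hugeValue = 2_500_000_000L; // about 2.5 GB, above Integer.MAX_VALUE
        System.out.println("length for small value: " + toJavaArrayLength(smallValue));
        System.out.println("length for huge value is negative: " + (toJavaArrayLength(hugeValue) < 0));
        System.out.println("allocation fails for huge value: " + allocationFails(hugeValue));
    }
}
```

If this is what happens, a single window's list state would have to exceed roughly 2 GB for one key before the error shows up, which would fit a job that runs for a few hours before failing.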



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
