flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Stefan Richter (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9268) RockDB errors from WindowOperator
Date Sat, 28 Apr 2018 10:01:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457505#comment-16457505
] 

Stefan Richter commented on FLINK-9268:
---------------------------------------

This is a known issue with RocksDB, see [https://github.com/facebook/rocksdb/issues/2383.]

Summary is: RocksDB the whole list state for one key in a window must fit into a byte array
when returned over JNI. That means the maximum number of bytes per value of a key in a window
is Integer.MAX_VALUE, i.e. 2GB. I would guess that your job somehow collects more than that
for a single key.

> RockDB errors from WindowOperator
> ---------------------------------
>
>                 Key: FLINK-9268
>                 URL: https://issues.apache.org/jira/browse/FLINK-9268
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Narayanan Arunachalam
>            Priority: Major
>
> The job has no sinks, one Kafka source, does a windowing based on session and uses processing
time. The job fails with the error given below after running for few hours. The only way
to recover from this error is to cancel the job and start a new one.
> Using S3 backend for externalized checkpoints.
> A representative job DAG:
> val streams = sEnv
>  .addSource(makeKafkaSource(config))
>  .map(makeEvent)
>  .keyBy(_.get(EVENT_GROUP_ID))
>  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(60)))
>  .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
>  .apply(makeEventsList)
> .addSink(makeNoOpSink)
> A representative config:
> state.backend=rocksDB
> checkpoint.enabled=true
> external.checkpoint.enabled=true
> checkpoint.mode=AT_LEAST_ONCE
> checkpoint.interval=900000
> checkpoint.timeout=300000
> Error:
> TimerException\{java.lang.NegativeArraySizeException}
>  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NegativeArraySizeException
>  at org.rocksdb.RocksDB.get(Native Method)
>  at org.rocksdb.RocksDB.get(RocksDB.java:810)
>  at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:86)
>  at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:49)
>  at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:496)
>  at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:255)
>  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message