flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly
Date Wed, 24 Aug 2016 17:12:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15435309#comment-15435309

ASF GitHub Bot commented on FLINK-4341:

GitHub user tzulitai opened a pull request:


    [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding

    This is a short-term fix, until the min-watermark service for the JobManager described
in the JIRA discussion is available.
    The way this fix works is that we let idle subtasks that initially don't get assigned
shards emit a `Long.MAX_VALUE` watermark. Also, we _only fail hard if an idle subtask_ is
assigned new shards when resharding happens, to avoid messing up the watermarks. So, if all
subtasks are not initially idle on startup (i.e., when total number of shards > consumer
parallelism), the Kinesis consumer can still transparently handle resharding like before without
    I've tested exactly-once with our manual tests (with and w/o resharding) and the fix works
nicely, still retaining exactly-once guarantee despite non-transparency. However, I'm a bit
unsure on how to test if the unbounded state with window operators is also fixed with this
change, so we're still yet to clarify this.
    R: @rmetzger and @aljoscha for review. Thanks in advance!

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-4341

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2414
commit bc8e50d99be745300f7418c58e9d30abc5469ba3
Author: Gordon Tai <tzulitai@gmail.com>
Date:   2016-08-24T08:38:06Z

    [FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail on resharding
    This no longer allows the Kinesis consumer to transparently handle resharding.
    This is a short-term workaround until we have a min-watermark notification service available
in the JobManager.


> Kinesis connector does not emit maximum watermark properly
> ----------------------------------------------------------
>                 Key: FLINK-4341
>                 URL: https://issues.apache.org/jira/browse/FLINK-4341
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Scott Kidder
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.2
> **Prevously reported as "Checkpoint state size grows unbounded when task parallelism
not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously
using a 1.1.0 snapshot (commit 18995c8) which performed as expected.  This issue was introduced
somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis
stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots.
 When running the application with a parallelism of 4, the Kinesis consumer uses 2 slots (one
per Kinesis shard) and 4 slots for subsequent tasks that process the Kinesis stream data.
I use an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were
growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2    20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving
checkpoint barrier at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted
memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend,
like the File System State backend.
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask   
       - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window
Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter            - Transient association
error (association remains live) akka.remote.OversizedPayloadException: Discarding oversized
payload sent to Actor[akka.tcp://flink@]: max allowed
size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint
was 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I suspect there
was a regression introduced relating to assumptions about the number of sub-tasks associated
with a job stage (e.g. assuming 4 instead of a value ranging from 1-4). This is currently
preventing me from using all available Task Manager slots.

This message was sent by Atlassian JIRA

View raw message