flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
Date Wed, 29 Jun 2016 08:35:45 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354801#comment-15354801 ]

ASF GitHub Bot commented on FLINK-4080:

Github user rmetzger commented on a diff in the pull request:

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
    @@ -0,0 +1,87 @@
    +package org.apache.flink.streaming.connectors.kinesis.model;
    --- End diff --
    This file is missing the Apache license header.

> Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
> -------------------------------------------------------------------------------------------
>                 Key: FLINK-4080
>                 URL: https://issues.apache.org/jira/browse/FLINK-4080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.1.0
> I've occasionally seen ManualExactlyOnceTest fail after several runs.
> Kinesis records from the same aggregated batch have the same sequence number but
different sub-sequence numbers (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a record,
including de-aggregated ones. This is a bug: because the committed state holds only the shared sequence number,
committing it after any single sub-record incorrectly marks all remaining records of the de-aggregated batch as processed.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records (both non-aggregated and
de-aggregated) instead of the basic `Record` class. This exposes whether or not a
record was originally aggregated.
> 2. The sequence number state we checkpoint needs to be able to indicate that the
last seen sequence number of a shard may belong to a sub-record of an aggregated record, e.g., {"shard0" -> "5:8",
"shard1" -> "2"}, meaning the 8th sub-record of the 5th record was the last one processed for shard 0.
On restore, we start again from record 5 for shard 0 and skip the first 8 sub-records, which were already processed;
for shard 1, we start from record 3, since record 2 is non-aggregated and already fully processed.
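A minimal Java sketch of the checkpoint format proposed in point 2, under stated assumptions: `ShardRecord` and `RestoreDemo` are hypothetical stand-ins for KCL's `UserRecord` and the connector's fetch loop, sequence numbers are assumed to compare numerically (as `BigInteger`), and this `SequenceNumber` is only an illustration, not the class in the pull request. It shows both the bug (a checkpoint of plain "5" taken mid-batch skips the rest of the aggregated record on restore) and the fix (a composite "5:8" skips only the sub-records already processed).

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record read from a shard (KCL's UserRecord plays this role).
// subSequenceNumber is -1 for a record that was not aggregated; otherwise it identifies
// one sub-record inside the aggregated record that owns sequenceNumber.
final class ShardRecord {
    final String sequenceNumber;
    final long subSequenceNumber;

    ShardRecord(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    @Override
    public String toString() {
        return subSequenceNumber < 0 ? sequenceNumber : sequenceNumber + ":" + subSequenceNumber;
    }
}

// Illustrative checkpoint value in the "seq" / "seq:subSeq" notation from the issue.
final class SequenceNumber {
    final String sequenceNumber;
    final long subSequenceNumber; // -1 when no sub-sequence number is recorded

    SequenceNumber(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    // Parses "5" or "5:8".
    static SequenceNumber parse(String s) {
        int colon = s.indexOf(':');
        return colon < 0
                ? new SequenceNumber(s, -1)
                : new SequenceNumber(s.substring(0, colon), Long.parseLong(s.substring(colon + 1)));
    }

    // True if r was already fully processed when this checkpoint was taken.
    boolean covers(ShardRecord r) {
        // Assumption: sequence numbers are numeric strings, so compare them as BigInteger.
        int cmp = new BigInteger(r.sequenceNumber).compareTo(new BigInteger(sequenceNumber));
        if (cmp != 0) {
            return cmp < 0;
        }
        // Same sequence number: a checkpoint without a sub-sequence number covers the whole
        // record, which drops the rest of an aggregated batch if it was taken mid-batch;
        // a composite checkpoint covers only sub-records up to and including its own.
        return subSequenceNumber < 0 || r.subSequenceNumber <= subSequenceNumber;
    }

    @Override
    public String toString() {
        return subSequenceNumber < 0 ? sequenceNumber : sequenceNumber + ":" + subSequenceNumber;
    }
}

final class RestoreDemo {
    // Records that must be emitted again after restoring a shard from the checkpoint.
    static List<ShardRecord> remaining(List<ShardRecord> shard, SequenceNumber checkpoint) {
        List<ShardRecord> out = new ArrayList<>();
        for (ShardRecord r : shard) {
            if (!checkpoint.covers(r)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Shard 0: plain record 4, aggregated record 5 with sub-records 0..9, plain record 6.
        List<ShardRecord> shard0 = new ArrayList<>();
        shard0.add(new ShardRecord("4", -1));
        for (long sub = 0; sub < 10; sub++) {
            shard0.add(new ShardRecord("5", sub));
        }
        shard0.add(new ShardRecord("6", -1));

        System.out.println(remaining(shard0, SequenceNumber.parse("5:8"))); // [5:9, 6]
        System.out.println(remaining(shard0, SequenceNumber.parse("5")));   // [6], so 5:9 is lost
    }
}
```

Keeping the sub-sequence number inside the same per-shard string preserves the `{"shard0" -> "5:8", "shard1" -> "2"}` shape of the state; a plain value such as "2" still means the whole record was processed, so non-aggregated shards restore exactly as before.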

This message was sent by Atlassian JIRA
