flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
Date Fri, 08 Jul 2016 16:40:11 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15367948#comment-15367948 ]

ASF GitHub Bot commented on FLINK-4019:

Github user rmetzger commented on a diff in the pull request:

    --- Diff: docs/apis/streaming/connectors/kinesis.md ---
    @@ -146,6 +146,50 @@ Also note that Flink can only restart the topology if enough processing slots ar
     Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
     Flink on YARN supports automatic restart of lost YARN containers.
    +#### Event Time for Consumed Records
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
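    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +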
    +If streaming topologies choose to use the [event time notion]({{site.baseurl}}/apis/streaming/event_time.html) for record
    +timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
    +have been successfully received and stored by the stream. Note that this timestamp is typically referred to as a Kinesis server-side
    +timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be
    +ascending).
    +
    +Users can choose to override this default with a custom timestamp assigner, as described [here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html),
    +or use one of the [predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so,
    +it can be applied to the stream returned by the consumer in the following way:
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    +    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
    +kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
    --- End diff --
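    There is one minor thing here: you have to do kinesis = kinesis.assignTimestampsAndWatermarks(...) for this to work properly,
    because assignTimestampsAndWatermarks() returns a new stream rather than modifying kinesis in place.
    But I'll fix it while merging.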
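Since the diff points out that Kinesis arrival timestamps come with no ordering guarantee, a custom assigner for this source usually has to tolerate out-of-order records. Below is a minimal, Flink-free sketch of the bounded-out-of-orderness idea, the same idea behind Flink's predefined BoundedOutOfOrdernessTimestampExtractor: remember the largest timestamp seen so far and let the watermark trail it by a fixed bound. The class name BoundedLagWatermarks, the 5-second bound and the sample timestamps are hypothetical and only serve as an illustration. This is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of bounded-out-of-orderness watermarking.
public class BoundedLagWatermarks {

    private final long maxOutOfOrdernessMillis;
    // Largest timestamp observed so far; MIN_VALUE means "nothing seen yet".
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public BoundedLagWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Tracks a (possibly out-of-order) arrival timestamp and returns it unchanged. */
    public long extractTimestamp(long arrivalTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, arrivalTimestampMillis);
        return arrivalTimestampMillis;
    }

    /** The watermark trails the largest timestamp seen by the configured bound. */
    public long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    /** A record is late if its timestamp is already behind the current watermark. */
    public boolean isLate(long timestampMillis) {
        return timestampMillis < currentWatermark();
    }

    public static void main(String[] args) {
        BoundedLagWatermarks assigner = new BoundedLagWatermarks(5_000L);
        // Hypothetical arrival timestamps that are not strictly ascending.
        long[] arrivals = {10_000L, 12_000L, 11_000L, 20_000L, 14_000L};
        List<Long> watermarks = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : arrivals) {
            if (assigner.isLate(ts)) {
                late.add(ts); // checked against the watermark before this record
            }
            assigner.extractTimestamp(ts);
            watermarks.add(assigner.currentWatermark());
        }
        System.out.println(watermarks); // prints [5000, 7000, 7000, 15000, 15000]
        System.out.println(late);       // prints [14000]
    }
}
```

The 11000 record arrives out of order but is still ahead of the watermark, so it is accepted. The 14000 record trails the largest timestamp by more than the bound and is counted as late. A larger bound admits more disorder, but event-time results are then emitted later.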

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface
> -------------------------------------------------------------------------------------
>                 Key: FLINK-4019
>                 URL: https://issues.apache.org/jira/browse/FLINK-4019
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
> Amazon's Record class also gives information about the timestamp of when Kinesis successfully receives the record: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the deserialization schema.
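The issue asks for a record's approximate arrival time to reach user code through the deserialization schema. Below is a minimal, dependency-free sketch of what such a schema could look like. The interface ArrivalAwareDeserializer, the TimestampedEvent type and the parameter list are hypothetical stand-ins and do not reproduce the actual KinesisDeserializationSchema signature. In the AWS SDK, Record#getApproximateArrivalTimestamp() returns a java.util.Date, so the sketch assumes the value is passed on as epoch milliseconds via Date#getTime().

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;

public class ArrivalTimestampSketch {

    /** Hypothetical schema interface that also receives the Kinesis arrival timestamp. */
    interface ArrivalAwareDeserializer<T> {
        T deserialize(byte[] recordValue, String partitionKey, long approxArrivalTimestampMillis)
                throws IOException;
    }

    /** Hypothetical output type pairing the payload with its arrival time. */
    static final class TimestampedEvent {
        final String payload;
        final long arrivalMillis;

        TimestampedEvent(String payload, long arrivalMillis) {
            this.payload = payload;
            this.arrivalMillis = arrivalMillis;
        }
    }

    /** Decodes the payload as UTF-8 and keeps the arrival timestamp alongside it. */
    static final class Utf8WithArrivalTime implements ArrivalAwareDeserializer<TimestampedEvent> {
        @Override
        public TimestampedEvent deserialize(byte[] recordValue, String partitionKey,
                                            long approxArrivalTimestampMillis) {
            String payload = new String(recordValue, StandardCharsets.UTF_8);
            return new TimestampedEvent(payload, approxArrivalTimestampMillis);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for Record#getApproximateArrivalTimestamp(); the value is hypothetical.
        Date approximateArrival = new Date(1_467_996_011_000L);
        ArrivalAwareDeserializer<TimestampedEvent> schema = new Utf8WithArrivalTime();
        TimestampedEvent event = schema.deserialize(
                "hello".getBytes(StandardCharsets.UTF_8), "partition-1", approximateArrival.getTime());
        System.out.println(event.payload + " @ " + event.arrivalMillis); // prints "hello @ 1467996011000"
    }
}
```

This sketch passes the timestamp as a plain long rather than the SDK's Date, so the schema stays independent of the AWS classes. A user could then copy the value into their own record type, as shown above, and use it for event time.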

This message was sent by Atlassian JIRA
