flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka
Date Sat, 20 Aug 2016 04:02:23 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429202#comment-15429202

ASF GitHub Bot commented on FLINK-4035:

Github user tzulitai commented on a diff in the pull request:

    --- Diff: docs/apis/streaming/connectors/kafka.md ---
    @@ -291,3 +301,35 @@ higher value.
     There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once
     into a Kafka topic.
    +### Using Kafka timestamps and Flink event time in Kafka 0.10
    +Since Apache Kafka 0.10., Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message),
    +the time the event has occurred (see ["event time" in Apache Flink](../event_time.html))
or the time when the message
    +has been written to the Kafka broker.
    +The `FlinkKafkaConsumer010` will emit records with the timestamp attached, if the time
characteristic in Flink is 
    +set to `TimeCharacteristic.EventTime` (`StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
    +The Kafka consumer does not emit watermarks. To emit watermarks, the same mechanisms
as described above in 
    +"Kafka Consumers and Timestamp Extraction/Watermark Emission"  using the `assignTimestampsAndWatermarks`
method are applicable.
    +There is no need to define a timestamp extractor when using the timestamps from Kafka.
The `previousElementTimestamp` argument of 
    +the `extractTimestamp()` method contains the timestamp carried by the Kafka message.
    +A timestamp extractor for a Kafka consumer would look like this:
    +{% highlight java %}
    +public long extractTimestamp(Long element, long previousElementTimestamp) {
    +    return previousElementTimestamp;
    +{% endhighlight %}
    +The `FlinkKafkaProducer010` only emits the record timestamp, if `setWriteTimestampToKafka(true)`
is set.
    +{% highlight java %}
    +FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafka(streamWithTimestamps,
topic, new SimpleStringSchema(), standardProps);
    +{% endhighlight %}
    --- End diff --
    I find the usage pattern of this a bit unfamiliar. I've explained this in inline comments
of the `FlinkKafkaProducer010` class.

> Bump Kafka producer in Kafka sink to Kafka
> ---------------------------------------------------
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Minor
> Kafka introduced protocol changes related to the producer.  Published messages
now include timestamps and compressed messages now include relative offsets.  As it is now,
brokers must decompress publisher compressed messages, assign offset to them, and recompress
them, which is wasteful and makes it less likely that compression will be used at all.

This message was sent by Atlassian JIRA

View raw message