flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Doubt Regarding producing to kafka using flink
Date Mon, 03 Apr 2017 13:02:04 GMT
Hi Archit,

The problem is that you need to assign the returned `DataStream` from `stream.assignTimestampsAndWatermarks`
to a separate variable, and use that when instantiating the Kafka 0.10 sink.
The `assignTimestampsAndWatermarks` method returns a new `DataStream` instance with records
that have assigned timestamps. Calling it does not affect the original `DataStream` instance.

Cheers,
Gordon

On April 3, 2017 at 5:15:03 PM, Archit Mittal (marchit51@gmail.com) wrote:

Hi Gordon
This is the function snippet i am using but i am getting invalid timestamp         
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "word");
        properties.setProperty("auto.offset.reset", "earliest");


        DataStream < WordCount > stream = env.fromElements(wordCount);
        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>()
{
            @Override
            public long extractAscendingTimestamp(WordCount element) {
                return DateTime.now().getMillis();
            }
        });


        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream,
KAFKA_TOPIC, new WordCountSchema(), properties);
        config.setWriteTimestampToKafka(true);

        env.execute("job");

On Mon, Apr 3, 2017 at 8:20 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing them to
Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further questions if you bump
into any!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html

On April 2, 2017 at 6:38:13 PM, Archit Mittal (marchit51@gmail.com) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

while producing i am getting error as 

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249)
~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345) 

how do i put timestamp in my object before producing ?

Thanks
Archit


Mime
View raw message