flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Windows getting created only on first execution
Date Wed, 11 Oct 2017 12:18:25 GMT
Hi,

When you are restoring from a savepoint (or checkpoint), the offsets in Kafka are completely
ignored. Flink checkpoints the offsets at the time the checkpoint/savepoint is taken, and
those will be used as the read offsets when restoring.
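
For illustration, a minimal sketch (not from the original message; it assumes the 1.3-era
Scala API, and the topic, hosts, and group id are placeholders):

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Offsets are stored in every checkpoint/savepoint along with the rest of the state.
    env.enableCheckpointing(10000)

    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181")
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "my-group")

    val consumer = new FlinkKafkaConsumer08[String]("input-topic", new SimpleStringSchema, props)
    // Start-position settings like this one only apply on a fresh start; when the job is
    // restored from a checkpoint/savepoint, the offsets stored there take precedence.
    consumer.setStartFromEarliest()

    val stream: DataStream[String] = env.addSource(consumer)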

Best,
Aljoscha

> On 11. Oct 2017, at 12:58, Rahul Raj <rahulrajmsrit@gmail.com> wrote:
> 
> Changing the group id didn't work for me; using setStartFromEarliest() on the Kafka
> consumer worked instead. But it created one confusion: in case of failure, if I start
> from a particular checkpoint or savepoint, will the application start reading messages
> from the offset at which the checkpoint/savepoint was created, or will it start reading
> from the first record in the Kafka partition?
> 
> Rahul Raj 
> 
> On 11 October 2017 at 15:44, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> 
> I think the problem is that your Kafka consumer has the same group-id across those two
> runs. This means that it will pick up the last "read position" of the previous run, and
> thus not read anything. If you change the group-id for the second run, you should be able
> to read your data again.
> 
> Best,
> Aljoscha
> 
> 
>> On 11. Oct 2017, at 06:19, Rahul Raj <rahulrajmsrit@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I have written a program which reads data from Kafka, parses the JSON, and does some
>> reduce operation. The problem I am facing is that the program executes perfectly the
>> first time on a given day. But when I kill the program and execute it again, an empty
>> file is created. Even after compiling again and running, an empty file is created.
>> 
>> var kafkaConsumer = new FlinkKafkaConsumer08(
>>   params.getRequired("input-topic"),
>>   new SimpleStringSchema,
>>   params.getProperties)
>> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> 
>> var messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))
>> 
>> var mts = messageStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
>> 
>>   var ts = Long.MinValue
>> 
>>   override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>>     var timestamp = json_decode(element).toLong
>>     ts = Math.max(timestamp, previousElementTimestamp)
>>     timestamp
>>   }
>> 
>>   override def getCurrentWatermark(): Watermark = {
>>     new Watermark(ts)
>>   }
>> })
>> 
>> var output = mts
>>   .keyBy(t => json_decode(t))
>>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>   .allowedLateness(Time.seconds(5))
>>   .reduce((v1, v2) => v1 + "----" + v2)
>> 
>> output.writeAsText(path).setParallelism(1)
>> 
>> 
>> 
>> I am using the filesystem state backend. I am assuming this problem is related to memory
>> cleanup, but I don't understand what's happening.
>> 
>> Any help?
>> 
>> 
>> 
>> Rahul Raj
>> 
>> 
>> 
> 
> 

