flink-user mailing list archives

From Konstantin Knauf <konstantin.kn...@tngtech.com>
Subject Re: Custom TimestampExtractor and FlinkKafkaConsumer082
Date Wed, 25 Nov 2015 20:15:23 GMT
Hi Aljoscha,

sure, will do. I haven't found a solution either. I won't have time to put
a minimal example together before the weekend, though.

Cheers,

Konstantin

On 25.11.2015 19:10, Aljoscha Krettek wrote:
> Hi Konstantin,
> I still haven't come up with an explanation for the behavior. Could you maybe send me example code (and example data, if it is necessary to reproduce the problem)? This would really help me pinpoint the problem.
> 
> Cheers,
> Aljoscha
>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>
>> Hi Aljoscha,
>>
>> Are you sure? I am running the job from my IDE at the moment.
>>
>> If I set
>>
>> env.setParallelism(1); // env being the StreamExecutionEnvironment
>>
>> it works with the old TimestampExtractor (returning Long.MIN_VALUE from
>> getCurrentWatermark() and emitting a watermark with every record).
>>
>> If I set
>>
>> env.setParallelism(5); // env being the StreamExecutionEnvironment
>>
>> it does not work.
>>
>> So, if I understood you correctly, it is the opposite of what you were
>> expecting?!
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
>>> Hi,
>>> actually, the bug is more subtle. Normally, it is not a problem that the TimestampExtractor sometimes emits a watermark that is lower than the one before (this is the result of the bug with Long.MIN_VALUE I mentioned before). The stream operators wait for watermarks from all upstream operators and only advance the watermark monotonically, in lockstep with them. This way, the watermark cannot decrease at an operator.
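A rough sketch of the "lockstep" rule for an operator with several upstream inputs: the watermark of each input channel is tracked separately, and the operator's own watermark is the minimum over all channels, forwarded only when it increases. `MultiInputWatermark` is a hypothetical plain-Java helper written for illustration; it is not Flink's internal class, and the real code may differ in details.

```java
import java.util.Arrays;

// Hypothetical helper (not Flink's class): combines watermarks from several
// upstream channels so the operator's watermark never decreases.
final class MultiInputWatermark {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MultiInputWatermark(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one channel. Returns the new combined watermark
     * if it advanced, or null if nothing should be forwarded downstream.
     */
    Long onWatermark(int channel, long watermark) {
        // A single channel's watermark is never moved backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator can only be as far along as its slowest input.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return min;
        }
        return null;
    }

    long current() {
        return currentWatermark;
    }
}
```

One consequence of taking the minimum: if one input never sends a watermark, the combined watermark stays at Long.MIN_VALUE no matter how far the other inputs advance.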
>>>
>>> In your case, you have a topology with parallelism 1, I assume. In that case the operators are chained (there are no separate operators, basically only one operator, and elements are transmitted via function calls). In this setting the watermarks are forwarded directly to the operators without going through the logic I mentioned above.
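To contrast the two paths described above, here is a small sketch of two hypothetical forwarding strategies in plain Java (`ChainingDemo` and its methods are illustrative names only, not Flink code): handing every emitted watermark straight to the next function, as the message describes for chained operators, versus passing on only strictly increasing values, as on the path between separate operators.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of what a downstream operator "sees".
final class ChainingDemo {
    /** Chained case as described in the thread: every watermark is passed on as-is. */
    static List<Long> forwardDirectly(long[] emitted) {
        List<Long> seen = new ArrayList<>();
        for (long wm : emitted) {
            seen.add(wm);
        }
        return seen;
    }

    /** Guarded case: only watermarks larger than the last forwarded one get through. */
    static List<Long> forwardMonotonic(long[] emitted) {
        List<Long> seen = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (long wm : emitted) {
            if (wm > last) {
                last = wm;
                seen.add(wm);
            }
        }
        return seen;
    }
}
```

With an emitted sequence such as 5000, Long.MIN_VALUE, 6000, the direct path passes the drop back to Long.MIN_VALUE through, while the guarded path only forwards 5000 and 6000.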
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>>>
>>>> Hi Aljoscha,
>>>>
>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and only
>>>> emit watermarks in getCurrentWatermark() [1], as you suggested. So basically
>>>> I do the opposite of before (watermarks only per event vs. watermarks only
>>>> per auto-watermark). And now it works :). The question remains why it
>>>> did not work before. As far as I can see, it is an issue with the first
>>>> TimestampExtractor itself?!
>>>>
>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted watermarks?
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1]
>>>>
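>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>
>>>>   private final long maxDelay;
>>>>   private long lastTimestamp = Long.MIN_VALUE;
>>>>
>>>>   public PojoTimestampExtractor(long maxDelay) {
>>>>       this.maxDelay = maxDelay;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractTimestamp(Pojo pojo, long l) {
>>>>       // remember the latest event time; watermarks are derived from it periodically
>>>>       lastTimestamp = pojo.getTime();
>>>>       return pojo.getTime();
>>>>   }
>>>>
>>>>   @Override
>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>       // no per-element watermarks
>>>>       return Long.MIN_VALUE;
>>>>   }
>>>>
>>>>   @Override
>>>>   public long getCurrentWatermark() {
>>>>       // before the first element, Long.MIN_VALUE - maxDelay would overflow
>>>>       // to a huge positive value, so stay at Long.MIN_VALUE until then
>>>>       if (lastTimestamp == Long.MIN_VALUE) {
>>>>           return Long.MIN_VALUE;
>>>>       }
>>>>       return lastTimestamp - maxDelay;
>>>>   }
>>>> }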
>>>>
>>>>
>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> yes, at your data rate emitting a watermark for every element should not be a problem. It could become a problem at higher data rates, since the system can get overwhelmed if every element also generates a watermark. In that case I would suggest storing the latest element timestamp in an internal field and only emitting watermarks in getCurrentWatermark(), since then the watermark interval can be tuned using the auto-watermark interval setting.
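A sketch of that suggestion, written as a plain-Java class rather than against Flink's interface (`PeriodicWatermarkSketch` is a hypothetical name). It keeps the largest timestamp seen so far (using the maximum rather than the most recent value is a choice of this sketch, so out-of-order elements cannot pull it back) and computes a watermark only when the periodic timer asks for one. It also guards against `Long.MIN_VALUE - maxDelay` overflowing before the first element arrives.

```java
// Hypothetical plain-Java sketch, not Flink's TimestampExtractor.
final class PeriodicWatermarkSketch {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Per element: only remember the timestamp, no watermark is produced here. */
    void onElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Called once per auto-watermark interval. */
    long currentWatermark() {
        // Before the first element, Long.MIN_VALUE - maxDelay would wrap around
        // to a huge positive value, so report Long.MIN_VALUE instead.
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestamp - maxDelay;
    }
}
```

As a rough rate comparison: at about 50 elements per second with a 500 ms auto-watermark interval, per-element emission produces about 50 watermark candidates per second, periodic emission about 2.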
>>>>>
>>>>> But that should not be the cause of the problem that you currently have. Would you maybe be willing to send me some (mock) example data and the code, so that I can reproduce the problem and have a look at it? My address is aljoscha at apache.org.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>>>>>
>>>>>> Hi Aljoscha,
>>>>>>
>>>>>> OK, now I at least understand why it works with fromElements(...). For
>>>>>> the rest I am not so sure.
>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if
>>>>>>> a new element arrives, because only then is the watermark updated.
>>>>>>
>>>>>> But new elements arrive all the time, about 50/s, or do you mean
>>>>>> something else?
>>>>>>
>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an OK
>>>>>> choice, if I understand the semantics correctly. It just affects
>>>>>> watermarking in the absence of events, right?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>>>>>>> Hi,
>>>>>>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor works internally.
>>>>>>>
>>>>>>> First, the timestamp extractor internally keeps the value of the last emitted watermark. The semantics of the TimestampExtractor are then as follows:
>>>>>>> - the result of extractTimestamp replaces the internal timestamp of the element
>>>>>>> - if the result of extractWatermark is larger than the last watermark, the new value is emitted as a watermark and stored as the last watermark
>>>>>>> - getCurrentWatermark is called at the specified auto-watermark interval; if the returned value is larger than the last watermark, it is emitted and stored as the last watermark
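The three rules above can be modelled in a few lines of plain Java. This is a hypothetical sketch: `ExtractorLike` and `ExtractorOperatorModel` are made-up names that mirror the three methods discussed in this thread; they are not Flink's TimestampExtractor interface or its operator, and passing the new timestamp to extractWatermark is an assumption of the model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same three methods as discussed in the thread.
interface ExtractorLike<T> {
    long extractTimestamp(T element, long currentTimestamp);
    long extractWatermark(T element, long currentTimestamp);
    long getCurrentWatermark();
}

// Plain-Java model of the semantics listed above.
final class ExtractorOperatorModel<T> {
    private final ExtractorLike<T> extractor;
    private long lastWatermark = Long.MIN_VALUE;
    final List<Long> emittedWatermarks = new ArrayList<>();

    ExtractorOperatorModel(ExtractorLike<T> extractor) {
        this.extractor = extractor;
    }

    /** Per element: re-stamp it, then maybe emit the per-element watermark. */
    long processElement(T element, long incomingTimestamp) {
        long timestamp = extractor.extractTimestamp(element, incomingTimestamp);
        maybeEmit(extractor.extractWatermark(element, timestamp));
        return timestamp;
    }

    /** Called on every auto-watermark interval tick. */
    void onAutoWatermarkTimer() {
        maybeEmit(extractor.getCurrentWatermark());
    }

    private void maybeEmit(long candidate) {
        // Only strictly larger values are emitted and remembered.
        if (candidate > lastWatermark) {
            lastWatermark = candidate;
            emittedWatermarks.add(candidate);
        }
    }
}
```

Driving the model with an extractor shaped like the one in the first message (per-element watermarks of time minus 6000, getCurrentWatermark() returning Long.MIN_VALUE) shows that the timer ticks never emit anything, so the watermark only moves when elements arrive.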
>>>>>>>
>>>>>>> What this means in your case is that the watermark can only advance if a new element arrives, because only then is the watermark updated.
>>>>>>>
>>>>>>> The reason why you see results if you use fromElements is that the window operator also emits all the windows it currently has buffered when the program closes. This happens with fromElements because only a finite number of elements is emitted, after which the source closes, thereby finishing the whole program.
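A toy model of this end-of-input flush: buffered windows are emitted either when a watermark passes their end or when the input closes. `TumblingSumBuffer` is a hypothetical plain-Java class, not Flink's window operator, and the rule "fire once the watermark reaches the window's last timestamp (end - 1)" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of tumbling event-time windows that sum values.
final class TumblingSumBuffer {
    private final long size;
    // window start -> running sum, ordered by window start
    private final TreeMap<Long, Long> windows = new TreeMap<>();
    final List<Long> firedSums = new ArrayList<>();

    TumblingSumBuffer(long size) {
        this.size = size;
    }

    void onElement(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        windows.merge(start, value, Long::sum);
    }

    /** Fires every window whose last timestamp (end - 1) is covered by the watermark. */
    void onWatermark(long watermark) {
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= watermark) {
            firedSums.add(windows.pollFirstEntry().getValue());
        }
    }

    /** Input finished (for example a finite source): flush whatever is still buffered. */
    void close() {
        while (!windows.isEmpty()) {
            firedSums.add(windows.pollFirstEntry().getValue());
        }
    }
}
```

With the watermark stuck at Long.MIN_VALUE, nothing fires until close(); an unbounded source such as Kafka never reaches that point, so its windows depend entirely on watermarks.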
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.fora@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
>>>>>>>>
>>>>>>>> @Override
>>>>>>>>  public long getCurrentWatermark() {
>>>>>>>>      return Long.MIN_VALUE;
>>>>>>>>  }
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Mon, 16 Nov 2015 at 10:39, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>>>>>>> Hi Aljoscha,
>>>>>>>>
>>>>>>>> thanks for your answer. Yes, I am using the same TimestampExtractor class.
>>>>>>>>
>>>>>>>> The timestamps look good to me. Here is an example:
>>>>>>>>
>>>>>>>> {"time": 1447666537260, ...}, parsed: 2015-11-16T10:35:37.260+01:00
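To double-check a raw value like this, the epoch milliseconds can be converted with java.time. This is a small standalone sketch; `EpochCheck` is a hypothetical helper, and the fixed +01:00 offset is chosen to match the parsed value quoted above.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Converts an epoch-millisecond timestamp to a date-time at a fixed UTC offset.
final class EpochCheck {
    static OffsetDateTime toOffsetDateTime(long epochMillis, int offsetHours) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(offsetHours));
    }
}
```

EpochCheck.toOffsetDateTime(1447666537260L, 1) renders as 2015-11-16T10:35:37.260+01:00, i.e. the value is in milliseconds, as the window and extractor code assume.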
>>>>>>>>
>>>>>>>> The order now is:
>>>>>>>>
>>>>>>>> stream
>>>>>>>> .map(dummyMapper)
>>>>>>>> .assignTimestamps(...)
>>>>>>>> .timeWindow(...)
>>>>>>>>
>>>>>>>> Is there a way to print out the assigned timestamps after
>>>>>>>> stream.assignTimestamps(...)?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>>
>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>>>>>>> Hi,
>>>>>>>>> are you also using the timestamp extractor when you are using env.fromCollection()?
>>>>>>>>>
>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints the element and forwards it? That would show whether the elements come from Kafka with a good timestamp.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>>>>>>>
>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>>>>>>> extracts a millisecond timestamp from a POJO. In my streaming job, I
>>>>>>>>>> read these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>>>>>>>
>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
>>>>>>>>>>         parameterTool.getRequired("topic"),
>>>>>>>>>>         new AvroPojoDeserializationSchema(),
>>>>>>>>>>         parameterTool.getProperties()));
>>>>>>>>>>
>>>>>>>>>> I have timestamps enabled, the TimeCharacteristic is EventTime, and
>>>>>>>>>> the AutoWatermarkInterval is 500 ms.
>>>>>>>>>>
>>>>>>>>>> The problem is, when I do something like:
>>>>>>>>>>
>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>>>>>>>         .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
>>>>>>>>>>         .sum(..)
>>>>>>>>>>         .print();
>>>>>>>>>>
>>>>>>>>>> env.execute();
>>>>>>>>>>
>>>>>>>>>> the windows never get triggered.
>>>>>>>>>>
>>>>>>>>>> If I use ProcessingTime, it works.
>>>>>>>>>> If I use env.fromCollection(...) instead of the Kafka source, it works
>>>>>>>>>> with EventTime, too.
>>>>>>>>>>
>>>>>>>>>> Any ideas about what I could be doing wrong are highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Konstantin
>>>>>>>>>>
>>>>>>>>>> [1]:
>>>>>>>>>>
>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>>>>>>>>>
>>>>>>>>>>     private final long maxDelay;
>>>>>>>>>>
>>>>>>>>>>     public PojoTimestampExtractor(long maxDelay) {
>>>>>>>>>>         this.maxDelay = maxDelay;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long extractTimestamp(Pojo pojo, long l) {
>>>>>>>>>>         // use the POJO's own event time (epoch millis)
>>>>>>>>>>         return pojo.getTime();
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long extractWatermark(Pojo pojo, long l) {
>>>>>>>>>>         // emit a watermark with every element, lagging maxDelay behind it
>>>>>>>>>>         return pojo.getTime() - maxDelay;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public long getCurrentWatermark() {
>>>>>>>>>>         // no periodic watermarks
>>>>>>>>>>         return Long.MIN_VALUE;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 

