flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Custom TimestampExtractor and FlinkKafkaConsumer082
Date Fri, 04 Dec 2015 21:21:34 GMT
I think we need to find a solution for this problem soon.
Another user is most likely affected:
http://stackoverflow.com/q/34090808/568695

I've filed a JIRA for the problem:
https://issues.apache.org/jira/browse/FLINK-3121


On Mon, Nov 30, 2015 at 5:58 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Maybe. In the Kafka case we just need to ensure that parallel instances of
> the source that know that they don’t have any partitions assigned to them
> emit Long.MAX_VALUE as a watermark.
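
A minimal sketch of the idea above, in plain Java without any Flink classes. The class name, the `reportedWatermark` helper and its parameters are hypothetical and only illustrate the proposal; this is not how the Kafka consumer is actually implemented. An instance without assigned partitions reports Long.MAX_VALUE, so it no longer holds back the minimum that downstream operators compute.

```java
import java.util.Arrays;

final class IdleSourceWatermarkSketch {

    /**
     * Watermark reported by one parallel source instance (hypothetical helper).
     * An instance without assigned partitions can never see data, so it
     * reports Long.MAX_VALUE and stops holding back the downstream minimum.
     */
    static long reportedWatermark(int assignedPartitions, long lastTimestamp, long maxDelay) {
        if (assignedPartitions == 0) {
            return Long.MAX_VALUE;
        }
        if (lastTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // partitions assigned, but nothing read yet
        }
        return lastTimestamp - maxDelay;
    }

    /** Downstream watermark: the minimum over all upstream watermarks. */
    static long combinedWatermark(long... upstream) {
        return Arrays.stream(upstream).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long active = reportedWatermark(1, 1447666537260L, 6000L);
        long idle = reportedWatermark(0, Long.MIN_VALUE, 6000L);
        // The idle instance does not affect the minimum.
        System.out.println(combinedWatermark(active, idle) == active); // prints "true"
    }
}
```
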
>
> > On 30 Nov 2015, at 17:50, Gyula Fóra <gyula.fora@gmail.com> wrote:
> >
> > Hi,
> >
> > I think what we will need at some point for this are approximate
> > watermarks which correlate event and ingest time.
> >
> > I think they have similar concepts in Millwheel/Dataflow.
> >
> > Cheers,
> > Gyula
> > On Mon, Nov 30, 2015 at 5:29 PM Aljoscha Krettek <aljoscha@apache.org> wrote:
> > Hi,
> > as an addition: I don’t have a solution yet for the general problem of
> > what happens when a parallel instance of a source never receives elements.
> > This watermark business is very tricky...
> >
> > Cheers,
> > Aljoscha
> > > On 30 Nov 2015, at 17:20, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >
> > > Hi Konstantin,
> > > I finally nailed down the problem. :-)
> > >
> > > The basis of the problem is a mismatch between the parallelism of the
> > > Flink Kafka Consumer and the number of partitions in the Kafka stream. I
> > > would assume that in your case the Kafka stream has 1 partition. This
> > > means that only one of the parallel instances of the Flink Kafka Consumer
> > > ever receives elements, which in turn means that only one of the parallel
> > > instances of the timestamp extractor ever receives elements. No watermarks
> > > get emitted by the other parallel instances, so the watermark does not
> > > advance downstream, because the watermark at an operator is the minimum
> > > over all upstream watermarks. This explains why ExampleTimestampExtractor1
> > > only works in the case with parallelism=1.
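
The stall described above can be reproduced with a small plain-Java sketch. It assumes that an instance which has never emitted a watermark counts as Long.MIN_VALUE at the downstream operator; the class and method names are hypothetical and nothing here uses Flink itself.

```java
import java.util.Arrays;

final class StalledWatermarkSketch {

    /** Downstream watermark: the minimum over all upstream watermarks. */
    static long combinedWatermark(long[] upstream) {
        return Arrays.stream(upstream).min().orElse(Long.MIN_VALUE);
    }

    /**
     * Watermarks of all parallel extractor instances. Only the first
     * `kafkaPartitions` instances ever receive elements; the rest stay at
     * Long.MIN_VALUE (modelling assumption: "no watermark emitted yet").
     */
    static long[] upstreamWatermarks(int parallelism, int kafkaPartitions, long activeWatermark) {
        long[] watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        for (int i = 0; i < Math.min(parallelism, kafkaPartitions); i++) {
            watermarks[i] = activeWatermark;
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long active = 1447666537260L - 6000L;
        // Parallelism 5, one partition: the minimum never leaves Long.MIN_VALUE.
        System.out.println(combinedWatermark(upstreamWatermarks(5, 1, active)) == Long.MIN_VALUE); // prints "true"
        // Parallelism 1, one partition: the watermark advances.
        System.out.println(combinedWatermark(upstreamWatermarks(1, 1, active)) == active); // prints "true"
    }
}
```
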
> > >
> > > The reason why ExampleTimestampExtractor2 works in all parallelism
> > > settings is not very obvious. The secret is in this method:
> > >
> > > @Override
> > > public long getCurrentWatermark() {
> > >   return lastTimestamp - maxDelay;
> > > }
> > >
> > > In the parallel instances that never receive any element, lastTimestamp
> > > is still Long.MIN_VALUE. So “lastTimestamp - maxDelay” overflows and
> > > wraps around to (Long.MAX_VALUE - maxDelay + 1). Now, because the
> > > watermark at an operator is always the minimum over all watermarks from
> > > upstream operators, the watermark at the window operator always tracks
> > > the watermark of the parallel instance that receives elements.
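
The wrap-around can be checked in isolation. The sketch below uses only standard Java arithmetic; the class and method names are hypothetical. `Math.subtractExact` is shown as one way to detect the overflow instead of silently wrapping.

```java
final class WatermarkOverflowSketch {

    /** Same arithmetic as getCurrentWatermark() above; wraps silently on overflow. */
    static long currentWatermark(long lastTimestamp, long maxDelay) {
        return lastTimestamp - maxDelay;
    }

    /** True if the subtraction leaves the range of long. */
    static boolean overflows(long lastTimestamp, long maxDelay) {
        try {
            Math.subtractExact(lastTimestamp, maxDelay);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long maxDelay = 6000L;
        long wrapped = currentWatermark(Long.MIN_VALUE, maxDelay);
        System.out.println(wrapped == Long.MAX_VALUE - maxDelay + 1); // prints "true"
        System.out.println(overflows(Long.MIN_VALUE, maxDelay));      // prints "true"
    }
}
```

So the instances that never see data accidentally report a very large watermark, which is why they do not hold back the minimum.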
> > >
> > > I hope this helps, but please let me know if I should provide more
> > > explanation. This is a very tricky topic.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >> On 29 Nov 2015, at 21:18, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> > >>
> > >> Hi Aljoscha,
> > >>
> > >> I have put together a gist [1] with two classes, a short processing
> > >> pipeline, which shows the behavior and a data generator to write
> > >> records into Kafka. I hope I remembered everything we discussed correctly.
> > >>
> > >> So basically in the example it works with "TimestampExtractor1" only
> > >> for parallelism 1, with "TimestampExtractor2" it works regardless of the
> > >> parallelism. Run from the IDE.
> > >>
> > >> Let me know if you need anything else.
> > >>
> > >> Cheers,
> > >>
> > >> Konstantin
> > >>
> > >> [1] https://gist.github.com/knaufk/d57b5c3c7db576f3350d
> > >>
> > >> On 25.11.2015 21:15, Konstantin Knauf wrote:
> > >>> Hi Aljoscha,
> > >>>
> > >>> sure, will do. I haven't found a solution either. I won't have time to
> > >>> put a minimal example together before the weekend, though.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Konstantin
> > >>>
> > >>> On 25.11.2015 19:10, Aljoscha Krettek wrote:
> > >>>> Hi Konstantin,
> > >>>> I still haven’t come up with an explanation for the behavior. Could
> > >>>> you maybe send me example code (and example data, if it is necessary to
> > >>>> reproduce the problem)? This would really help me pinpoint the problem.
> > >>>>
> > >>>> Cheers,
> > >>>> Aljoscha
> > >>>>> On 17 Nov 2015, at 21:42, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> > >>>>>
> > >>>>> Hi Aljoscha,
> > >>>>>
> > >>>>> Are you sure? I am running the job from my IDE at the moment.
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>> StreamExecutionEnvironment.setParallelism(1);
> > >>>>>
> > >>>>> it works with the old TimestampExtractor (returning Long.MIN_VALUE
> > >>>>> from getCurrentWatermark() and emitting a watermark at every record).
> > >>>>>
> > >>>>> If I set
> > >>>>>
> > >>>>> StreamExecutionEnvironment.setParallelism(5);
> > >>>>>
> > >>>>> it does not work.
> > >>>>>
> > >>>>> So, if I understood you correctly, it is the opposite of what you
> > >>>>> were expecting?!
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Konstantin
> > >>>>>
> > >>>>>
> > >>>>> On 17.11.2015 11:32, Aljoscha Krettek wrote:
> > >>>>>> Hi,
> > >>>>>> actually, the bug is more subtle. Normally, it is not a problem
> > >>>>>> that the TimestampExtractor sometimes emits a watermark that is lower
> > >>>>>> than the one before. (This is the result of the bug with Long.MIN_VALUE
> > >>>>>> I mentioned before.) The stream operators wait for watermarks from all
> > >>>>>> upstream operators and only advance the watermark monotonically in
> > >>>>>> lockstep with them. This way, the watermark cannot decrease at an
> > >>>>>> operator.
> > >>>>>>
> > >>>>>> In your case, you have a topology with parallelism 1, I assume. In
> > >>>>>> that case the operators are chained. (There are no separate operators,
> > >>>>>> but basically only one operator, and element transmission happens in
> > >>>>>> function calls.) In this setting the watermarks are directly forwarded
> > >>>>>> to operators without going through the logic I mentioned above.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Aljoscha
> > >>>>>>> On 16 Nov 2015, at 18:13, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> > >>>>>>>
> > >>>>>>> Hi Aljoscha,
> > >>>>>>>
> > >>>>>>> I changed the TimestampExtractor to save the lastSeenTimestamp and
> > >>>>>>> only emit with getCurrentWatermark [1] as you suggested. So basically
> > >>>>>>> I do the opposite of what I did before (only watermarks per event vs.
> > >>>>>>> only watermarks per auto-watermark). And now it works :). The question
> > >>>>>>> remains why it did not work before. As far as I see, it is an issue
> > >>>>>>> with the first TimestampExtractor itself?!
> > >>>>>>>
> > >>>>>>> Does getCurrentWatermark(..) somehow "overpower" the extracted
> > >>>>>>> watermarks?
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>>
> > >>>>>>> Konstantin
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>> final private long maxDelay;
> > >>>>>>> private long lastTimestamp = Long.MIN_VALUE;
> > >>>>>>>
> > >>>>>>> public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>     this.maxDelay = maxDelay;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>     lastTimestamp = pojo.getTime();
> > >>>>>>>     return pojo.getTime();
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>     return Long.MIN_VALUE;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> @Override
> > >>>>>>> public long getCurrentWatermark() {
> > >>>>>>>     return lastTimestamp - maxDelay;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 16.11.2015 13:37, Aljoscha Krettek wrote:
> > >>>>>>>> Hi,
> > >>>>>>>> yes, at your data rate emitting a watermark for every element
> > >>>>>>>> should not be a problem. It could become a problem with higher data
> > >>>>>>>> rates, since the system can get overwhelmed if every element also
> > >>>>>>>> generates a watermark. In that case I would suggest storing the latest
> > >>>>>>>> element timestamp in an internal field and only emitting in
> > >>>>>>>> getCurrentWatermark(), since then the watermark interval can be tuned
> > >>>>>>>> using the auto-watermark interval setting.
> > >>>>>>>>
> > >>>>>>>> But that should not be the cause of the problem that you currently
> > >>>>>>>> have. Would you maybe be willing to send me some (mock) example data
> > >>>>>>>> and the code, so that I can reproduce the problem and have a look at
> > >>>>>>>> it? You can send it to aljoscha at apache.org.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Aljoscha
> > >>>>>>>>> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>
> > >>>>>>>>> ok, now I at least understand why it works with
> > >>>>>>>>> fromElements(...). For the rest I am not so sure.
> > >>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only
> > >>>>>>>>>> advance if a new element arrives, because only then is the
> > >>>>>>>>>> watermark updated.
> > >>>>>>>>>
> > >>>>>>>>> But new elements arrive all the time, about 50/s, or do you mean
> > >>>>>>>>> something else?
> > >>>>>>>>>
> > >>>>>>>>> getCurrentWatermark returning Long.MIN_VALUE still seems to be an
> > >>>>>>>>> ok choice, if I understand the semantics correctly. It just affects
> > >>>>>>>>> watermarking in the absence of events, right?
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Konstantin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 16.11.2015 12:31, Aljoscha Krettek wrote:
> > >>>>>>>>>> Hi,
> > >>>>>>>>>> it could be what Gyula mentioned. Let me first go a bit into
> > >>>>>>>>>> how the TimestampExtractor works internally.
> > >>>>>>>>>>
> > >>>>>>>>>> First, the timestamp extractor internally keeps the value of the
> > >>>>>>>>>> last emitted watermark. Then, the semantics of the
> > >>>>>>>>>> TimestampExtractor are as follows:
> > >>>>>>>>>> - the result of extractTimestamp is taken and it replaces the
> > >>>>>>>>>>   internal timestamp of the element
> > >>>>>>>>>> - if the result of extractWatermark is larger than the last
> > >>>>>>>>>>   watermark, the new value is emitted as a watermark and the value
> > >>>>>>>>>>   is stored
> > >>>>>>>>>> - getCurrentWatermark is called on the specified auto-watermark
> > >>>>>>>>>>   interval; if the returned value is larger than the last watermark,
> > >>>>>>>>>>   it is emitted and stored as the last watermark
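
The three rules above can be sketched as a small driver in plain Java. This is an illustration only, not Flink's actual operator: the `Extractor` interface is a stand-in with the three method names from the thread, and `Driver`, `PerElementExtractor`, `onElement` and `onAutoWatermarkInterval` are hypothetical names. Which value is passed as the second argument of extractWatermark is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;

final class ExtractorSemanticsSketch {

    /** Stand-in for the three extractor methods discussed in the thread. */
    interface Extractor<T> {
        long extractTimestamp(T element, long currentTimestamp);
        long extractWatermark(T element, long currentTimestamp);
        long getCurrentWatermark();
    }

    /** Hypothetical driver that applies the three rules. */
    static final class Driver<T> {
        private final Extractor<T> extractor;
        private long lastWatermark = Long.MIN_VALUE;
        final List<Long> emitted = new ArrayList<>();

        Driver(Extractor<T> extractor) {
            this.extractor = extractor;
        }

        /** Rules 1 and 2: assign the timestamp, maybe emit a per-element watermark. */
        long onElement(T element, long currentTimestamp) {
            long timestamp = extractor.extractTimestamp(element, currentTimestamp);
            emitIfLarger(extractor.extractWatermark(element, currentTimestamp));
            return timestamp;
        }

        /** Rule 3: called once per auto-watermark interval. */
        void onAutoWatermarkInterval() {
            emitIfLarger(extractor.getCurrentWatermark());
        }

        /** A watermark is only emitted if it is larger than the last one. */
        private void emitIfLarger(long candidate) {
            if (candidate > lastWatermark) {
                lastWatermark = candidate;
                emitted.add(candidate);
            }
        }
    }

    /** Mirrors the first extractor from the thread; the element is its own timestamp. */
    static final class PerElementExtractor implements Extractor<Long> {
        private final long maxDelay;

        PerElementExtractor(long maxDelay) {
            this.maxDelay = maxDelay;
        }

        @Override
        public long extractTimestamp(Long element, long currentTimestamp) {
            return element;
        }

        @Override
        public long extractWatermark(Long element, long currentTimestamp) {
            return element - maxDelay;
        }

        @Override
        public long getCurrentWatermark() {
            return Long.MIN_VALUE; // never larger than the last watermark, so never emitted
        }
    }
}
```

With this extractor, an instance that receives no elements never calls `emitIfLarger` with anything but Long.MIN_VALUE, so it never emits a watermark at all.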
> > >>>>>>>>>>
> > >>>>>>>>>> What this means in your case is that the watermark can only
> > >>>>>>>>>> advance if a new element arrives, because only then is the
> > >>>>>>>>>> watermark updated.
> > >>>>>>>>>>
> > >>>>>>>>>> The reason why you see results if you use fromElements is that
> > >>>>>>>>>> the window operator also emits all the windows that it currently
> > >>>>>>>>>> has buffered if the program closes. This happens in the case of
> > >>>>>>>>>> fromElements because only a finite number of elements is emitted,
> > >>>>>>>>>> after which the source closes, thereby finishing the whole program.
> > >>>>>>>>>>
> > >>>>>>>>>> Cheers,
> > >>>>>>>>>> Aljoscha
> > >>>>>>>>>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Could this part of the extractor be the problem, Aljoscha?
> > >>>>>>>>>>>
> > >>>>>>>>>>> @Override
> > >>>>>>>>>>> public long getCurrentWatermark() {
> > >>>>>>>>>>>    return Long.MIN_VALUE;
> > >>>>>>>>>>> }
> > >>>>>>>>>>>
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin Knauf <konstantin.knauf@tngtech.com> wrote (on Mon, 16 Nov 2015, 10:39):
> > >>>>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>>>
> > >>>>>>>>>>> thanks for your answer. Yes, I am using the same
> > >>>>>>>>>>> TimestampExtractor class.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The timestamps look good to me. Here is an example:
> > >>>>>>>>>>>
> > >>>>>>>>>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
> > >>>>>>>>>>>
> > >>>>>>>>>>> The order now is
> > >>>>>>>>>>>
> > >>>>>>>>>>> stream
> > >>>>>>>>>>> .map(dummyMapper)
> > >>>>>>>>>>> .assignTimestamps(...)
> > >>>>>>>>>>> .timeWindow(...)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Is there a way to print out the assigned timestamps after
> > >>>>>>>>>>> stream.assignTimestamps(...)?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Konstantin
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
> > >>>>>>>>>>>> Hi,
> > >>>>>>>>>>>> are you also using the timestamp extractor when you are using
> > >>>>>>>>>>>> env.fromCollection()?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Could you maybe insert a dummy mapper after the Kafka source that
> > >>>>>>>>>>>> just prints the element and forwards it? That way we can see
> > >>>>>>>>>>>> whether the elements come with a good timestamp from Kafka.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> Aljoscha
> > >>>>>>>>>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have the following issue with Flink (0.10) and Kafka.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am using a very simple TimestampExtractor like [1], which just
> > >>>>>>>>>>>>> extracts a millis timestamp from a POJO. In my streaming job, I
> > >>>>>>>>>>>>> read in these POJOs from Kafka using the FlinkKafkaConsumer082
> > >>>>>>>>>>>>> like this:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream = env.addSource(new FlinkKafkaConsumer082<>(
> > >>>>>>>>>>>>>           parameterTool.getRequired("topic"),
> > >>>>>>>>>>>>>           new AvroPojoDeserializationSchema(),
> > >>>>>>>>>>>>>           parameterTool.getProperties()));
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have timestampEnabled() and the TimeCharacteristics are
> > >>>>>>>>>>>>> EventTime, AutoWatermarkInterval is 500.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The problem is, when I do something like:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
> > >>>>>>>>>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
> > >>>>>>>>>>>>> .sum(..)
> > >>>>>>>>>>>>> .print();
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> env.execute();
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> the windows never get triggered.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> If I use ProcessingTime it works.
> > >>>>>>>>>>>>> If I use env.fromCollection(...) instead of the KafkaSource it
> > >>>>>>>>>>>>> works with EventTime, too.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Any ideas what I could be doing wrong are highly appreciated.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Konstantin
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1]:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> final private long maxDelay;
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> public PojoTimestampExtractor(long maxDelay) {
> > >>>>>>>>>>>>>   this.maxDelay = maxDelay;
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> @Override
> > >>>>>>>>>>>>> public long extractTimestamp(Pojo pojo, long l) {
> > >>>>>>>>>>>>>   return pojo.getTime();
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> // per-element watermark: event time minus the allowed delay
> > >>>>>>>>>>>>> @Override
> > >>>>>>>>>>>>> public long extractWatermark(Pojo pojo, long l) {
> > >>>>>>>>>>>>>   return pojo.getTime() - maxDelay;
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> // no periodic watermark
> > >>>>>>>>>>>>> @Override
> > >>>>>>>>>>>>> public long getCurrentWatermark() {
> > >>>>>>>>>>>>>   return Long.MIN_VALUE;
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
> > >>>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> > >>>>>>>>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> > >>>>>>>>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
>
