flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [Discussion] Query regarding Join
Date Tue, 28 Jun 2016 11:37:36 GMT
Hi,
ingestion time can only be used if you don't care about the timestamps in
the elements. Since you have timestamps, you should probably use event time.

If your timestamps really are strictly increasing then the ascending
extractor is fine. And with a continuous stream of incoming elements you
will not see the behavior of missing the last elements.

By the way, when using Kafka you can also embed the timestamp extractor
directly in the Kafka consumer. This is described here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
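A minimal sketch of that embedding (the topic name, schema class, and DTO getter below are placeholders, and the exact extractor class and method signature depend on the Flink version, so treat the linked page as authoritative):

```java
// Sketch only: assumes the Flink Kafka connector is on the classpath and that
// MyDto / MyDtoSchema / getEventTimeMillis() exist in your project.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

FlinkKafkaConsumer09<MyDto> consumer =
        new FlinkKafkaConsumer09<>("my-topic", new MyDtoSchema(), props);

// Embedding the extractor in the consumer lets watermarks be tracked
// per Kafka partition instead of per parallel source instance:
consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyDto>() {
    @Override
    public long extractAscendingTimestamp(MyDto element) {
        return element.getEventTimeMillis(); // must be milliseconds since the epoch
    }
});

DataStream<MyDto> sourceStream = env.addSource(consumer);
```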

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.patil@gmail.com> wrote:

> Hi Aljoscha,
>
> Thank you for your response.
> So do you suggest using a different approach for extracting timestamps (as
> given in the documentation) instead of the AscendingTimestampExtractor?
> Is that the reason I am seeing this unexpected behaviour? In the case of a
> continuous stream, would I see no data loss?
>
> Also, assuming that the records are always going to be in order, which is
> the best approach: Ingestion Time or Event Time?
>
>
>
> Regards,
> Vinay Patil
>
> On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > Hi,
> > first, regarding tumbling windows: even if you have 5-minute windows, it
> > can happen that elements that are only seconds apart go into different
> > windows. Consider the following case:
> >
> > |                x | x                 |
> >
> > These are two 5-minute windows and the two elements are only seconds apart,
> > but they go into different windows because windows are aligned to the epoch.
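The alignment can be checked with plain arithmetic; this sketch mirrors how tumbling windows are assigned (window start rounded down to a multiple of the window size) using made-up timestamps, not Flink's actual code:

```java
public class WindowAlignment {
    // Tumbling windows aligned to the epoch: start = ts - (ts % size).
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000;            // 5-minute windows
        long a = 4 * 60 * 1000 + 59_000;      // 04:59 after the epoch
        long b = 5 * 60 * 1000 + 1_000;       // 05:01 -- only 2 seconds later
        System.out.println(windowStart(a, size)); // 0      -> first window
        System.out.println(windowStart(b, size)); // 300000 -> second window
    }
}
```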
> >
> > Now, for the ascending timestamp extractor: the reason this can behave in
> > unexpected ways is that it emits a watermark that is "last timestamp - 1",
> > i.e. if it has seen timestamp t it can only emit watermark t-1, because
> > there might be other elements with timestamp t still arriving. If you have
> > a continuous stream of elements you wouldn't notice this; only in this
> > constructed example does it become visible.
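This can be simulated without Flink; the plain-Java sketch below (made-up timestamps) shows why the window holding the last element never fires when the stream simply ends:

```java
public class LastWindowDemo {
    public static void main(String[] args) {
        long windowSize = 30_000;                    // 30-second tumbling windows
        long[] timestamps = {5_000, 12_000, 45_000, 50_000};

        // Ascending-extractor semantics: watermark = last seen timestamp - 1.
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            watermark = ts - 1;
        }

        // An event-time window [start, end) fires once watermark >= end - 1.
        long lastStart = 50_000 - (50_000 % windowSize); // 30000
        long lastEnd = lastStart + windowSize;           // 60000
        System.out.println(watermark);                   // 49999
        System.out.println(watermark >= lastEnd - 1);    // false: never fires
    }
}
```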
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.patil@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > Following is the timestamp I am getting from the DTO; here are the
> > > timestamps of the two records:
> > > 1466115892162154279
> > > 1466116026233613409
> > >
> > > So the time difference is roughly 3 min. Even if I apply a window of
> > > 5 min, I am not getting the last record (the last timestamp value above),
> > > using the ascending timestamp extractor for generating the timestamps
> > > (assuming that the timestamps are always in order).
> > >
> > > I was at least expecting the data to reach the co-group function.
> > > What could be the reason for the data loss? The data we are getting is
> > > critical, hence we cannot afford to lose any of it.
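For what it's worth, the gap between the two values quoted above can be checked directly. The unit is not stated in the thread; if they are nanoseconds since the epoch, the gap is about 134 seconds (and note that Flink event-time timestamps are expected in milliseconds):

```java
public class TimestampGap {
    public static void main(String[] args) {
        long t1 = 1466115892162154279L;
        long t2 = 1466116026233613409L;
        long diffNanos = t2 - t1;
        System.out.println(diffNanos);                  // 134071459130
        System.out.println(diffNanos / 1_000_000_000L); // ~134 seconds, if nanos
    }
}
```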
> > >
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > wrote:
> > >
> > > > Just an update: when I keep IngestionTime and remove the timestamp I am
> > > > generating, I get all the records, but for EventTime I get one record
> > > > less. I checked the time difference between the two records: it is 3
> > > > min. I tried setting the window time to 5 min, but even that did not
> > > > work.
> > > >
> > > > Even when I try assigning a timestamp for IngestionTime, I get one
> > > > record less. So can I safely use Ingestion Time, or is it always
> > > > advisable to use Event Time?
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> Actually I am only publishing 5 messages each to two different Kafka
> > > >> topics (using JUnit); even if I keep the window at 500 seconds the
> > > >> result is the same.
> > > >>
> > > >> I do not understand why it is not sending the 5th element to the
> > > >> co-group operator even when the keys are the same.
> > > >>
> > > >> I actually cannot share the actual client code, but this is what the
> > > >> streams look like:
> > > >>
> > > >> sourceStream.coGroup(destStream)
> > > >>
> > > >> Here sourceStream and destStream are actually Tuple2<String,DTO>, and
> > > >> the ElementSelector returns tuple.f0, which is the key.
> > > >>
> > > >> I am generating the timestamp based on a field from the DTO which is
> > > >> guaranteed to be in order.
> > > >>
> > > >> Will using the triggers help here?
> > > >>
> > > >> Regards,
> > > >> Vinay Patil
> > > >>
> > > >> *+91-800-728-4749*
> > > >>
> > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > >> wrote:
> > > >>
> > > >>> Hi,
> > > >>> what timestamps are you assigning? Is it guaranteed that all of them
> > > >>> would fall into the same 30-second window?
> > > >>>
> > > >>> The issue with the duplicate printing in the ElementSelector is
> > > >>> strange. Could you post a more complete code example so that I can
> > > >>> reproduce the problem?
> > > >>>
> > > >>> Cheers,
> > > >>> Aljoscha
> > > >>>
> > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > Hi,
> > > >>> >
> > > >>> > I am able to get the matching and non-matching elements.
> > > >>> >
> > > >>> > However, when I am unit testing the code, I am getting one record
> > > >>> > less inside the overridden coGroup function. I am testing the
> > > >>> > following way:
> > > >>> >
> > > >>> > 1) Insert 5 messages into a local Kafka topic (test1)
> > > >>> > 2) Insert 5 different messages into a local Kafka topic (test2)
> > > >>> > 3) Consume 1) and 2), giving two different Kafka streams
> > > >>> > 4) Generate ascending timestamps (using event time) for both
> > > >>> > streams and create the key (String)
> > > >>> >
> > > >>> > Up to 4) I am able to get all the records (checked by printing the
> > > >>> > streams to a text file).
> > > >>> >
> > > >>> > However, when I send the streams to the co-group operator, I am
> > > >>> > receiving one record less, using the following syntax:
> > > >>> >
> > > >>> > sourceStream.coGroup(destStream)
> > > >>> >     .where(new ElementSelector())
> > > >>> >     .equalTo(new ElementSelector())
> > > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > >>> >     .apply(new JoinStreams());
> > > >>> >
> > > >>> > Also, I have inserted a sysout in the ElementSelector; I am getting
> > > >>> > 20 sysouts instead of 10 (10 sysouts for the source and 10 for the
> > > >>> > dest stream).
> > > >>> >
> > > >>> > I am unable to understand why one record arrives less at co-group.
> > > >>> >
> > > >>> >
> > > >>> >
> > > >>> > Regards,
> > > >>> > Vinay Patil
> > > >>> >
> > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com>
> > > >>> > wrote:
> > > >>> >
> > > >>> > > Can you add a flag to each element emitted by the CoGroupFunction
> > > >>> > > that indicates whether it was joined or not?
> > > >>> > > Then you can use split to distinguish between both cases and
> > > >>> > > handle the two streams differently.
> > > >>> > >
> > > >>> > > Best, Fabian
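The flag-and-split idea can be modeled in plain Java (hypothetical data; in an actual job the flag would live in something like a Tuple2<Boolean, T> and the routing would use the DataStream split/select mechanism):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SplitByFlag {
    // Route tagged (joined?, value) pairs: index 0 = joined, index 1 = unmatched.
    static List<List<Integer>> split(List<Map.Entry<Boolean, Integer>> tagged) {
        List<Integer> joined = new ArrayList<>();
        List<Integer> unmatched = new ArrayList<>();
        for (Map.Entry<Boolean, Integer> e : tagged) {
            (e.getKey() ? joined : unmatched).add(e.getValue());
        }
        return List.of(joined, unmatched);
    }

    public static void main(String[] args) {
        // Pretend the CoGroupFunction emitted these pairs downstream:
        List<Map.Entry<Boolean, Integer>> tagged = List.of(
                Map.entry(true, 20), Map.entry(false, 23),
                Map.entry(true, 30), Map.entry(false, 40));
        System.out.println(split(tagged).get(0)); // joined:    [20, 30]
        System.out.println(split(tagged).get(1)); // unmatched: [23, 40]
    }
}
```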
> > > >>> > >
> > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
> > > >>> > >
> > > >>> > > > Hi Jark,
> > > >>> > > >
> > > >>> > > > I am able to get the non-matching elements in a stream.
> > > >>> > > >
> > > >>> > > > Of course we can collect the matching elements in the same
> > > >>> > > > stream as well; however, I want to perform additional
> > > >>> > > > operations on the joined stream before writing it to S3, so I
> > > >>> > > > would have to include a separate join operator for the same two
> > > >>> > > > streams, right? Correct me if I am wrong.
> > > >>> > > >
> > > >>> > > > I have pasted the dummy code which collects the non-matching
> > > >>> > > > records (I have to perform this on the actual data; correct me
> > > >>> > > > if I am doing it wrong).
> > > >>> > > >
> > > >>> > > > sourceStream.coGroup(destStream)
> > > >>> > > >     .where(new ElementSelector())
> > > >>> > > >     .equalTo(new ElementSelector())
> > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > >>> > > >
> > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > >>> > > >
> > > >>> > > >         @Override
> > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > >>> > > >             // getExactSizeIfKnown() == 0 means that side of the window is empty
> > > >>> > > >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> > > >>> > > >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> > > >>> > > >             if (exactSizeIfKnown == 0) {
> > > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > > >>> > > >             }
> > > >>> > > >         }
> > > >>> > > >     }).print();
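The matched/unmatched logic of this snippet can be modeled without a cluster; here plain lists stand in for the two Iterables that one key's window delivers to coGroup (hypothetical inputs):

```java
import java.util.ArrayList;
import java.util.List;

public class CoGroupModel {
    // For one key's window: emit an element only when exactly one side is
    // empty, i.e. the element had no match in the other stream.
    static List<Integer> unmatched(List<Integer> left, List<Integer> right) {
        List<Integer> out = new ArrayList<>();
        if (left.isEmpty() && !right.isEmpty()) {
            out.add(right.get(0));
        } else if (right.isEmpty() && !left.isEmpty()) {
            out.add(left.get(0));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(unmatched(List.of(10), List.of(10))); // matched    -> []
        System.out.println(unmatched(List.of(23), List.of()));   // only left  -> [23]
        System.out.println(unmatched(List.of(), List.of(40)));   // only right -> [40]
    }
}
```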
> > > >>> > > >
> > > >>> > > > Regards,
> > > >>> > > > Vinay Patil
> > > >>> > > >
> > > >>> > > >
> > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > >>> > > > wrote:
> > > >>> > > >
> > > >>> > > > > You are right; I debugged it for all elements, and I can do
> > > >>> > > > > that now. Thanks a lot.
> > > >>> > > > >
> > > >>> > > > > Regards,
> > > >>> > > > > Vinay Patil
> > > >>> > > > >
> > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong.wc@alibaba-inc.com>
> > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > > >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are
> > > >>> > > > >> non-empty, it means they are matched elements from both
> > > >>> > > > >> streams. When one of iter1 and iter2 is empty, it means that
> > > >>> > > > >> they are unmatched.
> > > >>> > > > >>
> > > >>> > > > >>
> > > >>> > > > >> - Jark Wu (wuchong)
> > > >>> > > > >>
> > > >>> > > > >> > On 14 Jun 2016, at 12:46, Vinay Patil <vinay18.patil@gmail.com> wrote:
> > > >>> > > > >> >
> > > >>> > > > >> > Hi Matthias,
> > > >>> > > > >> >
> > > >>> > > > >> > I did not get you; even if we use CoGroup we have to apply
> > > >>> > > > >> > it on a key:
> > > >>> > > > >> >
> > > >>> > > > >> > sourceStream.coGroup(destStream)
> > > >>> > > > >> >     .where(new ElementSelector())
> > > >>> > > > >> >     .equalTo(new ElementSelector())
> > > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > > >>> > > > >> >
> > > >>> > > > >> >         @Override
> > > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
> > > >>> > > > >> >                 Iterable<Integer> paramIterable1,
> > > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > > >>> > > > >> >             while (iterator.hasNext()) {
> > > >>> > > > >> >                 iterator.next(); // consume; breakpoint here to inspect elements
> > > >>> > > > >> >             }
> > > >>> > > > >> >         }
> > > >>> > > > >> >     });
> > > >>> > > > >> >
> > > >>> > > > >> > When I debug this, only the matched elements from both
> > > >>> > > > >> > streams come into the coGroup function.
> > > >>> > > > >> >
> > > >>> > > > >> > What I want is how to check for unmatched elements from
> > > >>> > > > >> > both streams and write them to a sink.
> > > >>> > > > >> >
> > > >>> > > > >> > Regards,
> > > >>> > > > >> > Vinay Patil
> > > >>> > > > >> >
> > > >>> > > > >> > *+91-800-728-4749*
> > > >>> > > > >> >
> > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mjsax@apache.org>
> > > >>> > > > >> > wrote:
> > > >>> > > > >> >
> > > >>> > > > >> >> You need to do an outer join. However, there is no
> > > >>> > > > >> >> built-in support for outer joins yet.
> > > >>> > > > >> >>
> > > >>> > > > >> >> You can use a windowed CoGroup to implement the outer
> > > >>> > > > >> >> join as your own operator.
> > > >>> > > > >> >>
> > > >>> > > > >> >>
> > > >>> > > > >> >> -Matthias
> > > >>> > > > >> >>
> > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > > >>> > > > >> >>> Hi,
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> I have a question regarding the join operation; consider
> > > >>> > > > >> >>> the following dummy example:
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> StreamExecutionEnvironment env =
> > > >>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > > >>> > > > >> >>>     env.fromElements(10,20,23,25,30,33,102,18);
> > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> > > >>> > > > >> >>>     env.fromElements(20,30,40,50,60,10);
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> sourceStream.join(destStream)
> > > >>> > > > >> >>>     .where(new ElementSelector())
> > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > >>> > > > >> >>>
> > > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > >>> > > > >> >>>
> > > >>> > > > >> >>>         @Override
> > > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > >>> > > > >> >>>             return paramIN1;
> > > >>> > > > >> >>>         }
> > > >>> > > > >> >>>     }).print();
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> I perfectly get the elements that are matching in both
> > > >>> > > > >> >>> the streams; however, my requirement is to write these
> > > >>> > > > >> >>> matched elements and also the unmatched elements to a
> > > >>> > > > >> >>> sink (S3).
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> How do I get the unmatched elements from each stream?
> > > >>> > > > >> >>>
> > > >>> > > > >> >>> Regards,
> > > >>> > > > >> >>> Vinay Patil
> > > >>> > > > >> >>>
> > > >>> > > > >> >>
> > > >>> > > > >> >>
> > > >>> > > > >>
> > > >>> > > > >>
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
>
