flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [Discussion] Query regarding Join
Date Tue, 28 Jun 2016 09:11:41 GMT
Hi,
First, regarding tumbling windows: even with 5-minute windows it can
happen that elements that are only seconds apart go into different windows.
Consider the following case:

|                x | x                 |

These are two 5-minute windows, and the two elements are only seconds apart
but fall into different windows because windows are aligned to the epoch.
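A minimal sketch of this alignment, assuming the window start is computed as `timestamp - (timestamp % size)` for non-negative millisecond timestamps (the zero-offset case). `WindowAlignment` and `windowStart` are hypothetical names for illustration, not Flink API.

```java
// Sketch: epoch-aligned tumbling windows (zero offset, non-negative timestamps).
// WindowAlignment/windowStart are illustrative names, not Flink classes.
final class WindowAlignment {

    /** Inclusive start of the window containing the timestamp; the end is start + size (exclusive). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L;         // 5-minute windows
        long boundary = 1_466_116_200_000L; // a multiple of 5 minutes since the epoch
        long a = boundary - 2_000L;         // 2 seconds before the boundary
        long b = boundary + 1_000L;         // 1 second after the boundary

        // The elements are only 3 seconds apart, yet their window starts differ.
        System.out.println("a -> window starting at " + windowStart(a, size));
        System.out.println("b -> window starting at " + windowStart(b, size));
    }
}
```

The same holds for any window size: two elements that straddle a multiple of the size (counted from the epoch) always land in different windows, however close together they are.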

Now, regarding the ascending timestamp extractor: it can behave in
unexpected ways because it emits a watermark of "last timestamp - 1".
That is, if it has seen timestamp t it can only emit watermark t-1, because
other elements with timestamp t might still arrive. As a result, the window
holding the most recent element only fires once a later element (or the end
of the input) advances the watermark past the window's end. If you have a
continuous stream of elements you wouldn't notice this; it only becomes
visible in a constructed example like this one.
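The effect can be reproduced with a simplified model. It assumes, as described above, that the extractor's watermark is the highest timestamp seen minus one, and that an event-time window [start, start + size) fires once the watermark reaches start + size - 1. This is a hypothetical stand-alone simulation, not Flink's implementation. The `sourceFinished` flag models the final maximum watermark that a bounded source emits when it ends; a Kafka source kept open in a unit test typically never gets there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified model, not Flink code: watermark = max timestamp seen - 1, and a
// tumbling window [start, start + size) fires when watermark >= start + size - 1.
final class AscendingWatermarkModel {

    /** Returns the start of every window that has fired once all timestamps were processed. */
    static List<Long> firedWindows(long[] timestamps, long sizeMillis, boolean sourceFinished) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            pending.merge(ts - (ts % sizeMillis), 1, Integer::sum);
            watermark = Math.max(watermark, ts - 1); // "last timestamp - 1"
            fireUpTo(pending, fired, watermark, sizeMillis);
        }
        if (sourceFinished) {
            // A finished source emits a final maximum watermark, flushing every window.
            fireUpTo(pending, fired, Long.MAX_VALUE, sizeMillis);
        }
        return fired;
    }

    private static void fireUpTo(TreeMap<Long, Integer> pending, List<Long> fired,
                                 long watermark, long sizeMillis) {
        while (!pending.isEmpty() && pending.firstKey() + sizeMillis - 1 <= watermark) {
            fired.add(pending.pollFirstEntry().getKey());
        }
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows, as in the thread
        long[] ts = {0L, 40_000L, 70_000L, 100_000L, 130_000L}; // one element per window
        System.out.println("open source:     " + firedWindows(ts, size, false));
        System.out.println("finished source: " + firedWindows(ts, size, true));
    }
}
```

With the source left open, only four of the five windows fire. The window holding the last element waits for a watermark that never arrives, which matches the "one record less" symptom.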

Cheers,
Aljoscha

On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.patil@gmail.com> wrote:

> Hi,
>
> Following are the timestamps I am getting from the DTO for the two
> records:
> 1466115892162154279
> 1466116026233613409
>
> So the time difference is roughly 3 min. Even if I apply a 5-minute window,
> I do not get the last record (the last timestamp value above). I am using
> the ascending timestamp extractor to generate the timestamps (assuming
> that the timestamps are always in order).
>
> I was at least expecting the data to reach the co-group function.
> What could be the reason for the data loss? The data we are receiving is
> critical, so we cannot afford to lose any of it.
>
>
> Regards,
> Vinay Patil
>
> On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
> wrote:
>
> > Just an update: when I keep IngestionTime and remove the timestamps I am
> > generating, I get all the records, but with EventTime I get one record
> > fewer. I checked the time difference between the two records: it is 3
> > min. I tried setting the window time to 5 min, but even that did not
> > work.
> >
> > Even when I assign timestamps with IngestionTime, I get one record
> > fewer. So is it safe to use IngestionTime, or is it always advisable to
> > use EventTime?
> >
> > Regards,
> > Vinay Patil
> >
> > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Actually, I am only publishing 5 messages to each of two different Kafka
> >> topics (using JUnit). Even if I set the window to 500 seconds, the result
> >> is the same.
> >>
> >> I do not understand why the 5th element is not sent to the co-group
> >> operator even though the keys are the same.
> >>
> >> I cannot share the actual client code, but this is what the streams
> >> look like:
> >> sourceStream.coGroup(destStream)
> >> Here sourceStream and destStream are actually streams of
> >> Tuple2<String, DTO>, and the ElementSelector returns tuple.f0, which is
> >> the key.
> >>
> >> I am generating the timestamps based on a field from the DTO, which is
> >> guaranteed to be in order.
> >>
> >> Would using triggers help here?
> >>
> >>
> >> Regards,
> >> Vinay Patil
> >>
> >>
> >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljoscha@apache.org>
> >> wrote:
> >>
> >>> Hi,
> >>> what timestamps are you assigning? Is it guaranteed that all of them
> >>> fall into the same 30-second window?
> >>>
> >>> The duplicate printing in the ElementSelector is strange. Could you
> >>> post a more complete code example so that I can reproduce the problem?
> >>>
> >>> Cheers,
> >>> Aljoscha
> >>>
> >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi,
> >>> >
> >>> > I am able to get the matching and non-matching elements.
> >>> >
> >>> > However, when I unit test the code, I get one record fewer inside the
> >>> > overridden coGroup function.
> >>> > I am testing it the following way:
> >>> >
> >>> > 1) Insert 5 messages into a local Kafka topic (test1)
> >>> > 2) Insert 5 different messages into a local Kafka topic (test2)
> >>> > 3) Consume 1) and 2), giving two different Kafka streams
> >>> > 4) Generate ascending timestamps (using event time) for both streams
> >>> >    and create a key (String)
> >>> >
> >>> > Up to step 4) I get all the records (checked by printing the streams
> >>> > to a text file).
> >>> >
> >>> > However, when I send the streams to the co-group operator, I receive
> >>> > one record fewer, using the following syntax:
> >>> >
> >>> > sourceStream.coGroup(destStream)
> >>> > .where(new ElementSelector())
> >>> > .equalTo(new ElementSelector())
> >>> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >>> > .apply(new JoinStreams());
> >>> >
> >>> > Also, I inserted a sysout in the ElementSelector, and I get 20 sysouts
> >>> > instead of 10 (10 for the source stream and 10 for the dest stream).
> >>> >
> >>> > I am unable to understand why one record fewer reaches the co-group.
> >>> >
> >>> >
> >>> >
> >>> > Regards,
> >>> > Vinay Patil
> >>> >
> >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> >>> >
> >>> > > Can you add a flag to each element emitted by the CoGroupFunction that
> >>> > > indicates whether it was joined or not?
> >>> > > Then you can use split to distinguish between both cases and handle
> >>> > > both streams differently.
> >>> > >
> >>> > > Best, Fabian
> >>> > >
> >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
> >>> > >
> >>> > > > Hi Jark,
> >>> > > >
> >>> > > > I am able to get the non-matching elements in a stream.
> >>> > > >
> >>> > > > Of course, we can collect the matching elements in the same stream
> >>> > > > as well. However, I want to perform additional operations on the
> >>> > > > joined stream before writing it to S3, so I would have to include a
> >>> > > > separate join operator for the same two streams, right?
> >>> > > > Correct me if I am wrong.
> >>> > > >
> >>> > > > I have pasted the dummy code that collects the non-matching records
> >>> > > > (I have to perform this on the actual data; correct me if I am doing
> >>> > > > something wrong).
> >>> > > >
> >>> > > > sourceStream.coGroup(destStream)
> >>> > > > .where(new ElementSelector())
> >>> > > > .equalTo(new ElementSelector())
> >>> > > > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >>> > > > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >>> > > >
> >>> > > > private static final long serialVersionUID = 6408179761497497475L;
> >>> > > >
> >>> > > > @Override
> >>> > > > public void coGroup(Iterable<Integer> paramIterable,
> >>> > > > Iterable<Integer> paramIterable1,
> >>> > > > Collector<Integer> paramCollector) throws Exception {
> >>> > > > // An empty side means the key had no match in this window.
> >>> > > > // getExactSizeIfKnown() returns -1 unless the spliterator is SIZED,
> >>> > > > // so check emptiness with iterator().hasNext() instead.
> >>> > > > boolean sourceEmpty = !paramIterable.iterator().hasNext();
> >>> > > > boolean destEmpty = !paramIterable1.iterator().hasNext();
> >>> > > > if (sourceEmpty) {
> >>> > > > // emit every unmatched dest element, not only the first one
> >>> > > > for (Integer e : paramIterable1) { paramCollector.collect(e); }
> >>> > > > } else if (destEmpty) {
> >>> > > > // emit every unmatched source element
> >>> > > > for (Integer e : paramIterable) { paramCollector.collect(e); }
> >>> > > > }
> >>> > > > }
> >>> > > > }).print();
> >>> > > >
> >>> > > > Regards,
> >>> > > > Vinay Patil
> >>> > > >
> >>> > > >
> >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> >>> > > >
> >>> > > > > You are right; I debugged it for all elements, and I can do that now.
> >>> > > > > Thanks a lot.
> >>> > > > >
> >>> > > > > Regards,
> >>> > > > > Vinay Patil
> >>> > > > >
> >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
> >>> > > > >
> >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are non-empty,
> >>> > > > >> they contain matched elements from both streams.
> >>> > > > >> When one of iter1 and iter2 is empty, the elements in the other
> >>> > > > >> one are unmatched.
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> - Jark Wu (wuchong)
> >>> > > > >>
> >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> >>> > > > >> >
> >>> > > > >> > Hi Matthias,
> >>> > > > >> >
> >>> > > > >> > I did not follow you: even if we use CoGroup, we have to apply it
> >>> > > > >> > on a key:
> >>> > > > >> >
> >>> > > > >> > sourceStream.coGroup(destStream)
> >>> > > > >> > .where(new ElementSelector())
> >>> > > > >> > .equalTo(new ElementSelector())
> >>> > > > >> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> >>> > > > >> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> >>> > > > >> > private static final long serialVersionUID = 6408179761497497475L;
> >>> > > > >> >
> >>> > > > >> > @Override
> >>> > > > >> > public void coGroup(Iterable<Integer> paramIterable,
> >>> > > > >> > Iterable<Integer> paramIterable1,
> >>> > > > >> > Collector<Integer> paramCollector) throws Exception {
> >>> > > > >> > Iterator<Integer> iterator = paramIterable.iterator();
> >>> > > > >> > while (iterator.hasNext()) {
> >>> > > > >> > Integer element = iterator.next(); // advance, or the loop never terminates
> >>> > > > >> > }
> >>> > > > >> > }
> >>> > > > >> > });
> >>> > > > >> >
> >>> > > > >> > When I debug this, only the matched elements from both streams
> >>> > > > >> > come into the coGroup function.
> >>> > > > >> >
> >>> > > > >> > What I want to know is how to check for unmatched elements from
> >>> > > > >> > both streams and write them to a sink.
> >>> > > > >> >
> >>> > > > >> > Regards,
> >>> > > > >> > Vinay Patil
> >>> > > > >> >
> >>> > > > >> >
> >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mjsax@apache.org> wrote:
> >>> > > > >> >
> >>> > > > >> >> You need to do an outer join. However, there is no built-in
> >>> > > > >> >> support for outer joins yet.
> >>> > > > >> >>
> >>> > > > >> >> You can use a window CoGroup to implement the outer join as your
> >>> > > > >> >> own operator.
> >>> > > > >> >>
> >>> > > > >> >>
> >>> > > > >> >> -Matthias
> >>> > > > >> >>
> >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> >>> > > > >> >>> Hi,
> >>> > > > >> >>>
> >>> > > > >> >>> I have a question regarding the join operation. Consider the
> >>> > > > >> >>> following dummy example:
> >>> > > > >> >>>
> >>> > > > >> >>> StreamExecutionEnvironment env =
> >>> > > > >> >>> StreamExecutionEnvironment.getExecutionEnvironment();
> >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> >>> > > > >> >>> env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> >>> > > > >> >>> DataStreamSource<Integer> destStream =
> >>> > > > >> >>> env.fromElements(20, 30, 40, 50, 60, 10);
> >>> > > > >> >>>
> >>> > > > >> >>> sourceStream.join(destStream)
> >>> > > > >> >>> .where(new ElementSelector())
> >>> > > > >> >>> .equalTo(new ElementSelector())
> >>> > > > >> >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >>> > > > >> >>> .apply(new JoinFunction<Integer, Integer, Integer>() {
> >>> > > > >> >>>
> >>> > > > >> >>> private static final long serialVersionUID = 1L;
> >>> > > > >> >>>
> >>> > > > >> >>> @Override
> >>> > > > >> >>> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> >>> > > > >> >>> return paramIN1;
> >>> > > > >> >>> }
> >>> > > > >> >>> }).print();
> >>> > > > >> >>>
> >>> > > > >> >>> env.execute();
> >>> > > > >> >>>
> >>> > > > >> >>> I correctly get the elements that match in both streams;
> >>> > > > >> >>> however, my requirement is to write these matched elements and
> >>> > > > >> >>> also the unmatched elements to a sink (S3).
> >>> > > > >> >>>
> >>> > > > >> >>> How do I get the unmatched elements from each stream?
> >>> > > > >> >>>
> >>> > > > >> >>> Regards,
> >>> > > > >> >>> Vinay Patil
> >>> > > > >> >>>
> >>> > > > >> >>
> >>> > > > >> >>
> >>> > > > >>
> >>> > > > >>
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
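The outer-join approach discussed in the quoted thread can be sketched in plain Java so that it runs without a Flink job. It combines Matthias's window CoGroup, Jark's rule that an empty side means "unmatched", and Fabian's per-element flag. In a real job, the body of `outerCoGroup` would sit inside `CoGroupFunction.coGroup`, with the `Consumer` replaced by the `Collector`. `Kind`, `Tagged` and `outerCoGroup` are hypothetical names. The `main` method assumes, hypothetically, that `ElementSelector` keys each integer by its own value. For matched keys the sketch emits only the left-side elements. That matches the thread's `JoinFunction` (which returns `paramIN1`) only when each key occurs once per side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Consumer;

// Plain-Java sketch of an outer join built on co-group semantics.
// Kind, Tagged and outerCoGroup are illustrative names, not Flink API.
final class OuterCoGroupSketch {

    enum Kind { MATCHED, LEFT_ONLY, RIGHT_ONLY }

    /** An element plus a flag saying whether it was joined. */
    static final class Tagged<T> {
        final Kind kind;
        final T value;

        Tagged(Kind kind, T value) {
            this.kind = kind;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + ":" + value;
        }
    }

    /** Invoked once per key and window with all elements of that key from each side. */
    static <T> void outerCoGroup(Iterable<T> left, Iterable<T> right, Consumer<Tagged<T>> out) {
        boolean hasLeft = left.iterator().hasNext();
        boolean hasRight = right.iterator().hasNext();
        if (hasLeft && hasRight) {
            for (T l : left) out.accept(new Tagged<>(Kind.MATCHED, l)); // key seen on both sides
        } else if (hasLeft) {
            for (T l : left) out.accept(new Tagged<>(Kind.LEFT_ONLY, l)); // no match on the right
        } else {
            for (T r : right) out.accept(new Tagged<>(Kind.RIGHT_ONLY, r)); // no match on the left
        }
    }

    public static void main(String[] args) {
        // Element values from the first message in the thread; the key is the value itself.
        List<Integer> source = Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18);
        List<Integer> dest = Arrays.asList(20, 30, 40, 50, 60, 10);

        List<Tagged<Integer>> results = new ArrayList<>();
        TreeSet<Integer> keys = new TreeSet<>(source);
        keys.addAll(dest);
        for (int key : keys) { // group by key, as keyBy plus a single window would
            List<Integer> l = new ArrayList<>();
            List<Integer> r = new ArrayList<>();
            for (int s : source) if (s == key) l.add(s);
            for (int d : dest) if (d == key) r.add(d);
            outerCoGroup(l, r, results::add);
        }

        // "Split" by flag: matched elements go one way, unmatched ones another.
        for (Tagged<Integer> t : results) {
            String route = t.kind == Kind.MATCHED ? "joined" : "unmatched";
            System.out.println(route + " " + t);
        }
    }
}
```

With the flag on every element, one co-group produces both the joined records and the unmatched records. A single downstream split (or two filters) can then route them to different sinks, so no separate join operator is needed.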

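As a worked check on the two raw values quoted in the thread (1466115892162154279 and 1466116026233613409): they have 19 digits, which suggests nanoseconds since the epoch. Flink event-time timestamps, by contrast, are milliseconds since the epoch. The sketch below assumes the nanosecond interpretation; `TimestampUnits` and `toFlinkMillis` are hypothetical names. If an extractor returned the raw field unchanged, Flink would read the gap between the two records as milliseconds.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Worked arithmetic on the two raw timestamps from the thread, assuming they are
// nanoseconds since the epoch. TimestampUnits/toFlinkMillis are illustrative names.
final class TimestampUnits {

    /** Converts epoch nanoseconds to the epoch milliseconds Flink expects. */
    static long toFlinkMillis(long epochNanos) {
        return TimeUnit.NANOSECONDS.toMillis(epochNanos);
    }

    public static void main(String[] args) {
        long first = 1466115892162154279L;
        long second = 1466116026233613409L;
        long diff = second - first;

        // Read as nanoseconds, the values are dates in June 2016, about 134 s apart.
        System.out.println(Instant.ofEpochSecond(0, first) + " .. " + Instant.ofEpochSecond(0, second));
        System.out.println("gap as nanos:  " + Duration.ofNanos(diff));
        // Read as milliseconds (raw value passed through), the gap is over four years.
        System.out.println("gap as millis: " + Duration.ofMillis(diff).toDays() + " days");

        // After conversion, check which epoch-aligned 5-minute window each record falls into.
        long size = 5 * 60 * 1000L;
        long a = toFlinkMillis(first);
        long b = toFlinkMillis(second);
        System.out.println("window starts: " + (a - a % size) + " vs " + (b - b % size));
    }
}
```

Even after conversion to milliseconds, the two records land in different 5-minute windows (starting at 1466115600000 and 1466115900000). This is the epoch-alignment effect described at the top of this message.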