flink-dev mailing list archives

From Vinay Patil <vinay18.pa...@gmail.com>
Subject Re: [Discussion] Query regarding Join
Date Tue, 28 Jun 2016 09:43:52 GMT
Hi Aljoscha,

Thank you for your response.
So do you suggest using a different approach for extracting timestamps (as
given in the documentation) instead of the AscendingTimestampExtractor?
Is that the reason I am seeing this unexpected behaviour? In the case of a
continuous stream, would I see no data loss?

Also, assuming that the records are always going to be in order, which is
the best approach: Ingestion Time or Event Time?



Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> first regarding tumbling windows: even if you have 5 minute windows it can
> happen that elements that are only seconds apart go into different windows.
> Consider the following case:
>
> |                x | x                 |
>
> These are two 5-minute windows and the two elements are only seconds apart
> but go into different windows because windows are aligned to the epoch.
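The epoch alignment described above can be sketched in plain Java. This is an illustrative sketch, not Flink code: `windowStart` mirrors the start computation Flink uses for tumbling windows with zero offset, and the two example timestamps are made up to show the effect.

```java
// Sketch: tumbling windows are aligned to the epoch, so the window start is
// start = timestamp - (timestamp % size). Two elements only seconds apart
// can still straddle a window boundary.
class WindowAlignment {

    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L;            // 5-minute windows
        long a = 4 * 60 * 1000L + 59 * 1000L;  // 4:59 after the epoch
        long b = 5 * 60 * 1000L + 1000L;       // 5:01 after the epoch, 2 s later
        // Only 2 seconds apart, yet they land in different windows:
        System.out.println(windowStart(a, size)); // 0
        System.out.println(windowStart(b, size)); // 300000
    }
}
```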
>
> Now, for the ascending timestamp extractor. The reason this can behave in
> unexpected ways is that it emits a watermark that is "last timestamp - 1",
> i.e. if it has seen timestamp t it can only emit watermark t-1 because
> there might be other elements with timestamp t arriving. If you have a
> continuous stream of elements you wouldn't notice this. Only in this
> constructed example does it become visible.
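A minimal sketch of why the last element can get stuck, assuming (as described above) that the ascending extractor emits watermark = last timestamp - 1, and that an event-time window fires once the watermark passes its end. The numbers and helper names are illustrative, not Flink internals.

```java
// Sketch: in a finite stream the watermark trails the last timestamp by 1 ms,
// so the window holding the final element never sees a watermark past its
// end and therefore never fires.
class WatermarkSketch {

    static long watermark(long lastSeenTimestamp) {
        return lastSeenTimestamp - 1; // what the ascending extractor emits
    }

    // An event-time window [start, end) fires once watermark >= end - 1.
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 30_000L;   // 30-second window
        long lastTs = 65_000L; // final element of a finite stream
        long end = (lastTs - lastTs % size) + size; // its window ends at 90_000
        // The watermark stops at 64_999, so this window never fires:
        System.out.println(windowFires(end, watermark(lastTs))); // false
    }
}
```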
>
> Cheers,
> Aljoscha
>
> On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.patil@gmail.com> wrote:
>
> > Hi,
> >
> > Following are the timestamps I am getting from the DTO; here are the
> > timestamps of the two records:
> > 1466115892162154279
> > 1466116026233613409
> >
> > So the time difference is roughly 3 min. Even if I apply a window of 5 min,
> > I am not getting the last record (the last timestamp value above), using
> > the ascending timestamp extractor for generating the timestamps (assuming
> > that the timestamps are always in order).
> >
> > I was at least expecting the data to reach the co-group function.
> > What could be the reason for the data loss? The data we are getting is
> > critical, hence we cannot afford to lose any data.
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
> > wrote:
> >
> > > Just an update: when I keep IngestionTime and remove the timestamp I am
> > > generating, I am getting all the records, but for Event Time I am getting
> > > one record less. I checked the time difference between the two records;
> > > it is 3 min. I tried keeping the window time at 5 mins, but even that did
> > > not work.
> > >
> > > Even when I try assigning a timestamp for IngestionTime, I get one record
> > > less, so should I safely use Ingestion Time, or is it always advisable to
> > > use Event Time?
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > wrote:
> > >
> > >> Hi ,
> > >>
> > >> Actually I am only publishing 5 messages each to two different kafka
> > >> topics (using JUnit); even if I keep the window at 500 seconds the
> > >> result is the same.
> > >>
> > >> I am not understanding why it is not sending the 5th element to the
> > >> co-group operator even when the keys are the same.
> > >>
> > >> I actually cannot share the actual client code,
> > >> but this is what the streams look like:
> > >> sourceStream.coGroup(destStream)
> > >> Here the sourceStream and destStream are actually Tuple2<String,DTO>,
> > >> and the ElementSelector returns tuple.f0, which is the key.
> > >>
> > >> I am generating the timestamp based on a field from the DTO which is
> > >> guaranteed to be in order.
> > >>
> > >> Will using triggers help here?
> > >>
> > >>
> > >> Regards,
> > >> Vinay Patil
> > >>
> > >> *+91-800-728-4749*
> > >>
> > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <
> aljoscha@apache.org>
> > >> wrote:
> > >>
> > >>> Hi,
> > >>> what timestamps are you assigning? Is it guaranteed that all of them
> > >>> would fall into the same 30-second window?
> > >>>
> > >>> The issue with the duplicate printing in the ElementSelector is strange.
> > >>> Could you post a more complete code example so that I can reproduce the
> > >>> problem?
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > Hi ,
> > >>> >
> > >>> > I am able to get the matching and non-matching elements.
> > >>> >
> > >>> > However when I am unit testing the code, I am getting one record less
> > >>> > inside the overridden coGroup function.
> > >>> > I am testing the following way:
> > >>> >
> > >>> > 1) Insert 5 messages into a local kafka topic (test1)
> > >>> > 2) Insert a different 5 messages into a local kafka topic (test2)
> > >>> > 3) Consume 1) and 2) so that I have two different kafka streams
> > >>> > 4) Generate ascending timestamps (using Event Time) for both streams
> > >>> > and create the key (String)
> > >>> >
> > >>> > Now, up to 4) I am able to get all the records (checked by printing
> > >>> > the streams to a text file).
> > >>> >
> > >>> > However when I send the streams to the co-group operator, I am
> > >>> > receiving one record less, using the following syntax:
> > >>> >
> > >>> > sourceStream.coGroup(destStream)
> > >>> >     .where(new ElementSelector())
> > >>> >     .equalTo(new ElementSelector())
> > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> >     .apply(new JoinStreams());
> > >>> >
> > >>> > Also, in the ElementSelector I have inserted a sysout; I am getting 20
> > >>> > sysouts instead of 10 (10 for the source and 10 for the dest stream).
> > >>> >
> > >>> > Unable to understand why one record less is reaching co-group.
> > >>> >
> > >>> >
> > >>> >
> > >>> > Regards,
> > >>> > Vinay Patil
> > >>> >
> > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com>
> > >>> wrote:
> > >>> >
> > >>> > > Can you add a flag to each element emitted by the CoGroupFunction
> > >>> > > that indicates whether it was joined or not?
> > >>> > > Then you can use split to distinguish between both cases and handle
> > >>> > > both streams differently.
> > >>> > >
> > >>> > > Best, Fabian
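Fabian's flag-and-split idea can be sketched outside Flink as plain Java. This is a hypothetical helper, not Fabian's code: the Boolean marks whether the key matched on both sides, so a downstream split can route matched and unmatched records differently.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the suggested CoGroupFunction body: tag every emitted element
// with a flag saying whether its key was present on both sides of the join.
class FlaggedCoGroup {

    static List<Map.Entry<Integer, Boolean>> coGroup(List<Integer> left, List<Integer> right) {
        boolean joined = !left.isEmpty() && !right.isEmpty();
        List<Map.Entry<Integer, Boolean>> out = new ArrayList<>();
        for (int v : left) out.add(new SimpleEntry<>(v, joined));
        for (int v : right) out.add(new SimpleEntry<>(v, joined));
        return out;
    }

    public static void main(String[] args) {
        // Key present on both sides -> flagged true (joined).
        System.out.println(coGroup(List.of(10), List.of(10)));
        // Key present on one side only -> flagged false (unmatched).
        System.out.println(coGroup(List.of(20), List.of()));
    }
}
```

In the real job the flag would ride along in a Tuple2 and a split (or side output) would send the two cases to different sinks.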
> > >>> > >
> > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
> > >>> > >
> > >>> > > > Hi Jark,
> > >>> > > >
> > >>> > > > I am able to get the non-matching elements in a stream.
> > >>> > > >
> > >>> > > > Of course we can collect the matching elements in the same stream
> > >>> > > > as well; however I want to perform additional operations on the
> > >>> > > > joined stream before writing it to S3, so I would have to include
> > >>> > > > a separate join operator for the same two streams, right?
> > >>> > > > Correct me if I am wrong.
> > >>> > > >
> > >>> > > > I have pasted the dummy code which collects the non-matching
> > >>> > > > records (I have to perform this on the actual data; correct me if
> > >>> > > > I am doing it wrong).
> > >>> > > >
> > >>> > > > sourceStream.coGroup(destStream)
> > >>> > > >     .where(new ElementSelector())
> > >>> > > >     .equalTo(new ElementSelector())
> > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >>> > > >
> > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > >>> > > >
> > >>> > > >         @Override
> > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > >>> > > >                 Iterable<Integer> paramIterable1,
> > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > >>> > > >             long exactSizeIfKnown =
> > >>> > > >                     paramIterable.spliterator().getExactSizeIfKnown();
> > >>> > > >             long exactSizeIfKnown2 =
> > >>> > > >                     paramIterable1.spliterator().getExactSizeIfKnown();
> > >>> > > >             if (exactSizeIfKnown == 0) {
> > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > >>> > > >             }
> > >>> > > >         }
> > >>> > > >     }).print();
> > >>> > > >
> > >>> > > > Regards,
> > >>> > > > Vinay Patil
> > >>> > > >
> > >>> > > >
> > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <
> > >>> vinay18.patil@gmail.com>
> > >>> > > > wrote:
> > >>> > > >
> > >>> > > > > You are right; I debugged it for all elements, and I can do
> > >>> > > > > that now.
> > >>> > > > > Thanks a lot.
> > >>> > > > >
> > >>> > > > > Regards,
> > >>> > > > > Vinay Patil
> > >>> > > > >
> > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <
> > >>> > wuchong.wc@alibaba-inc.com>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are
> > >>> > > > >> non-empty, it means they are matched elements from both streams.
> > >>> > > > >> When one of iter1 and iter2 is empty, it means they are
> > >>> > > > >> unmatched.
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >> - Jark Wu (wuchong)
> > >>> > > > >>
> > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil
> > >>> > > > >> > <vinay18.patil@gmail.com> wrote:
> > >>> > > > >> >
> > >>> > > > >> > Hi Matthias ,
> > >>> > > > >> >
> > >>> > > > >> > I did not get you; even if we use Co-Group, we have to apply
> > >>> > > > >> > it on a key:
> > >>> > > > >> >
> > >>> > > > >> > sourceStream.coGroup(destStream)
> > >>> > > > >> >     .where(new ElementSelector())
> > >>> > > > >> >     .equalTo(new ElementSelector())
> > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > >>> > > > >> >
> > >>> > > > >> >         @Override
> > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
> > >>> > > > >> >                 Iterable<Integer> paramIterable1,
> > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > >>> > > > >> >             while (iterator.hasNext()) {
> > >>> > > > >> >                 iterator.next(); // advance, otherwise the loop never terminates
> > >>> > > > >> >             }
> > >>> > > > >> >         }
> > >>> > > > >> >     });
> > >>> > > > >> >
> > >>> > > > >> > When I debug this, only the matched elements from both
> > >>> > > > >> > streams come into the coGroup function.
> > >>> > > > >> >
> > >>> > > > >> > What I want is how to check for the unmatched elements from
> > >>> > > > >> > both streams and write them to the sink.
> > >>> > > > >> >
> > >>> > > > >> > Regards,
> > >>> > > > >> > Vinay Patil
> > >>> > > > >> >
> > >>> > > > >> > *+91-800-728-4749*
> > >>> > > > >> >
> > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias
J. Sax <
> > >>> > mjsax@apache.org>
> > >>> > > > >> wrote:
> > >>> > > > >> >
> > >>> > > > >> >> You need to do an outer join. However, there is no built-in
> > >>> > > > >> >> support for outer joins yet.
> > >>> > > > >> >>
> > >>> > > > >> >> You can use Window-CoGroup to implement the outer join as
> > >>> > > > >> >> your own operator.
> > >>> > > > >> >>
> > >>> > > > >> >>
> > >>> > > > >> >> -Matthias
> > >>> > > > >> >>
> > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > >>> > > > >> >>> Hi,
> > >>> > > > >> >>>
> > >>> > > > >> >>> I have a question regarding the join operation; consider
> > >>> > > > >> >>> the following dummy example:
> > >>> > > > >> >>>
> > >>> > > > >> >>> StreamExecutionEnvironment env =
> > >>> > > > >> >>>         StreamExecutionEnvironment.getExecutionEnvironment();
> > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > >>> > > > >> >>>         env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> > >>> > > > >> >>>         env.fromElements(20, 30, 40, 50, 60, 10);
> > >>> > > > >> >>>
> > >>> > > > >> >>> sourceStream.join(destStream)
> > >>> > > > >> >>>     .where(new ElementSelector())
> > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > >>> > > > >> >>>
> > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > >>> > > > >> >>>
> > >>> > > > >> >>>         @Override
> > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2)
> > >>> > > > >> >>>                 throws Exception {
> > >>> > > > >> >>>             return paramIN1;
> > >>> > > > >> >>>         }
> > >>> > > > >> >>>     }).print();
> > >>> > > > >> >>>
> > >>> > > > >> >>> I perfectly get the elements that match in both streams;
> > >>> > > > >> >>> however, my requirement is to write these matched elements
> > >>> > > > >> >>> and also the unmatched elements to the sink (S3).
> > >>> > > > >> >>>
> > >>> > > > >> >>> How do I get the unmatched elements from each stream?
> > >>> > > > >> >>>
> > >>> > > > >> >>> Regards,
> > >>> > > > >> >>> Vinay Patil
> > >>> > > > >> >>>
> > >>> > > > >> >>
> > >>> > > > >> >>
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >
> >
>
