flink-dev mailing list archives

From Vinay Patil <vinay18.pa...@gmail.com>
Subject Re: [Discussion] Query regarding Join and Windows
Date Wed, 29 Jun 2016 12:14:35 GMT
Hi,

OK.
Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp) method,
both parameters carry the same timestamp value. I was expecting the first
parameter to give me the timestamp of the previous element when I extract it.
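For reference: Flink's documentation says `checkAndGetNextWatermark(...)` is called for each element right after `extractTimestamp(...)`. So `lastElement` is the element that was just processed, and `extractedTimestamp` is that element's own timestamp, not the previous element's. The plain-Java sketch below only mimics this call order. The `Assigner` interface is a hypothetical stand-in, not Flink's API, and it models watermarks as nullable `Long`s. If the previous timestamp is needed, one option is to keep it in a field, as the hypothetical `RememberingAssigner` does.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the call order of a punctuated watermark assigner (not Flink's API). */
class PunctuatedSketch {

    /** Hypothetical mirror of the two callbacks; a null return means "no watermark". */
    interface Assigner<T> {
        // In Flink, the second argument is the timestamp already attached to *this*
        // element (negative if none), not the previous element's timestamp.
        long extractTimestamp(T element, long previousElementTimestamp);

        Long checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
    }

    /** Element type for the sketch: the event time is a field of the element. */
    static final class Event {
        final long ts;

        Event(long ts) {
            this.ts = ts;
        }
    }

    /** Remembers the previous element's timestamp itself, since no callback provides it. */
    static final class RememberingAssigner implements Assigner<Event> {
        final List<String> log = new ArrayList<>();
        private long previousTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Event element, long previousElementTimestamp) {
            return element.ts;
        }

        @Override
        public Long checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
            // lastElement is the element that was just processed, so lastElement.ts
            // and extractedTimestamp are the same value.
            log.add("current=" + extractedTimestamp + " previous=" + previousTimestamp);
            previousTimestamp = extractedTimestamp;
            return extractedTimestamp - 1; // same "t - 1" rule as the ascending extractor
        }
    }

    /** Calls the assigner per element: extract first, then check. */
    static List<Long> run(Assigner<Event> assigner, long... timestamps) {
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            Event e = new Event(ts);
            long extracted = assigner.extractTimestamp(e, Long.MIN_VALUE); // no timestamp attached yet
            Long wm = assigner.checkAndGetNextWatermark(e, extracted);
            if (wm != null) {
                watermarks.add(wm);
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        RememberingAssigner assigner = new RememberingAssigner();
        System.out.println(run(assigner, 0, 1, 2)); // [-1, 0, 1]
        System.out.println(assigner.log);
    }
}
```

Each log entry pairs the current timestamp with the one remembered from the element before it. The callback arguments themselves always describe the same element.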

Let's say I decide to use IngestionTime (since I am getting accurate results
with it for now): if the joining elements of the two streams are assigned to
different windows, then in that case I will not get a match, right?

However, with event time they are guaranteed to be in the same window, since
we are assigning the timestamps; correct me if I am wrong.

According to the documentation:
*Ingestion Time programs cannot handle any out-of-order events or late data*

In this context, what do we mean by out-of-order events? How does it know
that the events are out of order, i.e. based on which parameter does it decide
that the events are out of order? In the case of event time, we can say that
the received timestamps are out of order.

Late data: is there a threshold after which it no longer accepts late
data?
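On the out-of-order and late-data questions, here is a minimal plain-Java sketch (not Flink code) under stated assumptions. The watermark is taken to be the highest timestamp seen so far minus a bound `maxOutOfOrderness`, minus one more millisecond. That rule is chosen for this sketch; with a bound of 0 it reduces to the "last timestamp - 1" watermark of the ascending extractor discussed further down the thread. An element is classified as out of order when its timestamp is lower than one already seen. It is classified as late when its timestamp is at or below the current watermark, because a watermark declares that no element with a timestamp at or below it should still arrive. Under ingestion time, timestamps come from the source's clock as records enter Flink, so in practice nothing arrives out of order.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: telling "out of order" and "late" apart for event-time elements (not Flink code). */
class LatenessSketch {

    enum Kind { IN_ORDER, OUT_OF_ORDER, LATE }

    /**
     * Classifies timestamps in arrival order. The watermark is assumed to be
     * maxSeen - maxOutOfOrderness - 1, a rule chosen for this sketch.
     */
    static List<Kind> classify(long maxOutOfOrderness, long... timestamps) {
        List<Kind> result = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // nothing promised before the first element
        for (long ts : timestamps) {
            if (ts <= watermark) {
                // The watermark already declared that no element at or below it would arrive.
                result.add(Kind.LATE);
            } else if (ts < maxSeen) {
                // Older than an element already seen, but still ahead of the watermark.
                result.add(Kind.OUT_OF_ORDER);
            } else {
                result.add(Kind.IN_ORDER);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - maxOutOfOrderness - 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Event time with a bound of 2: 11 arrives out of order, 9 arrives late.
        System.out.println(classify(2, 10, 12, 11, 9, 13));
        // Non-decreasing timestamps, as a clock at the source would produce.
        System.out.println(classify(0, 100, 100, 101));
    }
}
```

The first call prints `[IN_ORDER, IN_ORDER, OUT_OF_ORDER, LATE, IN_ORDER]`. The second call prints only `IN_ORDER`. In this sketch, the tolerance for stragglers is set entirely by the watermark's bound.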


Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> the element will be kept around indefinitely if no new watermark arrives.
>
> I think the same problem will persist for AssignerWithPunctuatedWatermarks
> since there you also might not get the required "last watermark" to trigger
> processing of the last window.
>
> Cheers,
> Aljoscha
>
> On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.patil@gmail.com> wrote:
>
> > Hi Aljoscha,
> >
> > This clears up a lot of doubts.
> > So now let's say the stream pauses for a while, or stops completely on
> > Friday, and assume that the last message did not get processed and is
> > kept in the internal buffers.
> >
> > When the stream starts again on Monday, will the last element that is in
> > the internal buffer be considered for processing?
> > How long can the internal buffer hold the data, or will it flush the
> > data after a threshold?
> >
> > I have tried using AssignerWithPunctuatedWatermarks and generated a
> > watermark for each event, but I am still getting one record less.
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljoscha@apache.org>
> > wrote:
> >
> > > Hi,
> > > the reason why the last element might never be emitted is the way the
> > > ascending timestamp extractor works. I'll try to explain with an
> > > example.
> > >
> > > Let's say we have a window size of 2 milliseconds and elements arrive
> > > starting with timestamp 0; the window begin timestamp is inclusive, the
> > > end timestamp is exclusive:
> > >
> > > Element 0, Timestamp 0 (at this point the watermark is -1)
> > > Element 1, Timestamp 1 (at this point the watermark is 0)
> > > Element 2, Timestamp 1 (at this point the watermark is still 0)
> > > Element 3, Timestamp 2 (at this point the watermark is 1)
> > >
> > > Now we can process the window [0, 2) because we know from the watermark
> > > that no more elements can arrive for that window. The window contains
> > > elements 0, 1, 2.
> > >
> > > Element 4, Timestamp 3 (at this point the watermark is 2)
> > > Element 5, Timestamp 4 (at this point the watermark is 3)
> > >
> > > Now we can process window [2, 4). The window contains elements 3, 4.
> > >
> > > At this point, we have Element 5 sitting in internal buffers for window
> > > [4, 6), but if we don't receive further elements the watermark will never
> > > advance and we will never process that window.
> > >
> > > If, however, we get new elements at some point, the watermark advances
> > > and we don't have a problem. That's what I meant when I said that you
> > > shouldn't have a problem if data keeps continuously arriving.
> > >
> > > Cheers,
> > > Aljoscha
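The walk-through above can be replayed with a small plain-Java simulation (not Flink code). As in the example, it advances the watermark to "last timestamp - 1" right after each element. In Flink itself the ascending extractor emits its watermark periodically, so the exact moment can differ. A window [start, start + size) fires once the watermark reaches start + size - 1. Any window that has not fired when the input ends is reported as pending. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java replay of the walk-through: "last timestamp - 1" watermarks and tumbling windows. */
class AscendingWatermarkSim {

    /** Returns fired windows in firing order, followed by windows still waiting for a watermark. */
    static List<String> run(long windowSize, long[] timestamps) {
        List<String> result = new ArrayList<>();
        // window start -> ids of the buffered elements, ordered by window start
        TreeMap<Long, List<Integer>> buffered = new TreeMap<>();
        long watermark = Long.MIN_VALUE;
        for (int id = 0; id < timestamps.length; id++) {
            long ts = timestamps[id];
            long start = ts - Math.floorMod(ts, windowSize); // windows aligned to timestamp 0
            buffered.computeIfAbsent(start, k -> new ArrayList<>()).add(id);
            watermark = Math.max(watermark, ts - 1);         // ascending extractor rule
            // Window [start, start + size) fires once the watermark reaches start + size - 1.
            while (!buffered.isEmpty() && buffered.firstKey() + windowSize - 1 <= watermark) {
                Map.Entry<Long, List<Integer>> w = buffered.pollFirstEntry();
                result.add(describe("", w, windowSize));
            }
        }
        // With no further input the watermark never advances, so these never fire.
        for (Map.Entry<Long, List<Integer>> w : buffered.entrySet()) {
            result.add(describe("pending ", w, windowSize));
        }
        return result;
    }

    private static String describe(String prefix, Map.Entry<Long, List<Integer>> w, long size) {
        return prefix + "[" + w.getKey() + ", " + (w.getKey() + size) + ") -> " + w.getValue();
    }

    public static void main(String[] args) {
        // Timestamps of elements 0..5 from the example, 2 ms windows.
        run(2, new long[] {0, 1, 1, 2, 3, 4}).forEach(System.out::println);
    }
}
```

With the example's input it prints `[0, 2) -> [0, 1, 2]`, then `[2, 4) -> [3, 4]`, then `pending [4, 6) -> [5]`. This matches the explanation: element 5 stays buffered until a later element pushes the watermark to 5.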
> > >
> > >
> > > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.patil@gmail.com>
> > > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thanks a lot for your inputs.
> > > >
> > > > I still don't understand why you say I will not face this issue in the
> > > > case of a continuous stream. Let's consider the following example:
> > > > assume that the stream runs continuously from Monday to Friday, and on
> > > > Friday it stops after 5:00 PM. Will I still face this issue?
> > > >
> > > > I actually don't understand how it will differ for real-time
> > > > streams.
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > ingestion time can only be used if you don't care about the timestamps
> > > > > in the elements. So if you have those, you should probably use event
> > > > > time.
> > > > >
> > > > > If your timestamps really are strictly increasing, then the ascending
> > > > > extractor is good. And if you have a continuous stream of incoming
> > > > > elements, you will not see the behavior of not getting the last elements.
> > > > >
> > > > > By the way, when using Kafka you can also embed the timestamp extractor
> > > > > directly in the Kafka consumer. This is described here:
> > > > >
> > > > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.patil@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > Thank you for your response.
> > > > > > So do you suggest using a different approach for extracting the
> > > > > > timestamp (as given in the documentation) instead of the
> > > > > > AscendingTimestampExtractor? Is that the reason I am seeing this
> > > > > > unexpected behaviour? In the case of a continuous stream, would I not
> > > > > > see any data loss?
> > > > > >
> > > > > > Also, assuming that the records are always going to be in order,
> > > > > > which is the best approach: Ingestion Time or Event Time?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Vinay Patil
> > > > > >
> > > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > first, regarding tumbling windows: even if you have 5-minute windows,
> > > > > > > it can happen that elements that are only seconds apart go into
> > > > > > > different windows. Consider the following case:
> > > > > > >
> > > > > > > |                x | x                 |
> > > > > > >
> > > > > > > These are two 5-minute windows, and the two elements are only seconds
> > > > > > > apart but go into different windows because windows are aligned to
> > > > > > > the epoch.
> > > > > > >
> > > > > > > Now, for the ascending timestamp extractor. The reason this can behave
> > > > > > > in unexpected ways is that it emits a watermark that is "last
> > > > > > > timestamp - 1", i.e. if it has seen timestamp t it can only emit
> > > > > > > watermark t-1 because there might be other elements with timestamp t
> > > > > > > arriving. If you have a continuous stream of elements you wouldn't
> > > > > > > notice this. Only in this constructed example does it become visible.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
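The diagram can be made concrete with a small sketch. It assumes non-negative millisecond timestamps and windows aligned to the epoch with no offset, so the window start is the timestamp rounded down to a multiple of the window size. `windowStart` is a hypothetical helper, not a Flink method. Two timestamps only 4 seconds apart, at 4:58 and 5:02 after the epoch, land in different 5-minute windows.

```java
/** Sketch: epoch-aligned tumbling windows (hypothetical helper, not Flink's API). */
class WindowAlignment {

    /** Start of the window containing ts; assumes ts >= 0, both values in milliseconds. */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L; // 300_000 ms
        long a = 298_000L;                 // 4:58 after the epoch
        long b = 302_000L;                 // 5:02 after the epoch, only 4 s later
        System.out.println(windowStart(a, fiveMinutes)); // 0
        System.out.println(windowStart(b, fiveMinutes)); // 300000
    }
}
```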
> > > > > > >
> > > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Following are the timestamps I am getting from the DTO for the two
> > > > > > > > records:
> > > > > > > > 1466115892162154279
> > > > > > > > 1466116026233613409
> > > > > > > >
> > > > > > > > So the time difference is roughly 3 min. Even if I apply a window
> > > > > > > > of 5 min, I am not getting the last record (the last timestamp value
> > > > > > > > above). I am using the ascending timestamp extractor for generating
> > > > > > > > the timestamps (assuming that the timestamps are always in order).
> > > > > > > >
> > > > > > > > I was at least expecting the data to reach the co-group function.
> > > > > > > > What could be the reason for the data loss? The data we are getting
> > > > > > > > is critical, hence we cannot afford to lose any data.
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Patil
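Here is a quick arithmetic check on the two values above. It assumes the 19-digit numbers are nanoseconds since the epoch; read that way, they fall in mid-June 2016. Flink's event-time timestamps and window sizes are in milliseconds, so these values would need converting before they are assigned. Used unchanged, the same gap reads as about 1,551 days instead of about 134 seconds (2 min 14 s). The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Sketch: the gap between the two DTO timestamps under two unit assumptions. */
class TimestampUnits {

    static final long FIRST = 1466115892162154279L;
    static final long SECOND = 1466116026233613409L;

    /** Gap in whole seconds if the values are nanoseconds since the epoch. */
    static long gapSecondsIfNanos() {
        return TimeUnit.NANOSECONDS.toSeconds(SECOND - FIRST);
    }

    /** Gap in whole days if the same numbers are read as milliseconds. */
    static long gapDaysIfMillis() {
        return TimeUnit.MILLISECONDS.toDays(SECOND - FIRST);
    }

    /** Converts nanoseconds to the milliseconds that event-time timestamps use. */
    static long toMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    public static void main(String[] args) {
        System.out.println(SECOND - FIRST);      // 134071459130
        System.out.println(gapSecondsIfNanos()); // 134
        System.out.println(gapDaysIfMillis());   // 1551
        System.out.println(toMillis(FIRST));     // 1466115892162
    }
}
```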
> > > > > > > >
> > > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Just an update: when I use IngestionTime and remove the timestamp
> > > > > > > > > I am generating, I get all the records, but with Event Time I am
> > > > > > > > > getting one record less. I checked the time difference between the
> > > > > > > > > two records; it is 3 min. I tried setting the window time to 5
> > > > > > > > > mins, but even that did not work.
> > > > > > > > >
> > > > > > > > > Even when I assign timestamps with IngestionTime, I get one record
> > > > > > > > > less. So can I safely use Ingestion Time, or is it always advisable
> > > > > > > > > to use EventTime?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Vinay Patil
> > > > > > > > >
> > > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi,
> > > > > > > > >>
> > > > > > > > >> Actually, I am only publishing 5 messages each to two different Kafka
> > > > > > > > >> topics (using JUnit); even if I set the window to 500 seconds the
> > > > > > > > >> result is the same.
> > > > > > > > >>
> > > > > > > > >> I don't understand why it is not sending the 5th element to the
> > > > > > > > >> co-group operator even though the keys are the same.
> > > > > > > > >>
> > > > > > > > >> I cannot share the actual client code, but this is what the streams
> > > > > > > > >> look like:
> > > > > > > > >> sourceStream.coGroup(destStream)
> > > > > > > > >> Here sourceStream and destStream are actually Tuple2<String, DTO>,
> > > > > > > > >> and the ElementSelector returns tuple.f0, which is the key.
> > > > > > > > >>
> > > > > > > > >> I am generating the timestamps based on a field from the DTO which
> > > > > > > > >> is guaranteed to be in order.
> > > > > > > > >>
> > > > > > > > >> Will using triggers help here?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> Regards,
> > > > > > > > >> Vinay Patil
> > > > > > > > >>
> > > > > > > > >> *+91-800-728-4749*
> > > > > > > > >>
> > > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >>> Hi,
> > > > > > > > >>> what timestamps are you assigning? Is it guaranteed that all of them
> > > > > > > > >>> would fall into the same 30-second window?
> > > > > > > > >>>
> > > > > > > > >>> The issue with duplicate printing in the ElementSelector is strange.
> > > > > > > > >>> Could you post a more complete code example so that I can reproduce
> > > > > > > > >>> the problem?
> > > > > > > > >>>
> > > > > > > > >>> Cheers,
> > > > > > > > >>> Aljoscha
> > > > > > > > >>>
> > > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > > >>> wrote:
> > > > > > > > >>>
> > > > > > > > >>> > Hi,
> > > > > > > > >>> >
> > > > > > > > >>> > I am able to get the matching and non-matching elements.
> > > > > > > > >>> >
> > > > > > > > >>> > However, when I am unit testing the code, I am getting one record
> > > > > > > > >>> > less inside the overridden coGroup function.
> > > > > > > > >>> > I am testing it the following way:
> > > > > > > > >>> >
> > > > > > > > >>> > 1) Insert 5 messages into a local Kafka topic (test1)
> > > > > > > > >>> > 2) Insert 5 different messages into a local Kafka topic (test2)
> > > > > > > > >>> > 3) Consume 1) and 2), so I have two different Kafka streams
> > > > > > > > >>> > 4) Generate ascending timestamps (using Event Time) for both
> > > > > > > > >>> >    streams and create the key (String)
> > > > > > > > >>> >
> > > > > > > > >>> > Up to 4) I am able to get all the records (checked by printing the
> > > > > > > > >>> > stream to a text file).
> > > > > > > > >>> >
> > > > > > > > >>> > However, when I send the stream to the co-group operator, I am
> > > > > > > > >>> > receiving one record less, using the following syntax:
> > > > > > > > >>> >
> > > > > > > > >>> > sourceStream.coGroup(destStream)
> > > > > > > > >>> >     .where(new ElementSelector())
> > > > > > > > >>> >     .equalTo(new ElementSelector())
> > > > > > > > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> >     .apply(new JoinStreams());
> > > > > > > > >>> >
> > > > > > > > >>> > Also, I have inserted a sysout in the ElementSelector, and I am
> > > > > > > > >>> > getting 20 sysouts instead of 10 (10 for the source and 10 for the
> > > > > > > > >>> > dest stream).
> > > > > > > > >>> >
> > > > > > > > >>> > I am unable to understand why one record less is coming into the
> > > > > > > > >>> > co-group.
> > > > > > > > >>> >
> > > > > > > > >>> >
> > > > > > > > >>> >
> > > > > > > > >>> > Regards,
> > > > > > > > >>> > Vinay Patil
> > > > > > > > >>> >
> > > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com>
> > > > > > > > >>> > wrote:
> > > > > > > > >>> >
> > > > > > > > >>> > > Can you add a flag to each element emitted by the CoGroupFunction
> > > > > > > > >>> > > that indicates whether it was joined or not?
> > > > > > > > >>> > > Then you can use split to distinguish between both cases and handle
> > > > > > > > >>> > > both streams differently.
> > > > > > > > >>> > >
> > > > > > > > >>> > > Best, Fabian
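The suggestion can be sketched in plain Java (not Flink code). The sketch assumes integer elements keyed by their own value, and that all elements fall into one window. The co-group step tags each output element with a `joined` flag, and a second step routes elements by that flag, which is the role `split` plays in the suggestion. `Tagged`, `coGroup` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Plain-Java sketch of "flag in the co-group, then split" (hypothetical names, not Flink's API). */
class TagAndSplit {

    /** An output element plus the flag saying whether its key was found on both sides. */
    static final class Tagged {
        final int value;
        final boolean joined;

        Tagged(int value, boolean joined) {
            this.value = value;
            this.joined = joined;
        }
    }

    /** One co-group call: left and right hold the elements of both inputs for a single key. */
    static void coGroup(List<Integer> left, List<Integer> right, List<Tagged> out) {
        if (!left.isEmpty() && !right.isEmpty()) {
            // Matched: one output per pair, like a JoinFunction returning its first input.
            for (int l : left) {
                for (int i = 0; i < right.size(); i++) {
                    out.add(new Tagged(l, true));
                }
            }
        } else {
            // Unmatched: exactly one side is non-empty; emit all of its elements.
            for (int l : left) out.add(new Tagged(l, false));
            for (int r : right) out.add(new Tagged(r, false));
        }
    }

    /** Groups both inputs by value (an identity key), co-groups each key, then splits by flag. */
    static Map<Boolean, List<Integer>> run(List<Integer> source, List<Integer> dest) {
        TreeSet<Integer> keys = new TreeSet<>(source);
        keys.addAll(dest);
        List<Tagged> tagged = new ArrayList<>();
        for (int key : keys) {
            List<Integer> left = new ArrayList<>();
            for (int v : source) if (v == key) left.add(v);
            List<Integer> right = new ArrayList<>();
            for (int v : dest) if (v == key) right.add(v);
            coGroup(left, right, tagged);
        }
        // The "split": route every tagged element to one of two outputs by its flag.
        Map<Boolean, List<Integer>> split = new LinkedHashMap<>();
        split.put(true, new ArrayList<>());
        split.put(false, new ArrayList<>());
        for (Tagged t : tagged) {
            split.get(t.joined).add(t.value);
        }
        return split;
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> result = run(
                Arrays.asList(10, 20, 23, 25, 30, 33, 102, 18), Arrays.asList(20, 30, 40, 50, 60, 10));
        System.out.println("joined:    " + result.get(true));
        System.out.println("unmatched: " + result.get(false));
    }
}
```

With the inputs from the original join example, the joined output is `[10, 20, 30]`. The unmatched output is `[18, 23, 25, 33, 40, 50, 60, 102]`. Each output can then go to its own further processing and sink.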
> > > > > > > > >>> > >
> > > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
> > > > > > > > >>> > >
> > > > > > > > >>> > > > Hi Jark,
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > I am able to get the non-matching elements in a stream.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Of course we can collect the matching elements in the same stream
> > > > > > > > >>> > > > as well; however, I want to perform additional operations on the
> > > > > > > > >>> > > > joined stream before writing it to S3, so I would have to include a
> > > > > > > > >>> > > > separate join operator for the same two streams, right?
> > > > > > > > >>> > > > Correct me if I am wrong.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > I have pasted the dummy code which collects the non-matching
> > > > > > > > >>> > > > records (I have to perform this on the actual data; correct me if I
> > > > > > > > >>> > > > am doing something wrong).
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > sourceStream.coGroup(destStream)
> > > > > > > > >>> > > >     .where(new ElementSelector())
> > > > > > > > >>> > > >     .equalTo(new ElementSelector())
> > > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >         @Override
> > > > > > > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > > > > > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > > >>> > > >             // hasNext() works for any Iterable, whereas
> > > > > > > > >>> > > >             // spliterator().getExactSizeIfKnown() returns -1 when
> > > > > > > > >>> > > >             // the size is not known in advance.
> > > > > > > > >>> > > >             boolean firstEmpty = !paramIterable.iterator().hasNext();
> > > > > > > > >>> > > >             boolean secondEmpty = !paramIterable1.iterator().hasNext();
> > > > > > > > >>> > > >             if (firstEmpty) {
> > > > > > > > >>> > > >                 // key seen only in destStream: emit every unmatched element
> > > > > > > > >>> > > >                 for (Integer value : paramIterable1) {
> > > > > > > > >>> > > >                     paramCollector.collect(value);
> > > > > > > > >>> > > >                 }
> > > > > > > > >>> > > >             } else if (secondEmpty) {
> > > > > > > > >>> > > >                 // key seen only in sourceStream
> > > > > > > > >>> > > >                 for (Integer value : paramIterable) {
> > > > > > > > >>> > > >                     paramCollector.collect(value);
> > > > > > > > >>> > > >                 }
> > > > > > > > >>> > > >             }
> > > > > > > > >>> > > >         }
> > > > > > > > >>> > > >     }).print();
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Regards,
> > > > > > > > >>> > > > Vinay Patil
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > > >>> > > > wrote:
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > > You are right; I debugged it for all elements and I can do that
> > > > > > > > >>> > > > > now. Thanks a lot.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > Regards,
> > > > > > > > >>> > > > > Vinay Patil
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > On Tue, Jun 14, 2016
at 11:56 AM, Jark Wu <
> > > > > > > > >>> > wuchong.wc@alibaba-inc.com>
> > > > > > > > >>> > > > > wrote:
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > >> In `coGroup(Iterable<Integer>
iter1,
> > > Iterable<Integer>
> > > > > > > iter2,
> > > > > > > > >>> > > > >> Collector<Integer>
out)` ,   when both iter1 and
> > > iter2
> > > > > are
> > > > > > > not
> > > > > > > > >>> > empty,
> > > > > > > > >>> > > it
> > > > > > > > >>> > > > >> means they are
matched elements from both
> stream.
> > > > > > > > >>> > > > >> When one of iter1
and iter2 is empty , it means
> > that
> > > > > they
> > > > > > > are
> > > > > > > > >>> > > unmatched.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> - Jark Wu (wuchong)
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > Hi Matthias,
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > I did not get you; even if we use co-group, we have to apply it on
> > > > > > > > >>> > > > >> > a key:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
> > > > > > > > >>> > > > >> >     .where(new ElementSelector())
> > > > > > > > >>> > > > >> >     .equalTo(new ElementSelector())
> > > > > > > > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> >         @Override
> > > > > > > > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > > >>> > > > >> >                 Iterable<Integer> paramIterable1,
> > > > > > > > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > > > > > > > >>> > > > >> >             while (iterator.hasNext()) {
> > > > > > > > >>> > > > >> >                 // next() must be called here; without it the loop never ends
> > > > > > > > >>> > > > >> >                 Integer value = iterator.next();
> > > > > > > > >>> > > > >> >             }
> > > > > > > > >>> > > > >> >         }
> > > > > > > > >>> > > > >> >     });
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > When I debug this, only the matched elements from both streams
> > > > > > > > >>> > > > >> > come into the coGroup function.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > What I want is to check for unmatched elements from both streams
> > > > > > > > >>> > > > >> > and write them to the sink.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > Regards,
> > > > > > > > >>> > > > >> > Vinay Patil
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > *+91-800-728-4749*
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mjsax@apache.org>
> > > > > > > > >>> > > > >> > wrote:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> >> You need to do an outer join. However, there is no built-in support
> > > > > > > > >>> > > > >> >> for outer joins yet.
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> You can use Window-CoGroup to implement the outer join as your own
> > > > > > > > >>> > > > >> >> operator.
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> -Matthias
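Here is one way to read this suggestion, sketched in plain Java (not Flink code). It assumes integer elements keyed by their own value and a single window. Both inputs are grouped by key. For each key, the sketch emits every (left, right) pair when both sides are present, or pairs each element with `null` when the other side is empty, as a full outer join would. The per-key step corresponds to one `CoGroupFunction` call. `OuterJoinViaCoGroup` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Full outer join expressed through co-group semantics (plain Java, hypothetical names). */
class OuterJoinViaCoGroup {

    static List<String> fullOuterJoin(List<Integer> left, List<Integer> right) {
        // key -> [elements from left, elements from right]; each element is its own key here
        TreeMap<Integer, List<List<Integer>>> groups = new TreeMap<>();
        for (int v : left) sidesFor(groups, v).get(0).add(v);
        for (int v : right) sidesFor(groups, v).get(1).add(v);

        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<List<Integer>>> e : groups.entrySet()) {
            List<Integer> l = e.getValue().get(0);
            List<Integer> r = e.getValue().get(1);
            // This loop body plays the role of one co-group call for a single key.
            if (l.isEmpty()) {
                for (int rv : r) out.add("(null, " + rv + ")"); // only in the right input
            } else if (r.isEmpty()) {
                for (int lv : l) out.add("(" + lv + ", null)"); // only in the left input
            } else {
                for (int lv : l) {
                    for (int rv : r) out.add("(" + lv + ", " + rv + ")"); // matched pairs
                }
            }
        }
        return out;
    }

    private static List<List<Integer>> sidesFor(TreeMap<Integer, List<List<Integer>>> groups, int key) {
        return groups.computeIfAbsent(key, k -> {
            List<List<Integer>> sides = new ArrayList<>();
            sides.add(new ArrayList<>());
            sides.add(new ArrayList<>());
            return sides;
        });
    }

    public static void main(String[] args) {
        // prints [(10, null), (20, 20), (23, null), (null, 30)]
        System.out.println(fullOuterJoin(Arrays.asList(10, 20, 23), Arrays.asList(20, 30)));
    }
}
```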
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> On 06/13/2016
06:53 PM, Vinay Patil wrote:
> > > > > > > > >>> > > > >> >>> Hi,
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> I have a question regarding the join operation; consider the
> > > > > > > > >>> > > > >> >>> following dummy example:
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
> > > > > > > > >>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > > > > > > > >>> > > > >> >>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> > > > > > > > >>> > > > >> >>>     env.fromElements(20, 30, 40, 50, 60, 10);
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> > > > > > > > >>> > > > >> >>>     .where(new ElementSelector())
> > > > > > > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > > > > > > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > > > > > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>>         @Override
> > > > > > > > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > > > > > > >>> > > > >> >>>             return paramIN1;
> > > > > > > > >>> > > > >> >>>         }
> > > > > > > > >>> > > > >> >>>     }).print();
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> // nothing runs until the job is submitted
> > > > > > > > >>> > > > >> >>> env.execute();
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> I correctly get the elements that match in both streams; however,
> > > > > > > > >>> > > > >> >>> my requirement is to write these matched elements and also the
> > > > > > > > >>> > > > >> >>> unmatched elements to the sink (S3).
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> How do I get the unmatched elements from each stream?
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> Regards,
> > > > > > > > >>> > > > >> >>> Vinay Patil
