flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [Discussion] Query regarding Join
Date Wed, 29 Jun 2016 11:45:37 GMT
Hi,
the element will be kept around indefinitely if no new watermark arrives.

I think the same problem will persist for AssignerWithPunctuatedWatermarks
since there you also might not get the required "last watermark" to trigger
processing of the last window.

Cheers,
Aljoscha
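
The Java below is a minimal, Flink-free model of the point above. It assumes three things: a punctuated assigner that emits each element's own timestamp as the watermark, epoch-aligned tumbling windows, and the firing rule used in this thread, where a window [start, end) is processed once the watermark reaches end - 1. `PunctuatedSketch` and its methods are hypothetical illustrations, not Flink API. The sample timestamps come from the 2 ms example quoted below.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of punctuated watermarks and window firing (not Flink API). */
final class PunctuatedSketch {
    // A window [start, start + size) is processed once the watermark reaches its last timestamp.
    static boolean fires(long windowStart, long size, long watermark) {
        return watermark >= windowStart + size - 1;
    }

    // Assumption: the assigner emits each element's own timestamp as a watermark,
    // so after the last element the watermark equals the largest timestamp seen.
    static long watermarkAfter(long[] timestamps) {
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            watermark = Math.max(watermark, ts);
        }
        return watermark;
    }

    // Start of every window that still holds elements once the input stops.
    static List<Long> unfiredWindowStarts(long[] timestamps, long size) {
        long watermark = watermarkAfter(timestamps);
        List<Long> pending = new ArrayList<>();
        for (long ts : timestamps) {
            long start = ts - (ts % size); // epoch-aligned; assumes ts >= 0
            if (!fires(start, size, watermark) && !pending.contains(start)) {
                pending.add(start);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 1, 1, 2, 3, 4};
        System.out.println(unfiredWindowStarts(timestamps, 2)); // [4]
    }
}
```

With timestamps 0, 1, 1, 2, 3, 4 and 2 ms windows, only the window starting at 4 is still pending, so one record never reaches the window function. If a later element arrives (for example, timestamp 6), the watermark moves past 5 and releases that window. This is why a continuously arriving stream hides the effect.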

On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.patil@gmail.com> wrote:

> Hi Aljoscha,
>
> This clears a lot of doubts now.
> So now let's say the stream pauses for a while, or stops completely on
> Friday, and let us assume that the last message did not get processed and is
> kept in the internal buffers.
>
> So when the stream starts again on Monday, will it consider the last
> element that is in the internal buffer for processing?
> How long can the internal buffer hold the data, or will it flush the
> data after a threshold?
>
> I have tried using AssignerWithPunctuatedWatermarks and generated a
> watermark for each event, but I am still getting one record less.
>
>
> Regards,
> Vinay Patil
>
> On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > Hi,
> > the reason why the last element might never be emitted is the way the
> > ascending timestamp extractor works. I'll try and explain with an example.
> >
> > Let's say we have a window size of 2 milliseconds, elements arrive starting
> > with timestamp 0, the window begin timestamp is inclusive, and the end
> > timestamp is exclusive:
> >
> > Element 0, Timestamp 0 (at this point the watermark is -1)
> > Element 1, Timestamp 1 (at this point the watermark is 0)
> > Element 2, Timestamp 1 (at this point the watermark is still 0)
> > Element 3, Timestamp 2 (at this point the watermark is 1)
> >
> > Now we can process the window (0, 2) because we know from the watermark
> > that no more elements can arrive for that window. The window contains
> > elements 0, 1, 2.
> >
> > Element 4, Timestamp 3 (at this point the watermark is 2)
> > Element 5, Timestamp 4 (at this point the watermark is 3)
> >
> > Now we can process window (2, 4). The window contains elements 3, 4.
> >
> > At this point, we have Element 5 sitting in internal buffers for window
> > (4, 6), but if we don't receive further elements the watermark will never
> > advance and we will never process that window.
> >
> > If, however, we get new elements at some point, the watermark advances and
> > we don't have a problem. That's what I meant when I said that you shouldn't
> > have a problem if data keeps continuously arriving.
> >
> > Cheers,
> > Aljoscha
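
The walkthrough above can be replayed with a small, Flink-free simulation. It assumes the behaviour described in this message: the watermark is the highest timestamp seen minus 1, windows are aligned to 0, and a window is processed once the watermark reaches its end - 1. `AscendingWalkthrough` is a hypothetical helper, not Flink code. It prints windows in half-open [start, end) notation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical replay of the 2 ms window example (not Flink code). */
final class AscendingWalkthrough {
    static List<String> replay(long[] timestamps, long size) {
        TreeMap<Long, List<Integer>> pending = new TreeMap<>(); // window start -> element ids
        List<String> log = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (int id = 0; id < timestamps.length; id++) {
            long ts = timestamps[id];
            long start = ts - (ts % size); // windows aligned to 0; assumes ts >= 0
            pending.computeIfAbsent(start, k -> new ArrayList<>()).add(id);
            // ascending extractor: another element with timestamp ts may still come, so emit ts - 1
            watermark = Math.max(watermark, ts - 1);
            // process every window whose last timestamp (end - 1) the watermark has reached
            while (!pending.isEmpty() && watermark >= pending.firstKey() + size - 1) {
                Map.Entry<Long, List<Integer>> w = pending.pollFirstEntry();
                log.add("[" + w.getKey() + ", " + (w.getKey() + size) + ") -> elements " + w.getValue());
            }
        }
        // whatever is left waits for a watermark that never arrives
        for (Map.Entry<Long, List<Integer>> w : pending.entrySet()) {
            log.add("[" + w.getKey() + ", " + (w.getKey() + size) + ") still buffered -> elements " + w.getValue());
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : replay(new long[] {0, 1, 1, 2, 3, 4}, 2)) {
            System.out.println(line);
        }
    }
}
```

Running `main` prints three lines:
1. Window [0, 2) with elements 0, 1, 2.
2. Window [2, 4) with elements 3, 4.
3. Window [4, 6) with element 5 still buffered.

This matches the last step of the example.
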
> >
> >
> > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vinay18.patil@gmail.com>
> > wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thanks a lot for your inputs.
> > >
> > > I still do not understand what you mean when you say I will not face this
> > > issue in the case of a continuous stream. Let's consider the following
> > > example: assume that the stream runs continuously from Monday to Friday,
> > > and on Friday it stops after 5.00 PM. Will I still face this issue?
> > >
> > > I am actually not able to understand how it will differ for real-time
> > > streams.
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > > ingestion time can only be used if you don't care about the timestamps in
> > > > the elements. So if you have those, you should probably use event time.
> > > >
> > > > If your timestamps really are strictly increasing, then the ascending
> > > > extractor is good. And if you have a continuous stream of incoming elements
> > > > you will not see the behavior of not getting the last elements.
> > > >
> > > > By the way, when using Kafka you can also embed the timestamp extractor
> > > > directly in the Kafka consumer. This is described here:
> > > >
> > > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vinay18.patil@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > Thank you for your response.
> > > > > So do you suggest using a different approach for extracting timestamps
> > > > > (as given in the documentation) instead of the AscendingTimestampExtractor?
> > > > > Is that the reason I am seeing this unexpected behaviour? In the case of a
> > > > > continuous stream, would I not see any data loss?
> > > > >
> > > > > Also, assuming that the records are always going to be in order, which is
> > > > > the best approach: Ingestion Time or Event Time?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > > Vinay Patil
> > > > >
> > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > first, regarding tumbling windows: even if you have 5-minute windows, it
> > > > > > can happen that elements that are only seconds apart go into different
> > > > > > windows. Consider the following case:
> > > > > >
> > > > > > |                x | x                 |
> > > > > >
> > > > > > These are two 5-minute windows, and the two elements are only seconds
> > > > > > apart but go into different windows because windows are aligned to the
> > > > > > epoch.
> > > > > >
> > > > > > Now, for the ascending timestamp extractor. The reason this can behave in
> > > > > > unexpected ways is that it emits a watermark that is "last timestamp - 1",
> > > > > > i.e. if it has seen timestamp t it can only emit watermark t-1 because
> > > > > > there might be other elements with timestamp t arriving. If you have a
> > > > > > continuous stream of elements you wouldn't notice this. Only in this
> > > > > > constructed example does it become visible.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
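
This sketch makes the epoch alignment concrete. It assumes non-negative millisecond timestamps and no window offset, so a tumbling window starts at the timestamp rounded down to a multiple of the window size. `EpochAlignedWindows` and the two sample timestamps are hypothetical.

```java
/**
 * Hypothetical helper (not Flink's API): epoch-aligned tumbling window
 * assignment for non-negative millisecond timestamps with no offset.
 */
final class EpochAlignedWindows {
    static final long FIVE_MINUTES_MS = 5 * 60 * 1000L;

    // Start of the tumbling window [start, start + size) that contains ts.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    public static void main(String[] args) {
        long a = 298_000L; // 4 min 58 s after the epoch
        long b = 302_000L; // 5 min 02 s after the epoch, only 4 s later
        System.out.println(windowStart(a, FIVE_MINUTES_MS)); // 0
        System.out.println(windowStart(b, FIVE_MINUTES_MS)); // 300000
    }
}
```

The two timestamps are only 4 seconds apart, yet they fall on opposite sides of the 5-minute boundary at 300000 ms, just like the two x marks in the diagram.
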
> > > > > >
> > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <vinay18.patil@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Following are the timestamps I am getting from the DTO; here is the
> > > > > > > timestamp difference between the two records:
> > > > > > > 1466115892162154279
> > > > > > > 1466116026233613409
> > > > > > >
> > > > > > > So the time difference is roughly 3 min. Even if I apply a window of
> > > > > > > 5 min, I am not getting the last record (the last timestamp value above).
> > > > > > > I am using the ascending timestamp extractor for generating the
> > > > > > > timestamps (assuming that the timestamps are always in order).
> > > > > > >
> > > > > > > I was at least expecting the data to reach the co-group function.
> > > > > > > What could be the reason for the data loss? The data we are getting is
> > > > > > > critical, hence we cannot afford to lose any data.
> > > > > > >
> > > > > > >
> > > > > > > Regards,
> > > > > > > Vinay Patil
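
Here is a quick worked check of the two values above. From their 19-digit length, it assumes they are nanoseconds since the Unix epoch (mid-June 2016). Flink's event-time timestamps are milliseconds since the epoch, so such values would need converting before being assigned. Under that assumption, two things follow:
- The records are about 134 seconds apart (roughly 2 min 14 s).
- With epoch-aligned 5-minute windows, they fall into two different windows, which is the alignment effect described in the reply above.

If the raw values were instead assigned unchanged, Flink would read them as milliseconds, and these two records would appear about 134 million seconds apart. `DtoTimestampCheck` is hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical check of the two DTO values, assuming they are epoch nanoseconds. */
final class DtoTimestampCheck {
    static final long FIRST_NS = 1466115892162154279L;
    static final long SECOND_NS = 1466116026233613409L;

    // Flink event-time timestamps are milliseconds since the epoch.
    static long toMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    // Epoch-aligned tumbling window start (no offset, non-negative timestamps).
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    public static void main(String[] args) {
        System.out.println(toMillis(SECOND_NS - FIRST_NS) + " ms apart"); // 134071 ms apart
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);
        System.out.println(windowStart(toMillis(FIRST_NS), fiveMinutes));  // 1466115600000
        System.out.println(windowStart(toMillis(SECOND_NS), fiveMinutes)); // 1466115900000
    }
}
```
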
> > > > > > >
> > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Just an update: when I use IngestionTime and remove the timestamps I am
> > > > > > > > generating, I get all the records, but with Event Time I get one record
> > > > > > > > less. I checked the time difference between two records: it is 3 min. I
> > > > > > > > tried setting the window time to 5 mins, but even that did not work.
> > > > > > > >
> > > > > > > > Even when I try assigning timestamps with IngestionTime, I get one
> > > > > > > > record less. So can I safely use Ingestion Time, or is it always
> > > > > > > > advisable to use EventTime?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Patil
> > > > > > > >
> > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi,
> > > > > > > >>
> > > > > > > >> Actually, I am only publishing 5 messages each to two different Kafka
> > > > > > > >> topics (using JUnit). Even if I set the window to 500 seconds, the
> > > > > > > >> result is the same.
> > > > > > > >>
> > > > > > > >> I do not understand why it is not sending the 5th element to the
> > > > > > > >> co-group operator even though the keys are the same.
> > > > > > > >>
> > > > > > > >> I actually cannot share the client code.
> > > > > > > >> But this is what the streams look like:
> > > > > > > >> sourceStream.coGroup(destStream)
> > > > > > > >> Here sourceStream and destStream are actually Tuple2<String,DTO>
> > > > > > > >> streams, and the ElementSelector returns tuple.f0, which is the key.
> > > > > > > >>
> > > > > > > >> I am generating the timestamp based on a field from the DTO, which is
> > > > > > > >> guaranteed to be in order.
> > > > > > > >>
> > > > > > > >> Will using triggers help here?
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Regards,
> > > > > > > >> Vinay Patil
> > > > > > > >>
> > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> Hi,
> > > > > > > >>> what timestamps are you assigning? Is it guaranteed that all of them
> > > > > > > >>> would fall into the same 30-second window?
> > > > > > > >>>
> > > > > > > >>> The issue with duplicate printing in the ElementSelector is strange.
> > > > > > > >>> Could you post a more complete code example so that I can reproduce the
> > > > > > > >>> problem?
> > > > > > > >>>
> > > > > > > >>> Cheers,
> > > > > > > >>> Aljoscha
> > > > > > > >>>
> > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>> > Hi,
> > > > > > > >>> >
> > > > > > > >>> > I am able to get the matching and non-matching elements.
> > > > > > > >>> >
> > > > > > > >>> > However, when I unit test the code, I get one record less inside the
> > > > > > > >>> > overridden coGroup function.
> > > > > > > >>> > I am testing it the following way:
> > > > > > > >>> >
> > > > > > > >>> > 1) Insert 5 messages into local Kafka topic (test1)
> > > > > > > >>> > 2) Insert 5 different messages into local Kafka topic (test2)
> > > > > > > >>> > 3) Consume 1) and 2), giving two different Kafka streams
> > > > > > > >>> > 4) Generate ascending timestamps (using Event Time) for both streams and
> > > > > > > >>> >    create a key (String)
> > > > > > > >>> >
> > > > > > > >>> > Up to step 4) I am able to get all the records (checked by printing the
> > > > > > > >>> > stream to a text file).
> > > > > > > >>> >
> > > > > > > >>> > However, when I send the stream to the co-group operator, I receive one
> > > > > > > >>> > record less, using the following syntax:
> > > > > > > >>> >
> > > > > > > >>> > sourceStream.coGroup(destStream)
> > > > > > > >>> >     .where(new ElementSelector())
> > > > > > > >>> >     .equalTo(new ElementSelector())
> > > > > > > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > >>> >     .apply(new JoinStreams());
> > > > > > > >>> >
> > > > > > > >>> > Also, I have inserted a sysout in the ElementSelector, and I am getting
> > > > > > > >>> > 20 sysouts instead of 10 (10 for the source stream and 10 for the dest
> > > > > > > >>> > stream).
> > > > > > > >>> >
> > > > > > > >>> > I am unable to understand why one record fewer reaches the co-group
> > > > > > > >>> > operator.
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > Regards,
> > > > > > > >>> > Vinay Patil
> > > > > > > >>> >
> > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com>
> > > > > > > >>> > wrote:
> > > > > > > >>> >
> > > > > > > >>> > > Can you add a flag to each element emitted by the CoGroupFunction that
> > > > > > > >>> > > indicates whether it was joined or not?
> > > > > > > >>> > > Then you can use split to distinguish between both cases and handle both
> > > > > > > >>> > > streams differently.
> > > > > > > >>> > >
> > > > > > > >>> > > Best, Fabian
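
The flag-and-split idea can be sketched without Flink:
1. The co-group step tags every record with a `joined` flag.
2. A second step routes records on that flag, standing in for `split` on a DataStream.

`FlaggedCoGroup`, `Tagged`, the `left+right` string encoding of joined pairs, and the list-based `split` are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of "flag each co-group output, then split on the flag". */
final class FlaggedCoGroup {
    static final class Tagged {
        final String payload;
        final boolean joined;

        Tagged(String payload, boolean joined) {
            this.payload = payload;
            this.joined = joined;
        }
    }

    // Called once per key and window with that key's elements from each input.
    static void coGroup(List<Integer> left, List<Integer> right, List<Tagged> out) {
        if (!left.isEmpty() && !right.isEmpty()) {
            // matched key: one joined record per (left, right) pair
            for (int l : left) {
                for (int r : right) {
                    out.add(new Tagged(l + "+" + r, true));
                }
            }
        } else {
            // unmatched key: pass through every element of the non-empty side
            for (int v : left) {
                out.add(new Tagged(String.valueOf(v), false));
            }
            for (int v : right) {
                out.add(new Tagged(String.valueOf(v), false));
            }
        }
    }

    // Routes tagged records: index 0 holds joined payloads, index 1 unjoined ones.
    static List<List<String>> split(List<Tagged> tagged) {
        List<String> joined = new ArrayList<>();
        List<String> unjoined = new ArrayList<>();
        for (Tagged t : tagged) {
            (t.joined ? joined : unjoined).add(t.payload);
        }
        return Arrays.asList(joined, unjoined);
    }

    public static void main(String[] args) {
        List<Tagged> out = new ArrayList<>();
        List<Integer> none = Collections.emptyList();
        coGroup(Arrays.asList(10), Arrays.asList(10), out); // key 10 on both sides
        coGroup(Arrays.asList(23), none, out);              // key 23 only in the source
        coGroup(none, Arrays.asList(40), out);              // key 40 only in the dest
        List<List<String>> parts = split(out);
        System.out.println("joined:   " + parts.get(0));
        System.out.println("unjoined: " + parts.get(1));
    }
}
```

The routing step only looks at the flag. So the joined branch can get its extra processing before the S3 write while the unjoined branch goes elsewhere, all from one co-group operator.
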
> > > > > > > >>> > >
> > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
> > > > > > > >>> > >
> > > > > > > >>> > > > Hi Jark,
> > > > > > > >>> > > >
> > > > > > > >>> > > > I am able to get the non-matching elements in a stream.
> > > > > > > >>> > > >
> > > > > > > >>> > > > Of course we can collect the matching elements in the same stream as
> > > > > > > >>> > > > well; however, I want to perform additional operations on the joined
> > > > > > > >>> > > > stream before writing it to S3, so I would have to include a separate
> > > > > > > >>> > > > join operator for the same two streams, right?
> > > > > > > >>> > > > Correct me if I am wrong.
> > > > > > > >>> > > >
> > > > > > > >>> > > > I have pasted the dummy code which collects the non-matching records (I
> > > > > > > >>> > > > have to perform this on the actual data; correct me if I am doing
> > > > > > > >>> > > > something wrong).
> > > > > > > >>> > > >
> > > > > > > >>> > > > sourceStream.coGroup(destStream)
> > > > > > > >>> > > >     .where(new ElementSelector())
> > > > > > > >>> > > >     .equalTo(new ElementSelector())
> > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > >>> > > >
> > > > > > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > >>> > > >
> > > > > > > >>> > > >         @Override
> > > > > > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > > > > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > >>> > > >             // An Iterable's spliterator may not know its size (then
> > > > > > > >>> > > >             // getExactSizeIfKnown() returns -1), so test emptiness via the iterator.
> > > > > > > >>> > > >             boolean sourceEmpty = !paramIterable.iterator().hasNext();
> > > > > > > >>> > > >             boolean destEmpty = !paramIterable1.iterator().hasNext();
> > > > > > > >>> > > >             if (sourceEmpty) {
> > > > > > > >>> > > >                 // key only in destStream: emit every unmatched element, not just the first
> > > > > > > >>> > > >                 for (Integer value : paramIterable1) {
> > > > > > > >>> > > >                     paramCollector.collect(value);
> > > > > > > >>> > > >                 }
> > > > > > > >>> > > >             } else if (destEmpty) {
> > > > > > > >>> > > >                 // key only in sourceStream
> > > > > > > >>> > > >                 for (Integer value : paramIterable) {
> > > > > > > >>> > > >                     paramCollector.collect(value);
> > > > > > > >>> > > >                 }
> > > > > > > >>> > > >             }
> > > > > > > >>> > > >         }
> > > > > > > >>> > > >     }).print();
> > > > > > > >>> > > >
> > > > > > > >>> > > > Regards,
> > > > > > > >>> > > > Vinay Patil
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > > >>> > > > wrote:
> > > > > > > >>> > > >
> > > > > > > >>> > > > > You are right. I debugged it for all elements, and I can do that now.
> > > > > > > >>> > > > > Thanks a lot.
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > Regards,
> > > > > > > >>> > > > > Vinay Patil
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong.wc@alibaba-inc.com>
> > > > > > > >>> > > > > wrote:
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > > > > > > >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they
> > > > > > > >>> > > > >> contain matched elements from both streams.
> > > > > > > >>> > > > >> When one of iter1 and iter2 is empty, it means that the elements are
> > > > > > > >>> > > > >> unmatched.
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >> - Jark Wu (wuchong)
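
Jark's rule can be checked with a Flink-free sketch on the values from the original join question below. It assumes each value is its own key and all elements land in the same window. `CoGroupClassifier` is hypothetical. It tests emptiness with `iterator().hasNext()`, which works for any `Iterable`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, Flink-free check of the coGroup matched/unmatched rule. */
final class CoGroupClassifier {
    // Both sides non-empty => matched; exactly one side empty => unmatched on the other.
    static String classify(Iterable<Integer> left, Iterable<Integer> right) {
        boolean hasLeft = left.iterator().hasNext();
        boolean hasRight = right.iterator().hasNext();
        if (hasLeft && hasRight) {
            return "matched";
        }
        return hasLeft ? "source only" : "dest only";
    }

    // Groups values by key (here each value is its own key), as coGroup does per window.
    static Map<Integer, List<Integer>> groupByValue(int[] values) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int v : values) {
            groups.computeIfAbsent(v, k -> new ArrayList<>()).add(v);
        }
        return groups;
    }

    static Map<Integer, String> classifyAll(int[] source, int[] dest) {
        Map<Integer, List<Integer>> left = groupByValue(source);
        Map<Integer, List<Integer>> right = groupByValue(dest);
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        List<Integer> none = Collections.emptyList();
        Map<Integer, String> result = new TreeMap<>();
        for (Integer key : keys) {
            result.put(key, classify(left.getOrDefault(key, none), right.getOrDefault(key, none)));
        }
        return result;
    }

    public static void main(String[] args) {
        int[] source = {10, 20, 23, 25, 30, 33, 102, 18};
        int[] dest = {20, 30, 40, 50, 60, 10};
        System.out.println(classifyAll(source, dest));
    }
}
```

For that input:
- Keys 10, 20 and 30 are classified as matched.
- Keys 18, 23, 25, 33 and 102 appear only in the source stream.
- Keys 40, 50 and 60 appear only in the dest stream.
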
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > Hi Matthias,
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > I did not understand: even if we use CoGroup, we have to apply it on a
> > > > > > > >>> > > > >> > key:
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
> > > > > > > >>> > > > >> >     .where(new ElementSelector())
> > > > > > > >>> > > > >> >     .equalTo(new ElementSelector())
> > > > > > > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> >         @Override
> > > > > > > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > >>> > > > >> >                 Iterable<Integer> paramIterable1,
> > > > > > > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > > > > > > >>> > > > >> >             while (iterator.hasNext()) {
> > > > > > > >>> > > > >> >                 // next() must be called, otherwise the loop never ends
> > > > > > > >>> > > > >> >                 Integer value = iterator.next();
> > > > > > > >>> > > > >> >             }
> > > > > > > >>> > > > >> >         }
> > > > > > > >>> > > > >> >     });
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > When I debug this, only the matched elements from both streams come
> > > > > > > >>> > > > >> > into the coGroup function.
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > What I want to know is how to check for unmatched elements from both
> > > > > > > >>> > > > >> > streams and write them to a sink.
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > Regards,
> > > > > > > >>> > > > >> > Vinay Patil
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mjsax@apache.org>
> > > > > > > >>> > > > >> > wrote:
> > > > > > > >>> > > > >> >
> > > > > > > >>> > > > >> >> You need to do an outer join. However, there is no built-in support
> > > > > > > >>> > > > >> >> for outer joins yet.
> > > > > > > >>> > > > >> >>
> > > > > > > >>> > > > >> >> You can use a window CoGroup to implement the outer join as your own
> > > > > > > >>> > > > >> >> operator.
> > > > > > > >>> > > > >> >>
> > > > > > > >>> > > > >> >>
> > > > > > > >>> > > > >> >> -Matthias
> > > > > > > >>> > > > >> >>
> > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > > > > > > >>> > > > >> >>> Hi,
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> I have a question regarding the join operation. Consider the
> > > > > > > >>> > > > >> >>> following dummy example:
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> import org.apache.flink.api.common.functions.JoinFunction;
> > > > > > > >>> > > > >> >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > > > > > > >>> > > > >> >>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> > > > > > > >>> > > > >> >>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > > > > >>> > > > >> >>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> > > > > > > >>> > > > >> >>> import org.apache.flink.streaming.api.windowing.time.Time;
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
> > > > > > > >>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > > > > > > >>> > > > >> >>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> > > > > > > >>> > > > >> >>>     env.fromElements(20, 30, 40, 50, 60, 10);
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> > > > > > > >>> > > > >> >>>     .where(new ElementSelector())
> > > > > > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > > > > > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > > > > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>>         @Override
> > > > > > > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > > > > > >>> > > > >> >>>             return paramIN1;
> > > > > > > >>> > > > >> >>>         }
> > > > > > > >>> > > > >> >>>     }).print();
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> // print() only defines the sink; nothing runs until execute() is called
> > > > > > > >>> > > > >> >>> env.execute("join example");
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> I correctly get the elements that match in both streams; however,
> > > > > > > >>> > > > >> >>> my requirement is to write these matched elements and also the
> > > > > > > >>> > > > >> >>> unmatched elements to a sink (S3).
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> How do I get the unmatched elements from each stream?
> > > > > > > >>> > > > >> >>>
> > > > > > > >>> > > > >> >>> Regards,
> > > > > > > >>> > > > >> >>> Vinay Patil
