From: Vinay Patil <vinay18.patil@gmail.com>
To: dev@flink.apache.org
Date: Wed, 29 Jun 2016 16:48:46 +0530
Subject: Re:
[Discussion] Query regarding Join

Hi Aljoscha,

This clears a lot of doubts now. So now let's say the stream pauses for a while, or stops completely on Friday, and let us assume that the last message did not get processed and is kept in the internal buffers. When the stream starts again on Monday, will it consider the last element that is in the internal buffer for processing? How long can the internal buffer hold the data, or will it flush the data after a threshold?

I have tried using AssignerWithPunctuatedWatermarks and generated a watermark for each event, but I am still getting one record less.

Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek wrote:

> Hi,
> the reason why the last element might never be emitted is the way the
> ascending timestamp extractor works. I'll try to explain with an example.
>
> Let's say we have a window size of 2 milliseconds, elements arrive starting
> with timestamp 0, the window begin timestamp is inclusive, and the end
> timestamp is exclusive:
>
> Element 0, Timestamp 0 (at this point the watermark is -1)
> Element 1, Timestamp 1 (at this point the watermark is 0)
> Element 2, Timestamp 1 (at this point the watermark is still 0)
> Element 3, Timestamp 2 (at this point the watermark is 1)
>
> Now we can process the window (0, 2) because we know from the watermark
> that no elements can arrive for that window anymore. The window contains
> elements 0, 1, 2.
>
> Element 4, Timestamp 3 (at this point the watermark is 2)
> Element 5, Timestamp 4 (at this point the watermark is 3)
>
> Now we can process window (2, 4). The window contains elements 3 and 4.
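The watermark and window bookkeeping in the example above can be simulated in a few lines of plain Java (a toy model for illustration only; `windowStart`, `watermark`, and `windowCanFire` are hypothetical helpers, not Flink API):

```java
// Toy model of the thread's example: tumbling event-time windows aligned
// to the epoch, and the ascending-timestamp extractor's watermark rule.
public class WatermarkSketch {

    // Epoch-aligned tumbling window: start = ts - (ts % size).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // The ascending extractor emits watermark = lastTimestamp - 1, since
    // more elements with the same timestamp may still arrive.
    static long watermark(long lastTimestamp) {
        return lastTimestamp - 1;
    }

    // Window [start, start + size) may fire once the watermark has reached
    // its last contained timestamp, i.e. watermark >= start + size - 1.
    static boolean windowCanFire(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    public static void main(String[] args) {
        long size = 2;
        long[] timestamps = {0, 1, 1, 2, 3, 4};  // elements 0..5 above
        long wm = Long.MIN_VALUE;
        for (long ts : timestamps) {
            wm = watermark(ts);
        }
        // Watermark ends at 3: windows (0,2) and (2,4) fired, but window
        // (4,6), which holds element 5, did not.
        System.out.println("watermark=" + wm);
        System.out.println("window (4,6) fires: " + windowCanFire(4, size, wm));
    }
}
```

With the six timestamps above, the watermark stops at 3, so the window (4, 6) holding element 5 never fires; that is the missing-last-record symptom being discussed.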
>
> At this point, we have Element 5 sitting in internal buffers for window (4,
> 6), but if we don't receive further elements the watermark will never
> advance and we will never process that window.
>
> If, however, we get new elements at some point, the watermark advances and
> we don't have a problem. That's what I meant when I said that you shouldn't
> have a problem if data keeps continuously arriving.
>
> Cheers,
> Aljoscha
>
> On Tue, 28 Jun 2016 at 17:14 Vinay Patil wrote:
>
>> Hi Aljoscha,
>>
>> Thanks a lot for your inputs.
>>
>> I still did not get you when you say I will not face this issue in the
>> case of a continuous stream; let's consider the following example:
>> assume that the stream runs continuously from Monday to Friday, and on
>> Friday it stops after 5:00 PM. Will I still face this issue?
>>
>> I am actually not able to understand how it will differ for real-time
>> streams.
>>
>> Regards,
>> Vinay Patil
>>
>> On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek wrote:
>>
>>> Hi,
>>> ingestion time can only be used if you don't care about the timestamp in
>>> the elements. So if you have those, you should probably use event time.
>>>
>>> If your timestamps really are strictly increasing then the ascending
>>> extractor is good. And if you have a continuous stream of incoming
>>> elements you will not see the behavior of not getting the last elements.
>>>
>>> By the way, when using Kafka you can also embed the timestamp extractor
>>> directly in the Kafka consumer. This is described here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 28 Jun 2016 at 11:44 Vinay Patil wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Thank you for your response.
>>>> So do you suggest using a different approach for extracting the
>>>> timestamp (as given in the documentation) instead of the
>>>> AscendingTimestampExtractor? Is that the reason I am seeing this
>>>> unexpected behaviour? In the case of a continuous stream, would I not
>>>> see any data loss?
>>>>
>>>> Also, assuming that the records are always going to be in order, which
>>>> is the better approach: ingestion time or event time?
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>> On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> first, regarding tumbling windows: even if you have 5-minute windows,
>>>>> it can happen that elements that are only seconds apart go into
>>>>> different windows. Consider the following case:
>>>>>
>>>>> |    x | x    |
>>>>>
>>>>> These are two 5-minute windows, and the two elements are only seconds
>>>>> apart but go into different windows because windows are aligned to the
>>>>> epoch.
>>>>>
>>>>> Now, for the ascending timestamp extractor: the reason this can behave
>>>>> in unexpected ways is that it emits a watermark that is "last
>>>>> timestamp - 1", i.e. if it has seen timestamp t it can only emit
>>>>> watermark t-1, because there might be other elements with timestamp t
>>>>> arriving. If you have a continuous stream of elements you wouldn't
>>>>> notice this. Only in this constructed example does it become visible.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Tue, 28 Jun 2016 at 06:04 Vinay Patil wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The following are the timestamps I am getting from the DTO; here is
>>>>>> the timestamp difference between the two records:
>>>>>> 1466115892162154279
>>>>>> 1466116026233613409
>>>>>>
>>>>>> So the time difference is roughly 3 min. Even if I apply a window of
>>>>>> 5 min, I am not getting the last record (the last timestamp value
>>>>>> above), using the ascending timestamp extractor for generating the
>>>>>> timestamp (assuming that the timestamps are always in order).
>>>>>>
>>>>>> I was at least expecting the data to reach the co-group function.
>>>>>> What could be the reason for the data loss? The data we are getting
>>>>>> is critical, hence we cannot afford to lose any data.
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <vinay18.patil@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Just an update: when I keep ingestion time and remove the timestamp
>>>>>>> I am generating, I get all the records, but for event time I get one
>>>>>>> record less. I checked the time difference between the two records;
>>>>>>> it is 3 min. I tried keeping the window time at 5 min, but even that
>>>>>>> did not work.
>>>>>>>
>>>>>>> Even when I try assigning a timestamp for ingestion time, I get one
>>>>>>> record less, so should I safely use ingestion time, or is it always
>>>>>>> advisable to use event time?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Vinay Patil
>>>>>>>
>>>>>>> On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <vinay18.patil@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Actually I am only publishing 5 messages each to two different
>>>>>>>> kafka topics (using JUnit); even if I keep the window at 500
>>>>>>>> seconds the result is the same.
>>>>>>>>
>>>>>>>> I do not understand why it is not sending the 5th element to the
>>>>>>>> co-group operator even when the keys are the same.
>>>>>>>>
>>>>>>>> I actually cannot share the actual client code,
>>>>>>>> but this is what the streams look like:
>>>>>>>> sourceStream.coGroup(destStream)
>>>>>>>> Here the sourceStream and destStream are actually a Tuple2, and the
>>>>>>>> ElementSelector returns tuple.f0, which is the key.
>>>>>>>>
>>>>>>>> I am generating the timestamp based on a field from the DTO which
>>>>>>>> is guaranteed to be in order.
>>>>>>>>
>>>>>>>> Will using triggers help here?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Vinay Patil
>>>>>>>>
>>>>>>>> *+91-800-728-4749*
>>>>>>>>
>>>>>>>> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> what timestamps are you assigning? Is it guaranteed that all of
>>>>>>>>> them would fall into the same 30-second window?
>>>>>>>>>
>>>>>>>>> The issue with duplicate printing in the ElementSelector is
>>>>>>>>> strange. Could you post a more complete code example so that I can
>>>>>>>>> reproduce the problem?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <vinay18.patil@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am able to get the matching and non-matching elements.
>>>>>>>>>>
>>>>>>>>>> However, when I unit test the code, I am getting one record less
>>>>>>>>>> inside the overridden coGroup function.
>>>>>>>>>> I am testing the following way:
>>>>>>>>>>
>>>>>>>>>> 1) Insert 5 messages into a local kafka topic (test1)
>>>>>>>>>> 2) Insert a different 5 messages into a local kafka topic (test2)
>>>>>>>>>> 3) Consume 1) and 2), giving two different kafka streams
>>>>>>>>>> 4) Generate an ascending timestamp (using event time) for both
>>>>>>>>>> streams and create the key (String)
>>>>>>>>>>
>>>>>>>>>> Up to 4) I am able to get all the records (checked by printing
>>>>>>>>>> the streams to a text file).
>>>>>>>>>>
>>>>>>>>>> However, when I send the streams to the co-group operator, I
>>>>>>>>>> receive one record less, using the following syntax:
>>>>>>>>>>
>>>>>>>>>> sourceStream.coGroup(destStream)
>>>>>>>>>> .where(new ElementSelector())
>>>>>>>>>> .equalTo(new ElementSelector())
>>>>>>>>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>>>>>>> .apply(new JoinStreams());
>>>>>>>>>>
>>>>>>>>>> Also, in the ElementSelector I have inserted a sysout; I am
>>>>>>>>>> getting 20 sysouts instead of 10 (10 sysouts for the source and
>>>>>>>>>> 10 for the dest stream).
>>>>>>>>>>
>>>>>>>>>> I am unable to understand why one record is coming less to the
>>>>>>>>>> co-group.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Vinay Patil
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you add a flag to each element emitted by the CoGroupFunction
>>>>>>>>>>> that indicates whether it was joined or not?
>>>>>>>>>>> Then you can use split to distinguish between both cases and
>>>>>>>>>>> handle both streams differently.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-06-15 6:45 GMT+02:00 Vinay Patil <vinay18.patil@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>
>>>>>>>>>>>> I am able to get the non-matching elements in a stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Of course we can collect the matching elements in the same
>>>>>>>>>>>> stream as well; however, I want to perform additional
>>>>>>>>>>>> operations on the joined stream before writing it to S3, so I
>>>>>>>>>>>> would have to include a separate join operator for the same two
>>>>>>>>>>>> streams, right?
>>>>>>>>>>>> Correct me if I am wrong.
>>>>>>>>>>>>
>>>>>>>>>>>> I have pasted the dummy code which collects the non-matching
>>>>>>>>>>>> records (I have to perform this on the actual data; correct me
>>>>>>>>>>>> if I am doing it wrong).
>>>>>>>>>>>>
>>>>>>>>>>>> sourceStream.coGroup(destStream)
>>>>>>>>>>>> .where(new ElementSelector())
>>>>>>>>>>>> .equalTo(new ElementSelector())
>>>>>>>>>>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>>>>>>>>> .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>>>>>>>>>>>
>>>>>>>>>>>> private static final long serialVersionUID = 6408179761497497475L;
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public void coGroup(Iterable<Integer> paramIterable,
>>>>>>>>>>>> Iterable<Integer> paramIterable1,
>>>>>>>>>>>> Collector<Integer> paramCollector) throws Exception {
>>>>>>>>>>>> long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
>>>>>>>>>>>> long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
>>>>>>>>>>>> if (exactSizeIfKnown == 0) {
>>>>>>>>>>>> paramCollector.collect(paramIterable1.iterator().next());
>>>>>>>>>>>> } else if (exactSizeIfKnown2 == 0) {
>>>>>>>>>>>> paramCollector.collect(paramIterable.iterator().next());
>>>>>>>>>>>> }
>>>>>>>>>>>> }
>>>>>>>>>>>> }).print();
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Vinay Patil
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <vinay18.patil@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You are right; I debugged it for all elements, and I can do
>>>>>>>>>>>>> that now. Thanks a lot.
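The empty-side check in the coGroup above can be exercised without a Flink cluster. This sketch (class and method names are made up for illustration) applies the same rule: both groups non-empty means the key matched; one empty group means the other side's elements are unmatched:

```java
import java.util.List;

// Stand-alone model of the matched/unmatched classification performed in
// the CoGroupFunction above; no Flink dependency, names are illustrative.
public class OuterJoinSketch {

    enum Kind { MATCHED, LEFT_ONLY, RIGHT_ONLY }

    // Mirrors the coGroup logic: both sides present -> matched;
    // an empty side -> the other side's elements are unmatched.
    static Kind classify(List<Integer> left, List<Integer> right) {
        if (!left.isEmpty() && !right.isEmpty()) {
            return Kind.MATCHED;
        }
        return left.isEmpty() ? Kind.RIGHT_ONLY : Kind.LEFT_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(classify(List.of(20), List.of(20))); // MATCHED
        System.out.println(classify(List.of(23), List.of()));   // LEFT_ONLY
        System.out.println(classify(List.of(), List.of(40)));   // RIGHT_ONLY
    }
}
```

In the real CoGroupFunction the `getExactSizeIfKnown` calls play the role of the `isEmpty` checks; note that `Spliterator.getExactSizeIfKnown` is specified to return -1 when the size is unknown, so an `iterator().hasNext()` check is a safer emptiness test.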
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Vinay Patil
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <wuchong.wc@alibaba-inc.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
>>>>>>>>>>>>>> Collector<Integer> out)`, when both iter1 and iter2 are not
>>>>>>>>>>>>>> empty, it means they are matched elements from both streams.
>>>>>>>>>>>>>> When one of iter1 and iter2 is empty, it means they are
>>>>>>>>>>>>>> unmatched.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Jark Wu (wuchong)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I did not get you; even if we use co-group we have to apply
>>>>>>>>>>>>>>> it on a key:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> sourceStream.coGroup(destStream)
>>>>>>>>>>>>>>> .where(new ElementSelector())
>>>>>>>>>>>>>>> .equalTo(new ElementSelector())
>>>>>>>>>>>>>>> .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>>>>>>>>>>>>>>> .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>>>>>>>>>>>>>>> private static final long serialVersionUID = 6408179761497497475L;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>> public void coGroup(Iterable<Integer> paramIterable,
>>>>>>>>>>>>>>> Iterable<Integer>
>>>>>>>>>>>>>>> paramIterable1,
>>>>>>>>>>>>>>> Collector<Integer> paramCollector) throws Exception {
>>>>>>>>>>>>>>> Iterator<Integer> iterator = paramIterable.iterator();
>>>>>>>>>>>>>>> while (iterator.hasNext()) {
>>>>>>>>>>>>>>> iterator.next();
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When I debug this, only the matched elements from both
>>>>>>>>>>>>>>> streams come into the coGroup function.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What I want is: how do I check for unmatched elements from
>>>>>>>>>>>>>>> both streams and write them to a sink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Vinay Patil
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *+91-800-728-4749*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax <mjsax@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You need to do an outer join. However, there is no built-in
>>>>>>>>>>>>>>>> support for outer joins yet.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You can use a windowed CoGroup to implement the outer join
>>>>>>>>>>>>>>>> as an own operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 06/13/2016 06:53 PM, Vinay Patil wrote:
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a question regarding the join operation; consider
>>>>>>>>>>>>>>>>> the following dummy example:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>>>>>>>>>>>>>> DataStreamSource<Integer> sourceStream =
>>>>>>>>>>>>>>>>> env.fromElements(10,20,23,25,30,33,102,18);
>>>>>>>>>>>>>>>>> DataStreamSource<Integer> destStream =
>>>>>>>>>>>>>>>>> env.fromElements(20,30,40,50,60,10);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> sourceStream.join(destStream)
>>>>>>>>>>>>>>>>> .where(new ElementSelector())
>>>>>>>>>>>>>>>>> .equalTo(new ElementSelector())
>>>>>>>>>>>>>>>>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>>>>>>>>>>>>>>>>> .apply(new JoinFunction<Integer, Integer, Integer>() {
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> private static final long serialVersionUID = 1L;
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Override
>>>>>>>>>>>>>>>>> public Integer join(Integer paramIN1, Integer paramIN2)
>>>>>>>>>>>>>>>>> throws Exception {
>>>>>>>>>>>>>>>>> return paramIN1;
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>> }).print();
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I perfectly get the elements that are matching in both the
>>>>>>>>>>>>>>>>> streams; however, my requirement is to write these matched
>>>>>>>>>>>>>>>>> elements and also the unmatched elements to the sink (S3).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How do I get the unmatched elements from each stream?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Vinay Patil
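Fabian's flag-and-split suggestion from earlier in the thread can also be modeled without any Flink dependency: the co-group tags every emitted element with whether it was joined, and a downstream split keys off that flag. Everything here (`Tagged`, `coGroup`) is an illustrative sketch, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the flag-and-split idea: tag each element emitted by the
// co-group with a "joined" flag so matched and unmatched elements can be
// routed to different sinks downstream. Names are illustrative only.
public class FlagAndSplitSketch {

    record Tagged(int value, boolean joined) {}

    // One co-group invocation for a single key. When both sides are
    // present we emit the left side tagged joined=true (mirroring the
    // thread's JoinFunction, which returns paramIN1); otherwise the
    // surviving side is emitted tagged joined=false.
    static List<Tagged> coGroup(List<Integer> left, List<Integer> right) {
        List<Tagged> out = new ArrayList<>();
        boolean joined = !left.isEmpty() && !right.isEmpty();
        for (int v : left) {
            out.add(new Tagged(v, joined));
        }
        if (!joined) {
            for (int v : right) {
                out.add(new Tagged(v, false));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Matched key: emitted with joined=true.
        System.out.println(coGroup(List.of(20), List.of(20)));
        // Unmatched key: the lone element is emitted with joined=false.
        System.out.println(coGroup(List.of(23), List.of()));
    }
}
```

A downstream operator can then send `joined=true` elements to the join-result path and `joined=false` elements to the unmatched path, which is the outer-join behaviour asked about at the start of the thread.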