Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5DB0E200AF7 for ; Tue, 14 Jun 2016 10:07:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5C2E3160A47; Tue, 14 Jun 2016 08:07:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DDFA1602C5 for ; Tue, 14 Jun 2016 10:07:33 +0200 (CEST) Received: (qmail 97062 invoked by uid 500); 14 Jun 2016 08:07:32 -0000 Mailing-List: contact dev-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list dev@flink.apache.org Received: (qmail 97050 invoked by uid 99); 14 Jun 2016 08:07:32 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Jun 2016 08:07:32 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id A0D91C2004 for ; Tue, 14 Jun 2016 08:07:31 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.179 X-Spam-Level: * X-Spam-Status: No, score=1.179 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id gOx_-TKY77zQ for ; Tue, 14 Jun 2016 08:07:27 +0000 (UTC) Received: from mail-it0-f51.google.com (mail-it0-f51.google.com [209.85.214.51]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTPS id 744355F297 for ; Tue, 14 Jun 2016 08:07:26 +0000 (UTC) Received: by mail-it0-f51.google.com with SMTP id z189so70705957itg.0 for ; Tue, 14 Jun 2016 01:07:26 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=IxdDUcixVgzO6CYvR9o//obw+9QjhMqryb3DzWmuddM=; b=sxNPxxwHm40DPXG3caUYszxe7hMBDDTd+u72LJ1tmtZEd17kKvzwkQDQOrFwBGn6wd 29LLNLSt6l7BCzM8ecb0eX7HPDU4OUAf27R9leF4Op8cLATdYZEKxSGeB668xGKNNaUx qNMEwUCwLNyKbpxZNHwhhoN+HdeuLEV/ObNcy9OvvY2AvkduD857dYr66TZl0PyF+LSA uxM3DqFBBdCWak0m4piPYOZeHIaEbUnRGMs+geFCrVHZq3D04cLzRgxNh9blJVmVzfj+ UxnmxP1zHGknOKt/OSxEWiXJYGd/8rL4RYqg/jcdV6U1Vj+qb73uGstNrQ1qr3X/QKja sSRA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=IxdDUcixVgzO6CYvR9o//obw+9QjhMqryb3DzWmuddM=; b=j3eyICI9LGqzieJAbpmKAZ69z5B3aj0Xhur+/VinACOrra3r+3YAd/W/YZnv+WJJl+ sEeMH+59yYTT/uzMtOQoZuNDHis0A5KLFxRAE0c4N1Wl81gLJbgLgn1qQBcI/jXETp5D Cb3iKCCTBQDKjW+YWzN+ZVZVSuEJSvjQc2zLnWSSIVGnwr8VIaXFVVOf27lk44S7GHI/ ltOi9+yevJTp0o+bZapQ+5/VlmuFSSxHNBsL1+cuYK17xTTLGtJbRZ6enGwlEPlilikN 35qNOtK/Dot5UKwCnmVpIBGmCxIDmB6rnVGeWbtmiBhc/PHoAeV1WLNGBnncO1hfKwoB VRQQ== X-Gm-Message-State: ALyK8tL/Gs4JrMeTXIyQNn9VzN7hwV+aH20SkIKIS21DLESTb1KMbH0za1z6fBmkncdIeZk1PTvLLo9Od6HQ3Q== X-Received: by 10.36.22.134 with SMTP id a128mr5876119ita.58.1465891639108; Tue, 14 Jun 2016 01:07:19 -0700 (PDT) MIME-Version: 1.0 Received: by 10.36.24.75 with HTTP; Tue, 14 Jun 2016 01:07:18 -0700 (PDT) In-Reply-To: <26F1352E-3493-4AEA-BFF6-F7A99924E73C@alibaba-inc.com> References: <575F1989.8070003@apache.org> <26F1352E-3493-4AEA-BFF6-F7A99924E73C@alibaba-inc.com> From: Vinay Patil Date: Tue, 14 Jun 2016 13:37:18 +0530 Message-ID: Subject: Re: [Discussion] Query regarding Join To: dev@flink.apache.org Content-Type: multipart/alternative; boundary=001a114457f66db8ce0535387e12 archived-at: Tue, 14 Jun 2016 08:07:34 -0000 --001a114457f66db8ce0535387e12 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: quoted-printable You are right, debugged it for all elements , I can do that now. Thanks a lot. Regards, Vinay Patil On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu wrote: > In `coGroup(Iterable iter1, Iterable iter2, > Collector out)` , when both iter1 and iter2 are not empty, it > means they are matched elements from both stream. > When one of iter1 and iter2 is empty , it means that they are unmatched. > > > - Jark Wu (wuchong) > > > =E5=9C=A8 2016=E5=B9=B46=E6=9C=8814=E6=97=A5=EF=BC=8C=E4=B8=8B=E5=8D=88= 12:46=EF=BC=8CVinay Patil =E5=86=99=E9=81=93=EF= =BC=9A > > > > Hi Matthias , > > > > I did not get you, even if we use Co-Group we have to apply it on a key > > > > sourceStream.coGroup(destStream) > > .where(new ElementSelector()) > > .equalTo(new ElementSelector()) > > .window(TumblingEventTimeWindows.of(Time.seconds(30))) > > .apply(new CoGroupFunction() { > > private static final long serialVersionUID =3D 6408179761497497475L; > > > > @Override > > public void coGroup(Iterable paramIterable, Iterable > > paramIterable1, > > Collector paramCollector) throws Exception { > > Iterator iterator =3D paramIterable.iterator(); > > while(iterator.hasNext()) { > > } > > } > > }); > > > > when I debug this ,only the matched element from both stream will come = in > > the coGroup function. > > > > What I want is how do I check for unmatched elements from both streams > and > > write it to sink. > > > > Regards, > > Vinay Patil > > > > *+91-800-728-4749* > > > > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J. Sax > wrote: > > > >> You need to do an outer-join. However, there is no build-in support fo= r > >> outer-joins yet. > >> > >> You can use Window-CoGroup to implement the outer-join as an own > operator. > >> > >> > >> -Matthias > >> > >> On 06/13/2016 06:53 PM, Vinay Patil wrote: > >>> Hi, > >>> > >>> I have a question regarding the join operation, consider the followin= g > >>> dummy example: > >>> > >>> StreamExecutionEnvironment env =3D > >>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); > >>> DataStreamSource sourceStream =3D > >>> env.fromElements(10,20,23,25,30,33,102,18); > >>> DataStreamSource destStream =3D > >> env.fromElements(20,30,40,50,60,10); > >>> > >>> sourceStream.join(destStream) > >>> .where(new ElementSelector()) > >>> .equalTo(new ElementSelector()) > >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) > >>> .apply(new JoinFunction() { > >>> > >>> private static final long serialVersionUID =3D 1L; > >>> > >>> @Override > >>> public Integer join(Integer paramIN1, Integer paramIN2) throws > Exception > >> { > >>> return paramIN1; > >>> } > >>> }).print(); > >>> > >>> I perfectly get the elements that are matching in both the streams, > >> however > >>> my requirement is to write these matched elements and also the > unmatched > >>> elements to sink(S3) > >>> > >>> How do I get the unmatched elements from each stream ? > >>> > >>> Regards, > >>> Vinay Patil > >>> > >> > >> > > --001a114457f66db8ce0535387e12--