From: Fabian Hueske
Date: Fri, 21 Oct 2016 17:17:35 +0200
Subject: Re: multiple processing of streams
To: user@flink.apache.org

Hi Robert,

it is certainly possible to feed the same DataStream into two (or more)
operators. Both operators should then process the complete input stream.

What you describe is unintended behavior.
Can you explain how you determined that both window operators only
receive half of the events?

Thanks,
Fabian

2016-10-19 18:28 GMT+02:00 <robert.lancaster@hyatt.com>:

> Is it possible to process the same stream in two different ways? I can’t
> find anything in the documentation definitively stating this is possible,
> but nor do I find anything stating it isn’t. My attempt had some
> unexpected results, which I’ll explain below:
>
> Essentially, I have a stream of data I’m pulling from Kafka. I want to
> build aggregate metrics on this data set using both tumbling windows as
> well as session windows. So, I do something like the following:
>
> DataStream<MyRecordType> baseStream =
>     env.addSource(…)                    // pulling data from Kafka
>        .map(…)                          // parse the raw input
>        .assignTimestampsAndWatermarks(…);
>
> DataStream<Tuple..<…>> timeWindowedStream =
>     baseStream.keyBy(…)
>               .timeWindow(…)            // tumbling window
>               .apply(…);                // aggregation over tumbling window
>
> DataStream<Tuple..<…>> sessionWindowedStream =
>     baseStream.keyBy(…)
>               .window(EventTimeSessionWindows.withGap(…))  // session window
>               .apply(…);                // aggregation over session window
>
> The issue is that when I view my job in the Flink dashboard, it indicates
> that each type of windowing is only receiving half of the records. Is what
> I’m trying simply unsupported or is there something I’m missing?
>
> Thanks!
>
> ------------------------------
> The information contained in this communication is confidential and
> intended only for the use of the recipient named above, and may be legally
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any dissemination, distribution or copying of this communication is
> strictly prohibited. If you have received this communication in error,
> please resend it to the sender and delete the original message and copy of
> it from your computer system. Opinions, conclusions and other information
> in this message that do not relate to our official business should be
> understood as neither given nor endorsed by the company.
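To make the expected behavior concrete, here is a minimal sketch in plain Java of what "both operators process the complete input stream" means. It does not use the Flink API: the class FanOutSketch and the methods tumblingCounts and sessionCounts are hypothetical names made up for illustration, the input is a single key with timestamps already in order, and the session rule (a new session starts when the gap to the previous event exceeds the configured gap) is a simplification. The point is only that both consumers read the same list, so each one should account for every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of fan-out semantics; this is not Flink code.
public class FanOutSketch {

    // Counts events per tumbling window; the map key is the window start.
    public static Map<Long, Integer> tumblingCounts(List<Long> timestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % size);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Counts events per session. Assumes timestamps are sorted ascending.
    // Simplified rule: a gap larger than `gap` closes the current session.
    public static List<Integer> sessionCounts(List<Long> timestamps, long gap) {
        List<Integer> counts = new ArrayList<>();
        long previous = 0;
        int current = 0;
        for (long ts : timestamps) {
            if (current > 0 && ts - previous > gap) {
                counts.add(current);
                current = 0;
            }
            current++;
            previous = ts;
        }
        if (current > 0) {
            counts.add(current);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Stand-in for baseStream: event timestamps for one key.
        List<Long> baseStream = List.of(1L, 3L, 9L, 12L, 13L, 30L);

        // Both "operators" are fed the same, complete input.
        Map<Long, Integer> tumbling = tumblingCounts(baseStream, 10);
        List<Integer> sessions = sessionCounts(baseStream, 5);

        int seenByTumbling = tumbling.values().stream().mapToInt(Integer::intValue).sum();
        int seenBySessions = sessions.stream().mapToInt(Integer::intValue).sum();

        System.out.println("tumbling windows: " + tumbling);  // {0=3, 10=2, 30=1}
        System.out.println("session windows:  " + sessions);  // [2, 3, 1]
        System.out.println("records seen: " + seenByTumbling + " and " + seenBySessions
                + " of " + baseStream.size());
    }
}
```

The two groupings differ, but the per-operator totals both equal the input size. If the counts for the two window operators in a real job each add up to only half of the source records, that would not match this model, which is why it helps to know exactly which numbers in the dashboard are being compared.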