From: Kanak Biscuitwala
To: user@flink.apache.org
Subject: RE: Multiple operations on a WindowedStream
Date: Tue, 5 Apr 2016 12:00:15 -0700

This worked when I ran my test code locally, but nothing reaches my sink when I run it on YARN (previously, when I just echoed all sums to my sink, it worked).

Here's what my code looks like:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
                INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
        env.enableCheckpointing(5000);

        // this (or event time) is required in order to do the double-windowing below
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<String> stream = env
                .addSource(consumer)
                .flatMap(new CountRequests())
                .keyBy(0, 1)
                .timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
                .sum(2)
                .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
                .apply(new TopK(20))
                .map(new ToString<>());
        stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new SimpleStringSchema(),
                properties));
        env.execute(TASK_NAME);

Note that CountRequests produces Tuple3s, TopK is an AllWindowFunction that produces a List, and ToString is a MapFunction that just wraps Object#toString().

Anything obvious that I'm doing wrong?
________________________________
> From: aljoscha@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +0000
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion time (or event time) as your stream time
> characteristic, i.e.:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or TimeCharacteristic.EventTime
>
> you can apply several window transforms one after another and they will
> apply the same "time window", because they work on the element
> timestamps. What you can then do is have one window that does the
> aggregation and then another one (which has to be global) that selects the
> top elements:
>
> result = input
>     .keyBy()
>     .timeWindow(Time.minutes(1), Time.seconds(5))
>     .sum(2)
>     .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding window here, because the above will output a new window every 5 seconds
>     .apply()
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan wrote:
> I had a similar use case and ended up writing the aggregation logic in the
> apply function; I could not find any better solution.
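Balaji's alternative, doing the aggregation inside the apply function, amounts to counting and ranking in a single pass over the window's elements. Below is a minimal plain-Java sketch of that logic with no Flink dependency. The class name `CountAndTopK` and its `topK` method are hypothetical, and keys are simplified to single Strings rather than the two key fields used in the job above. In a Flink job, this loop would presumably sit inside the window function's `apply` method, which receives the window's elements as an `Iterable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink API: the per-key count ("sum") and the ranking
// ("top K") done in one pass over the elements of a single window.
// Keys are simplified to plain Strings.
class CountAndTopK {
    // Returns up to k (key, count) entries, highest count first; ties are broken by key.
    static List<Map.Entry<String, Long>> topK(Iterable<String> windowElements, int k) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : windowElements) {
            counts.merge(key, 1L, Long::sum); // the per-key sum, done by hand
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        return new ArrayList<>(entries.subList(0, Math.min(k, entries.size())));
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("a", "b", "a", "c", "b", "a");
        System.out.println(topK(window, 2)); // prints [a=3, b=2]
    }
}
```

One trade-off, as I understand Flink's windowing: a plain apply() keeps every raw element of the window until it fires, while sum() can aggregate incrementally per key. The two-window version Aljoscha describes therefore typically holds less state before the global top-K step.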
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala wrote:
> Hi,
>
> I would like to write something like a word count that then emits only
> the 10 highest counts for each window. Logically, I would want to do
> something like:
>
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way
> I can think of is to have a complex function in apply() that has state,
> but I would like to know if there is something a little cleaner than
> that.
>
> Thanks,
> Kanak
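The "window context is lost after sum" concern is what Aljoscha's reply addresses: the second window regroups results by their timestamps rather than by any explicit window handle. Below is a small plain-Java simulation of that alignment. It rests on two assumptions: windows are aligned to multiples of their slide/size counted from epoch 0 with no offset, and each window result is stamped with the window's last millisecond (end minus 1 ms), which is my understanding of how Flink's window operator timestamps its output. `WindowAlignmentSketch` and its methods are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink API: shows why a 5-second tumbling window
// applied after a 1-minute/5-second sliding window regroups exactly the results
// of one sliding window. Assumes windows are aligned to epoch 0 with no offset
// and that each window result is stamped with end - 1 ms.
class WindowAlignmentSketch {
    static final long SIZE_MS = 60_000;  // sliding window size: 1 minute
    static final long SLIDE_MS = 5_000;  // sliding window slide: 5 seconds
    static final long TUMBLE_MS = 5_000; // tumbling window size: 5 seconds

    // Start times of all sliding windows [start, start + SIZE_MS) that contain ts.
    static List<Long> slidingWindowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, SLIDE_MS);
        for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Start time of the tumbling window [start, start + TUMBLE_MS) that contains ts.
    static long tumblingWindowStart(long ts) {
        return ts - Math.floorMod(ts, TUMBLE_MS);
    }

    // Assumed timestamp of a window's result: the last millisecond of the window.
    static long resultTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long ts = 123_456; // an element ingested at t = 123.456 s
        for (long start : slidingWindowStarts(ts)) {
            long resultTs = resultTimestamp(start, SIZE_MS);
            long tumbleStart = tumblingWindowStart(resultTs);
            System.out.printf("sliding [%d, %d) -> result ts %d -> tumbling [%d, %d)%n",
                    start, start + SIZE_MS, resultTs, tumbleStart, tumbleStart + TUMBLE_MS);
        }
    }
}
```

For the element at t = 123456 ms this lists 12 sliding windows (60 s / 5 s), and each one's result lands in the tumbling window that ends exactly where that sliding window ends; for example, sliding [120000, 180000) maps to tumbling [175000, 180000). Since every key's sum for a given sliding window carries the same timestamp, the global 5-second window sees all of those sums together and nothing else, which is why it has to be non-sliding and match the slide.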