From: Tarandeep Singh <tarandeep@gmail.com>
Date: Thu, 16 Mar 2017 07:34:19 -0700
Subject: Re: Data+control stream from kafka + window function - not working
To: user@flink.apache.org

Data is read from Kafka, and yes, I use a different group id every time I run the code. I have put breakpoints and print statements to verify that.

Also, if I don't connect the control stream, the window function works.

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>
> Hi Tarandeep,
>
> I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:
>
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", "localhost:9092");
>>     properties.setProperty("zookeeper.connect", "localhost:2181");
>>     properties.setProperty("group.id", "group-0009");
>>     properties.setProperty("auto.offset.reset", "smallest");
>>     return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>> }
>
> Have you tried using a different “group.id” every time you’re re-running the job?
> Note that the “auto.offset.reset” value is only respected when there aren’t any offsets committed for the group in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id every time.
>
> Cheers,
> Gordon
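A minimal sketch of Gordon's suggestion, for reference: generate a fresh consumer group per run so "auto.offset.reset" is always honored. The helper name and the UUID suffix are illustrative, not from the original thread; any unique id works.

    import java.util.Properties;
    import java.util.UUID;

    // Build consumer properties with a fresh, unique consumer group for every
    // run, so "auto.offset.reset" is honored and the whole topic is re-read
    // from the beginning each time.
    private static Properties freshConsumerProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-" + UUID.randomUUID());
        properties.setProperty("auto.offset.reset", "smallest");
        return properties;
    }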
>
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:
>>
>> Hi,
>>
>> I am using Flink 1.2 and reading a data stream from Kafka (using FlinkKafkaConsumer08). I want to connect this data stream with another stream (the control stream) so as to do some filtering on the fly. After filtering, I apply a window function (tumbling/sliding event-time window) along with a fold function. However, the window function never gets called.
>>
>> Any help to debug/fix this is greatly appreciated!
>>
>> Below is reproducible code that one can run in an IDE like IntelliJ or on a Flink cluster. You will need a running Kafka cluster (local or otherwise).
>> Create a topic and add the test data points:
>>
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv
>>
>> where small_input.csv contains the following lines:
>>
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>>
>> Now you can run the code given below. Note:
>>
>> 1) In this example, I am not reading the control stream from Kafka (but the issue can be reproduced with that setup as well).
>> 2) If, instead of reading the data stream from Kafka, I create the stream from elements (i.e. use the getInput function instead of the getKafkaInput function), the code works and the window function is fired. (A short standalone illustration of this difference follows just below, before the full code.)
>>
>> Thanks,
>> Tarandeep
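Point 2 above has a plausible explanation worth keeping in mind while debugging: a finite source such as fromElements/fromCollection emits a final Long.MAX_VALUE watermark when it finishes, which forces every pending event-time window to fire, whereas an unbounded Kafka source never does that, so windows only fire once the running watermark actually passes their end. A small self-contained illustration of the finite-source behavior (class and element names are mine, not from the original code):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class FiniteSourceWindows {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Three elements with event-time timestamps 1s, 2s, and 9s.
            env.fromElements(Tuple2.of("p1", 1000L), Tuple2.of("p1", 2000L), Tuple2.of("p1", 9000L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> t) {
                        return t.f1;
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Both windows print, even though the watermarks derived from the
                // data alone never pass the end of the last window: the finite
                // source's closing Long.MAX_VALUE watermark flushes them.
                .sum(1)
                .print();

            env.execute();
        }
    }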
>>
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>>
>> public class Test3 {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>         //DataStream<Product> product = getInput(env);
>>         DataStream<Product> product = getKafkaInput(env);
>>         DataStream<Tuple1<String>> control = getControl(env);
>>
>>         DataStream<Product> filteredStream = product.keyBy(0)
>>                 .connect(control.keyBy(0))
>>                 .flatMap(new CoFlatMapFunImpl());
>>
>>         DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
>>                 getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>>
>>         watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());
>>
>>         watermarkedStream
>>                 .keyBy(0)
>>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>>                 .print();
>>
>>         env.execute();
>>     }
>>
>>     /**
>>      * If, instead of reading from Kafka, the stream is created from elements,
>>      * the code works and the window function is fired!
>>      */
>>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>>         return env.fromCollection(Arrays.asList(
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:01"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:02"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:03"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:04"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:05"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:10"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:11"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:12"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:40"),
>>                 new Product("p1", 10.0f, "2017-03-14 16:01:50")
>>         ));
>>     }
>>
>>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>>         DataStream<String> s = readKafkaStream("test", env);
>>
>>         return s.map(new MapFunction<String, Product>() {
>>             @Override
>>             public Product map(String s) throws Exception {
>>                 String[] fields = s.split(",");
>>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>>             }
>>         });
>>     }
>>
>>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>>         return env.fromElements(new Tuple1<>("p1"));
>>     }
>>
>>     private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {
>>
>>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>>
>>         @Override
>>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>>             if (productNames.contains(product.f0)) {
>>                 collector.collect(product);
>>                 System.out.println("Retaining product " + product + " in data stream");
>>             }
>>         }
>>
>>         @Override
>>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>>             productNames.add(t.f0);
>>             System.out.println("Adding product to set:" + t.f0);
>>         }
>>     }
>>
>>     private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
>>         @Override
>>         public NameCount fold(NameCount current, Product p) throws Exception {
>>             current.f0 = p.f0;
>>             current.f1 += 1;
>>             return current;
>>         }
>>     }
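>>
>>     // Note: the window operator's event-time clock only advances to the
>>     // minimum watermark across all parallel subtasks of the upstream
>>     // assigner. The assigner in main() runs with setParallelism(3); if any
>>     // of its subtasks receives no elements, that subtask's watermark stays
>>     // at Long.MIN_VALUE and no downstream event-time window can ever fire.
>>     // That is one possible (unconfirmed) reason the function below is never
>>     // called with the Kafka input; the WatermarkDebugger output should show
>>     // whether watermarks actually advance.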
>>
>>     /**
>>      * WINDOW FUNCTION NEVER GETS CALLED.
>>      */
>>     private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
>>         @Override
>>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>>                           Collector<NameCount> collector) throws Exception {
>>             NameCount nc = iterable.iterator().next();
>>             collector.collect(nc);
>>             System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
>>         }
>>     }
>>
>>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
>>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>>
>>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>>             @Override
>>             public long extractTimestamp(Product p) {
>>                 long ts = 0L;
>>                 try {
>>                     ts = dateFormat.parse(p.f2).getTime();
>>                 } catch (Exception e) {}
>>                 return ts;
>>             }
>>         };
>>     }
>>
>>     public static class Product extends Tuple3<String, Float, String> {
>>         public Product() {}
>>         public Product(String name, Float price, String dateTime) {
>>             super(name, price, dateTime);
>>         }
>>     }
>>
>>     public static class NameCount extends Tuple2<String, Integer> {
>>         public NameCount() {}
>>         public NameCount(String name, Integer count) {
>>             super(name, count);
>>         }
>>     }
>>
>>     private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>>
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>>
>>     public static class WatermarkDebugger<T>
>>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public void processElement(StreamRecord<T> element) throws Exception {
>>             System.out.println("ELEMENT: " + element);
>>             output.collect(element);
>>         }
>>
>>         @Override
>>         public void processWatermark(Watermark mark) throws Exception {
>>             super.processWatermark(mark);
>>             System.out.println("WM: " + mark);
>>         }
>>     }
>> }
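Not a confirmed fix, but a low-risk variation of the pipeline above that is worth trying: assign timestamps and watermarks on the Kafka-derived stream before the connect, and keep the assigner at parallelism 1 so no idle subtask can hold the watermark back. This sketch reuses the helpers from Test3; the restructured wiring is an assumption of mine, not something from the original thread.

    // Inside Test3.main(), replacing the original watermark wiring.
    // Assign watermarks right after the Kafka source, at parallelism 1.
    DataStream<Product> product = getKafkaInput(env)
            .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)))
            .setParallelism(1);

    DataStream<Tuple1<String>> control = getControl(env);

    // The control stream from fromElements() is finite, so it ends with a
    // Long.MAX_VALUE watermark and cannot hold back the connected stream.
    DataStream<Product> filteredStream = product.keyBy(0)
            .connect(control.keyBy(0))
            .flatMap(new CoFlatMapFunImpl());

    // No second assignTimestampsAndWatermarks call; window directly.
    filteredStream
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
            .print();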