flink-user mailing list archives

From Tarandeep Singh <tarand...@gmail.com>
Subject Re: Data+control stream from kafka + window function - not working
Date Thu, 16 Mar 2017 14:34:19 GMT
Data is read from Kafka, and yes, I use a different group id every time I run the code. I have put breakpoints and print statements in to verify that.

Also, if I don't connect with the control stream, the window function works.
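
Here is roughly what I mean (a sketch of the pipeline from my code below, with the connect/flatMap step removed):

    // Sketch: same pipeline as below, but without connecting the control stream.
    DataStream<Product> product = getKafkaInput(env);

    DataStream<Product> watermarkedStream = product
            .assignTimestampsAndWatermarks(getTimestampAssigner(Time.seconds(1)));

    watermarkedStream
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
            .print();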

- Tarandeep

> On Mar 16, 2017, at 1:12 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
> 
> Hi Tarandeep,
> 
> I haven’t looked at the rest of the code yet, but my first guess is that you might not be reading any data from Kafka at all:
> 
>> private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>> 
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
> 
> 
> Have you tried using a different “group.id” every time you re-run the job?
> Note that the “auto.offset.reset” value is only respected when there aren’t any offsets for the group committed in Kafka.
> So you might not actually be reading the complete “small_input.csv” dataset, unless you use a different group.id every time.
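> 
> For example (just a sketch), generating a fresh group id per run would guarantee that:
> 
>     // Hypothetical tweak: a unique consumer group per run, so that no committed
>     // offsets exist and "auto.offset.reset" always takes effect.
>     properties.setProperty("group.id", "group-" + java.util.UUID.randomUUID());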
> 
> Cheers,
> Gordon
> 
>> On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:
>> 
>> Hi,
>> 
>> I am using flink-1.2 and reading a data stream from Kafka (using FlinkKafkaConsumer08). I want to connect this data stream with another stream (a control stream) so as to do some filtering on the fly. After filtering, I am applying a window function (tumbling/sliding event-time window) along with a fold function. However, the window function never gets called.
>> 
>> Any help to debug/fix this is greatly appreciated!
>> 
>> Below is reproducible code that one can run in an IDE like IntelliJ or on a Flink cluster. You will need a running Kafka cluster (local or otherwise).
>> Create a topic and add test data points:
>> 
>> $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
>> $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv
>> 
>> where small_input.csv contains the following lines:
>> 
>> p1,10.0f,2017-03-14 16:01:01
>> p1,10.0f,2017-03-14 16:01:02
>> p1,10.0f,2017-03-14 16:01:03
>> p1,10.0f,2017-03-14 16:01:04
>> p1,10.0f,2017-03-14 16:01:05
>> p1,10.0f,2017-03-14 16:01:10
>> p1,10.0f,2017-03-14 16:01:11
>> p1,10.0f,2017-03-14 16:01:12
>> p1,10.0f,2017-03-14 16:01:40
>> p1,10.0f,2017-03-14 16:01:50
>> 
>> Now you can run the code given below. Note:
>> 
>> 1) In this example, I am not reading the control stream from Kafka (but the issue can be reproduced that way as well; a sketch follows these notes).
>> 2) If, instead of reading the data stream from Kafka, I create the stream from elements (i.e. use the getInput function instead of the getKafkaInput function), the code works and the window function is fired.
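>> 
>> For note 1, a sketch of reading the control stream from Kafka as well (assuming a hypothetical "control" topic and reusing the readKafkaStream helper from the code):
>> 
>>     // Sketch only: control stream read from a Kafka topic instead of fromElements.
>>     // "control" is a hypothetical topic name; readKafkaStream is defined below.
>>     private static DataStream<Tuple1<String>> getKafkaControl(StreamExecutionEnvironment env) throws IOException {
>>         return readKafkaStream("control", env)
>>                 .map(new MapFunction<String, Tuple1<String>>() {
>>                     @Override
>>                     public Tuple1<String> map(String s) throws Exception {
>>                         return new Tuple1<>(s.trim());
>>                     }
>>                 });
>>     }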
>> 
>> Thanks,
>> Tarandeep
>> 
>> 
>> 
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple1;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>> import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
>> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
>> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>> 
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.*;
>> 
>> public class Test3 {
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>>         //DataStream<Product> product = getInput(env);
>>         DataStream<Product> product = getKafkaInput(env);
>>         DataStream<Tuple1<String>> control = getControl(env);
>> 
>>         DataStream<Product> filteredStream = product.keyBy(0)
>>                 .connect(control.keyBy(0))
>>                 .flatMap(new CoFlatMapFunImpl());
>> 
>>         DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
>>                 getTimestampAssigner(Time.seconds(1))).setParallelism(3);
>> 
>>         watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());
>> 
>>         watermarkedStream
>>                 .keyBy(0)
>>                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>                 .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
>>                 .print();
>> 
>>         env.execute();
>>     }
>> 
>>     /**
>>      * If, instead of reading from Kafka, the stream is created from elements,
>>      * the code works and the window function is fired!
>>      */
>>     private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
>>         return env.fromCollection(Arrays.asList(
>>             new Product("p1",10.0f,"2017-03-14 16:01:01"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:02"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:03"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:04"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:05"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:10"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:11"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:12"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:40"),
>>             new Product("p1",10.0f,"2017-03-14 16:01:50")
>>         ));
>>     }
>> 
>>     private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
>>         DataStream<String> s = readKafkaStream("test", env);
>> 
>>         return s.map(new MapFunction<String, Product>() {
>>             @Override
>>             public Product map(String s) throws Exception {
>>                 String[] fields = s.split(",");
>>                 return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
>>             }
>>         });
>>     }
>> 
>>     private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
>>         return env.fromElements(new Tuple1<>("p1"));
>>     }
>> 
>>     private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {
>> 
>>         private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));
>> 
>>         @Override
>>         public void flatMap1(Product product, Collector<Product> collector) throws Exception {
>>             if (productNames.contains(product.f0)) {
>>                 collector.collect(product);
>>                 System.out.println("Retaining product " + product + " in data stream");
>>             }
>>         }
>> 
>>         @Override
>>         public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
>>             productNames.add(t.f0);
>>             System.out.println("Adding product to set:" + t.f0);
>>         }
>>     }
>> 
>>     private static class FoldFunImpl implements FoldFunction<Product, NameCount> {
>>         @Override
>>         public NameCount fold(NameCount current, Product p) throws Exception {
>>             current.f0 = p.f0;
>>             current.f1 += 1;
>>             return current;
>>         }
>>     }
>> 
>>     /**
>>      * WINDOW FUNCTION NEVER GETS CALLED.
>>      */
>>     private static class WindowFunImpl extends RichWindowFunction<NameCount, NameCount, Tuple, TimeWindow> {
>>         @Override
>>         public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
>>                           Collector<NameCount> collector) throws Exception {
>>             NameCount nc = iterable.iterator().next();
>>             collector.collect(nc);
>>             System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) + " " + nc);
>>         }
>>     }
>> 
>>     private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
>>         final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>> 
>>         return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
>>             @Override
>>             public long extractTimestamp(Product p) {
>>                 long ts = 0L;
>>                 try {
>>                     ts = dateFormat.parse(p.f2).getTime();
>>                 } catch (Exception e) { /* ignore parse errors; ts stays 0 */ }
>>                 return ts;
>>             }
>>         };
>>     }
>> 
>>     public static class Product extends Tuple3<String,Float,String> {
>>         public Product() {}
>>         public Product(String name, Float price, String dateTime) {
>>             super(name, price, dateTime);
>>         }
>>     }
>> 
>>     public static class NameCount extends Tuple2<String,Integer> {
>>         public NameCount() {}
>>         public NameCount(String name, Integer count) {
>>             super(name, count);
>>         }
>>     }
>> 
>>     private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
>> 
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "group-0009");
>>         properties.setProperty("auto.offset.reset", "smallest");
>>         return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
>>     }
>> 
>>     public static class WatermarkDebugger<T>
>>             extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>>         private static final long serialVersionUID = 1L;
>> 
>>         @Override
>>         public void processElement(StreamRecord<T> element) throws Exception
{
>>             System.out.println("ELEMENT: " + element);
>>             output.collect(element);
>>         }
>> 
>>         @Override
>>         public void processWatermark(Watermark mark) throws Exception {
>>             super.processWatermark(mark);
>>             System.out.println("WM: " + mark);
>>         }
>>     }
>> }
>> 
>> 
