flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tarandeep Singh <tarand...@gmail.com>
Subject Data+control stream from kafka + window function - not working
Date Thu, 16 Mar 2017 06:30:53 GMT

I am using flink-1.2 and reading data stream from Kafka (using
FlinkKafkaConsumer08). I want to connect this data stream with another
stream (read control stream) so as to do some filtering on the fly. After
filtering, I am applying window function (tumbling/sliding event window)
along with fold function. However, the window function does not get called.

Any help to debug/fix this is greatly appreciated!

Below is a reproducible code that one can run in IDE like IntelliJ or on
flink cluster. You will need to have a running Kafka cluster (local or remote).
Create a topic and add test data points-

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper
localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092
--topic test < small_input.csv

where small_input.csv contains the following lines-

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example, I am not reading control stream from Kafka (but issue
can be reproduced with this code as well)
2) If instead of reading data stream from kafka, I create stream from
elements (i.e. use getInput function instead of getKafkaInput function),
the code works and window function is fired.


import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =

        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control= getControl(env);

        DataStream<Product> filteredStream = product.keyBy(0)
                .flatMap(new CoFlatMapFunImpl());

        DataStream<Product> watermarkedStream =

watermarkedStream.getType(), new WatermarkDebugger<Product>());

                .fold(new NameCount("", 0), new FoldFunImpl(), new


     * If instead of reading from Kafka, create stream from elements, the
     * code works and window function is fired!
    private static DataStream<Product>
getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
            new Product("p1",10.0f,"2017-03-14 16:01:01"),
            new Product("p1",10.0f,"2017-03-14 16:01:02"),
            new Product("p1",10.0f,"2017-03-14 16:01:03"),
            new Product("p1",10.0f,"2017-03-14 16:01:04"),
            new Product("p1",10.0f,"2017-03-14 16:01:05"),
            new Product("p1",10.0f,"2017-03-14 16:01:10"),
            new Product("p1",10.0f,"2017-03-14 16:01:11"),
            new Product("p1",10.0f,"2017-03-14 16:01:12"),
            new Product("p1",10.0f,"2017-03-14 16:01:40"),
            new Product("p1",10.0f,"2017-03-14 16:01:50")

    private static DataStream<Product>
getKafkaInput(StreamExecutionEnvironment env) throws IOException {
        DataStream<String> s = readKafkaStream("test", env);

        return s.map(new MapFunction<String, Product>() {
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0],
Float.parseFloat(fields[1]), fields[2]);

    private static DataStream<Tuple1<String>>
getControl(StreamExecutionEnvironment env) {
        return env.fromElements(new Tuple1<>("p1"));

    private static class CoFlatMapFunImpl extends
RichCoFlatMapFunction<Product, Tuple1<String>,Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));

        public void flatMap1(Product product, Collector<Product>
collector) throws Exception {
            if (productNames.contains(product.f0)) {
                System.out.println("Retaining product " + product + "
in data stream");

        public void flatMap2(Tuple1<String> t, Collector<Product>
collector) throws Exception {
            System.out.println("Adding product to set:" + t.f0);

    private static class FoldFunImpl implements
FoldFunction<Product,NameCount> {
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;

    private static class WindowFunImpl extends
RichWindowFunction<NameCount,NameCount,Tuple,TimeWindow> {
        public void apply(Tuple key, TimeWindow timeWindow,
Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            System.out.println("WINDOW: start time: " + new
Date(timeWindow.getStart()) + " " + nc);

    private static BoundedOutOfOrdernessTimestampExtractor<Product>
getTimestampAssigner(final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd

        return new
BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) {}
                return ts;

    public static class Product extends Tuple3<String,Float,String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);

    public static class NameCount extends Tuple2<String,Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);

    private static DataStream<String> readKafkaStream(String topic,
StreamExecutionEnvironment env) throws IOException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new
SimpleStringSchema(), properties));

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements
OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);

        public void processWatermark(Watermark mark) throws Exception {
            System.out.println("WM: " + mark);

View raw message