flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Data+control stream from kafka + window function - not working
Date Thu, 16 Mar 2017 08:12:00 GMT
Hi Tarandeep,

I haven’t looked at the rest of the code yet, but my first guess is that you might not be
reading any data from Kafka at all:

private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

Have you tried using a different “group.id” every time you re-run the job?
Note that the “auto.offset.reset” value is only respected when there aren’t any offsets
for the group committed in Kafka.
So you might not actually be re-reading the complete “small_input.csv” dataset, unless you
use a different group.id every time.
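
For example (just a sketch; generating the group name with UUID is my own suggestion, not
something from your code, and UUID comes from java.util, which your code already imports via
java.util.*), you could pick a fresh consumer group on every run so that “auto.offset.reset”
always kicks in:

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        // a unique group per run means no committed offsets, so "smallest" is honored
        properties.setProperty("group.id", "group-" + UUID.randomUUID());
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));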

Cheers,
Gordon

On March 16, 2017 at 2:39:10 PM, Tarandeep Singh (tarandeep@gmail.com) wrote:

Hi,

I am using Flink 1.2 and reading a data stream from Kafka (using FlinkKafkaConsumer08). I want
to connect this data stream with another stream (a control stream) so as to do some filtering
on the fly. After filtering, I am applying a window function (tumbling/sliding event-time window)
along with a fold function. However, the window function never gets called.

Any help to debug/fix this is greatly appreciated!

Below is reproducible code that you can run in an IDE like IntelliJ or on a Flink cluster. You
will need a running Kafka cluster (local or otherwise).
Create a topic and add the test data points:

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper localhost:2181 --replication-factor 1 --partitions 1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < small_input.csv

where small_input.csv contains the following lines:

p1,10.0f,2017-03-14 16:01:01
p1,10.0f,2017-03-14 16:01:02
p1,10.0f,2017-03-14 16:01:03
p1,10.0f,2017-03-14 16:01:04
p1,10.0f,2017-03-14 16:01:05
p1,10.0f,2017-03-14 16:01:10
p1,10.0f,2017-03-14 16:01:11
p1,10.0f,2017-03-14 16:01:12
p1,10.0f,2017-03-14 16:01:40
p1,10.0f,2017-03-14 16:01:50

Now you can run the code given below. Note:

1) In this example I am not reading the control stream from Kafka (but the issue can be
reproduced with that code as well).
2) If, instead of reading the data stream from Kafka, I create the stream from elements (i.e.
use the getInput function instead of the getKafkaInput function), the code works and the
window function is fired (a quick Kafka sanity check is sketched right after these notes).
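
A quick sanity check one could add (this print is not part of the job as posted, just a
debugging sketch using the readKafkaStream helper defined below) is to dump the raw Kafka
stream, to verify that records are being consumed at all:

        // Sketch: print the raw Kafka records before any windowing,
        // to confirm the consumer is reading data at all.
        DataStream<String> raw = readKafkaStream("test", env);
        raw.print();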

Thanks,
Tarandeep



import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class Test3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //DataStream<Product> product = getInput(env);
        DataStream<Product> product = getKafkaInput(env);
        DataStream<Tuple1<String>> control= getControl(env);

        DataStream<Product> filteredStream = product.keyBy(0)
                .connect(control.keyBy(0))
                .flatMap(new CoFlatMapFunImpl());

        DataStream<Product> watermarkedStream = filteredStream.assignTimestampsAndWatermarks(
                getTimestampAssigner(Time.seconds(1))).setParallelism(3);

        // Side branch that prints every element and watermark for debugging.
        watermarkedStream.transform("WatermarkDebugger", watermarkedStream.getType(), new WatermarkDebugger<Product>());

        watermarkedStream
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .fold(new NameCount("", 0), new FoldFunImpl(), new WindowFunImpl())
                .print();

        env.execute();
    }

    /**
     * If, instead of reading from Kafka, the stream is created from elements,
     * the code works and the window function is fired!
     */
    private static DataStream<Product> getInput(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
            new Product("p1",10.0f,"2017-03-14 16:01:01"),
            new Product("p1",10.0f,"2017-03-14 16:01:02"),
            new Product("p1",10.0f,"2017-03-14 16:01:03"),
            new Product("p1",10.0f,"2017-03-14 16:01:04"),
            new Product("p1",10.0f,"2017-03-14 16:01:05"),
            new Product("p1",10.0f,"2017-03-14 16:01:10"),
            new Product("p1",10.0f,"2017-03-14 16:01:11"),
            new Product("p1",10.0f,"2017-03-14 16:01:12"),
            new Product("p1",10.0f,"2017-03-14 16:01:40"),
            new Product("p1",10.0f,"2017-03-14 16:01:50")
        ));
    }

    private static DataStream<Product> getKafkaInput(StreamExecutionEnvironment env) throws IOException {
        DataStream<String> s = readKafkaStream("test", env);

        return s.map(new MapFunction<String, Product>() {
            @Override
            public Product map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Product(fields[0], Float.parseFloat(fields[1]), fields[2]);
            }
        });
    }

    private static DataStream<Tuple1<String>> getControl(StreamExecutionEnvironment env) {
        return env.fromElements(new Tuple1<>("p1"));
    }

    private static class CoFlatMapFunImpl extends RichCoFlatMapFunction<Product, Tuple1<String>, Product> {

        private Set<String> productNames = new HashSet<>(Arrays.asList("p1"));

        @Override
        public void flatMap1(Product product, Collector<Product> collector) throws Exception {
            if (productNames.contains(product.f0)) {
                collector.collect(product);
                System.out.println("Retaining product " + product + " in data stream");
            }
        }

        @Override
        public void flatMap2(Tuple1<String> t, Collector<Product> collector) throws Exception {
            productNames.add(t.f0);
            System.out.println("Adding product to set:" + t.f0);
        }
    }

    private static class FoldFunImpl implements FoldFunction<Product,NameCount> {
        @Override
        public NameCount fold(NameCount current, Product p) throws Exception {
            current.f0 = p.f0;
            current.f1 += 1;
            return current;
        }
    }

    /**
     * WINDOW FUNCTION NEVER GETS CALLED.
     */
    private static class WindowFunImpl extends RichWindowFunction<NameCount,NameCount,Tuple,TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<NameCount> iterable,
                          Collector<NameCount> collector) throws Exception {
            NameCount nc = iterable.iterator().next();
            collector.collect(nc);
            System.out.println("WINDOW: start time: " + new Date(timeWindow.getStart()) +
" " + nc);
        }
    }

    private static BoundedOutOfOrdernessTimestampExtractor<Product> getTimestampAssigner(final Time maxOutOfOrderness) {
        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        return new BoundedOutOfOrdernessTimestampExtractor<Product>(maxOutOfOrderness) {
            @Override
            public long extractTimestamp(Product p) {
                long ts = 0L;
                try {
                    ts = dateFormat.parse(p.f2).getTime();
                } catch (Exception e) { /* ignore parse errors; ts stays 0 */ }
                return ts;
            }
        };
    }

    public static class Product extends Tuple3<String,Float,String> {
        public Product() {}
        public Product(String name, Float price, String dateTime) {
            super(name, price, dateTime);
        }
    }

    public static class NameCount extends Tuple2<String,Integer> {
        public NameCount() {}
        public NameCount(String name, Integer count) {
            super(name, count);
        }
    }

    private static DataStream<String> readKafkaStream(String topic, StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "group-0009");
        properties.setProperty("auto.offset.reset", "smallest");
        return env.addSource(new FlinkKafkaConsumer08<>(topic, new SimpleStringSchema(), properties));
    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}


