flink-user mailing list archives

From: Mohit Anchlia <mohitanch...@gmail.com>
Subject: Flink not reading from Kafka
Date: Sat, 18 Feb 2017 00:54:54 GMT
The code below tries to read from a Kafka topic. The Flink job starts and
then waits forever, even though there is data in the topic. I'm not sure
why. Has anyone else seen this problem?

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxx:9092");
properties.setProperty("group.id", "test1");
properties.setProperty("auto.offset.reset", "earliest");

FlatMapFunction<Integer, Tuple2<Integer, Integer>> flatMapper = //something

DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties));

stream.map(s -> Integer.valueOf(s))
    .flatMap(flatMapper)
    .returns(new TypeHint<Tuple2<Integer, Integer>>() {})
    .print();

JobExecutionResult res = env.execute();



02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(4/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(1/4) switched to RUNNING
02/17/2017 16:50:25 Source: Custom Source -> Map -> Flat Map -> Sink: Unnamed(2/4) switched to RUNNING
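
One thing that can be ruled out with the JDK alone is basic network reachability of the address given in "bootstrap.servers" from the machine running the job. The sketch below is only a diagnostic aid and is not part of Flink or Kafka: the class name BrokerReachability and the method isReachable are hypothetical, and the host and port are whatever the reader passes in. A successful TCP connection does not prove the consumer will receive data, because Kafka clients go on to connect to the addresses the brokers advertise in their metadata, which may differ from the bootstrap address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println("usage: BrokerReachability <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // 3000 ms is an arbitrary timeout chosen for this sketch.
        boolean ok = isReachable(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Run it with the same host and port used in "bootstrap.servers". If the address is not reachable, the problem lies in the network or broker setup and not in the Flink job itself.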
