flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2585) KafkaSource not working
Date Thu, 27 Aug 2015 15:14:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14716825#comment-14716825 ]

Aljoscha Krettek commented on FLINK-2585:
-----------------------------------------

Hi,
could you please give some details about the versions you are using? The versions
of Flink and Kafka would be especially interesting.

I tried your example on Flink 0.10-SNAPSHOT with Kafka 2.10-0.8.2.1 and it worked, so
we'll have to find out together what the problem could be.
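For reference, the Kafka connector API on 0.10-SNAPSHOT differs from the 0.9-style KafkaSource used in the report below. A minimal sketch of an equivalent source on that version, assuming the FlinkKafkaConsumer082 connector, the "test" topic from the report, and local broker/ZooKeeper addresses as placeholders:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings for the 0.8.2.x consumer; the addresses are placeholders.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181");
        props.setProperty("group.id", "test-consumer-group");

        // Read the "test" topic as plain strings and print each record.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer082<String>("test", new SimpleStringSchema(), props));
        stream.print();

        env.execute("kafka source sketch");
    }
}
```

This is only a sketch against a moving snapshot API, not a drop-in fix; it needs the flink-connector-kafka dependency and a running broker, so it cannot run standalone.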

> KafkaSource not working
> -----------------------
>
>                 Key: FLINK-2585
>                 URL: https://issues.apache.org/jira/browse/FLINK-2585
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Boyang Jerry Peng
>
> I tried running the KafkaConsumerExample subscribing to a Kafka console producer,
> but the KafkaConsumerExample topology was not receiving any data from Kafka. Then
> I wrote my own topology that uses Kafka as a source, but it didn't work either. The
> topologies would run but receive no data. If I run a console consumer that subscribes
> to the topic of the console producer, it receives data from the producer, which
> indicates the producer is working correctly. Can someone help me with this problem?
> Kafka console producer I am running:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> The flink code I am running:
> {code}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class KafkaDataProcessor {
>     private static int port;
>     private static String hostname;
>     private static String topic;
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaDataProcessor.class);
>
>     public static void main(String[] args) {
>         if (!parseParameters(args)) {
>             return;
>         }
>         System.out.println("Start listening for data on: " + hostname + ":" + port + " for topic: " + topic);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<Tuple2<String, Integer>> dataStream = env
>                 .addSource(new KafkaSource<String>(hostname + ":" + port, topic, "test-consumer-group",
>                         new SimpleStringSchema(), 200L), "Kafka source").setParallelism(2)
>                 .flatMap(new Splitter()).setParallelism(2)
>                 .groupBy(0)
>                 .sum(1).setParallelism(2);
>         dataStream.print().setParallelism(2);
>
>         try {
>             env.execute("kafka processor");
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 System.out.println("word: " + word);
>                 LOG.info("word: {}", word);
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
>
>     private static boolean parseParameters(String[] args) {
>         if (args.length == 3) {
>             hostname = args[0];
>             port = Integer.parseInt(args[1]);
>             topic = args[2];
>             return true;
>         }
>         System.err.println("Usage: KafkaDataProcessor <hostname> <port> <topic>");
>         return false;
>     }
> }
> {code}
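For completeness, the console-consumer check the reporter describes can be run against the same topic. The topic name comes from the producer command above; the ZooKeeper address and the --from-beginning flag are assumptions for a default local 0.8.x setup:

```shell
# Read the "test" topic from the beginning to confirm that the console
# producer's messages actually reach Kafka (the 0.8.x console consumer
# connects via ZooKeeper rather than the broker directly).
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```

Seeing the produced messages here confirms the broker side, which narrows the problem to the Flink source configuration.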



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
