Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB74718368 for ; Fri, 12 Feb 2016 14:03:59 +0000 (UTC) Received: (qmail 26308 invoked by uid 500); 12 Feb 2016 14:03:59 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 26268 invoked by uid 500); 12 Feb 2016 14:03:59 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 26259 invoked by uid 99); 12 Feb 2016 14:03:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Feb 2016 14:03:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 63BDAE03CD; Fri, 12 Feb 2016 14:03:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.apache.org Message-Id: <8a2f5eb1630649a38e3588b1a660dc6e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-3270] Add Kafka example Date: Fri, 12 Feb 2016 14:03:59 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 1dee62b4b -> b6bfcf008 [FLINK-3270] Add Kafka example This closes #1533 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bfcf00 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bfcf00 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bfcf00 Branch: refs/heads/master Commit: b6bfcf008e20eb4d4a3e81bedf7eaf871f121d4c Parents: 1dee62b Author: Robert Metzger Authored: Thu Jan 21 11:42:12 2016 +0100 Committer: Robert Metzger Committed: Fri Feb 12 15:02:28 2016 +0100 ---------------------------------------------------------------------- flink-examples/flink-examples-streaming/pom.xml | 51 ++++++++++++++ .../streaming/examples/kafka/ReadFromKafka.java | 63 +++++++++++++++++ .../examples/kafka/WriteIntoKafka.java | 72 ++++++++++++++++++++ .../kafka/examples/ReadFromKafka.java | 56 --------------- .../kafka/examples/WriteIntoKafka.java | 70 ------------------- 5 files changed, 186 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index ba49dc5..3ea3276 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -61,6 +61,12 @@ under the License. org.apache.flink + flink-connector-kafka-0.8_2.10 + ${project.version} + + + + org.apache.flink flink-streaming-java_2.10 ${project.version} test @@ -522,6 +528,51 @@ under the License. + + + + org.apache.maven.plugins + maven-shade-plugin + + + fat-jar-kafka-example + package + + shade + + + false + false + false + + + org.apache.flink.streaming.examples.kafka.ReadFromKafka + + + Kafka + + + + * + + org/apache/flink/streaming/examples/kafka/** + org/apache/flink/streaming/** + org/apache/kafka/** + org/apache/curator/** + org/apache/zookeeper/** + org/apache/jute/** + org/I0Itec/** + jline/** + com/yammer/** + kafka/** + + + + + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java new file mode 100644 index 0000000..2179eca --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/ReadFromKafka.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.kafka; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + + +/** + * Read Strings from Kafka and print them to standard out. + * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! + * + * Please pass the following arguments to run the example: + * --topic test --bootstrap.servers localhost:9092 --group.id myconsumer + * + */ +public class ReadFromKafka { + + public static void main(String[] args) throws Exception { + // parse input arguments + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + + if(parameterTool.getNumberOfParameters() < 3) { + System.out.println("Missing parameters!\nUsage: Kafka --topic --bootstrap.servers --group.id "); + System.exit(1); + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(3); // retry if job fails + env.enableCheckpointing(5000); // create a checkpoint every 5 secodns + env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface + + DataStream messageStream = env + .addSource(new FlinkKafkaConsumer08<>( + parameterTool.getRequired("topic"), + new SimpleStringSchema(), + parameterTool.getProperties())); + + // write kafka stream to standard out. + messageStream.print(); + + env.execute("Read from Kafka example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java new file mode 100644 index 0000000..0a33265 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/WriteIntoKafka.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.kafka; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + + +/** + * Generate a String every 500 ms and write it into a Kafka topic + * + * Please pass the following arguments to run the example: + * --topic test --bootstrap.servers localhost:9092 + * + */ +public class WriteIntoKafka { + + public static void main(String[] args) throws Exception { + ParameterTool parameterTool = ParameterTool.fromArgs(args); + if(parameterTool.getNumberOfParameters() < 2) { + System.out.println("Missing parameters!\nUsage: Kafka --topic --bootstrap.servers "); + System.exit(1); + } + + StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().disableSysoutLogging(); + env.setNumberOfExecutionRetries(3); + + // very simple data generator + DataStream messageStream = env.addSource(new SourceFunction() { + public boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + long i = 0; + while(this.running) { + ctx.collect("Element - " + i++); + Thread.sleep(500); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + // write data into Kafka + messageStream.addSink(new FlinkKafkaProducer08<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); + + env.execute("Write into Kafka example"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java deleted file mode 100644 index 643da66..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/ReadFromKafka.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.examples; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - - -/** - * Read Strings from Kafka and print them to standard out. - * Note: On a cluster, DataStream.print() will print to the TaskManager's .out file! - * - * Please pass the following arguments to run the example: - * --topic test --bootstrap.servers localhost:9092 --group.id myconsumer - * - */ -public class ReadFromKafka { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(4); - env.enableCheckpointing(5000); - env.setParallelism(2); - - ParameterTool parameterTool = ParameterTool.fromArgs(args); - - DataStream messageStream = env - .addSource(new FlinkKafkaConsumer09<>( - parameterTool.getRequired("topic"), - new SimpleStringSchema(), - parameterTool.getProperties())); - - messageStream.print(); - - env.execute("Read from Kafka example"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java deleted file mode 100644 index fbe53fa..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/examples/WriteIntoKafka.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.examples; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - - -/** - * Generate a String every 500 ms and write it into a Kafka topic - * - * Please pass the following arguments to run the example: - * --topic test --bootstrap.servers localhost:9092 - * - */ -public class WriteIntoKafka { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - env.setNumberOfExecutionRetries(4); - env.setParallelism(2); - - ParameterTool parameterTool = ParameterTool.fromArgs(args); - - // very simple data generator - DataStream messageStream = env.addSource(new SourceFunction() { - public boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long i = 0; - while(this.running) { - ctx.collect("Element - " + i++); - Thread.sleep(500); - } - } - - @Override - public void cancel() { - running = false; - } - }); - - // write data into Kafka - messageStream.addSink(new FlinkKafkaProducer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties())); - - env.execute("Write into Kafka example"); - } -}