From: "Gutwein, Sebastian"
To: "user@spark.apache.org"
Subject: Write JavaDStream to Kafka (how?)
Date: Fri, 10 Feb 2017 10:08:20 +0000

Hi,


I'm new to Spark Streaming and want to run some end-to-end tests with Spark and Kafka.

My program runs, but nothing arrives at the Kafka topic. Can someone please help me?

Where is my mistake? Does anyone have a running example of writing a DStream to Kafka 0.10.1.0?

The program looks like this:

import kafka.Kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Int;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 *
 * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
 *    zoo03 my-consumer-group topic1,topic2 1`
 */

public final class JavaKafkaWordCountTest {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCountTest() {
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("GutweinKafkaWordCount");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        final JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(SPACE.split(x)).iterator();
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        final KafkaWriter writer = new KafkaWriter("localhost:9081");

        wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                writer.writeToTopic("output", stringIntegerJavaPairRDD.toString());
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }

    public static class KafkaWriter {
        Properties props = new Properties();
        KafkaProducer<String, String> producer;

        // Constructor
        KafkaWriter(String bootstrap_server) {
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(props);
        }

        private void writeToTopic(String topicName, String message) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
            producer.send(record);
        }
    }
}
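What the job should write for a given input can be worked out without Spark, which is handy as the expected value in an end-to-end test. The sketch below is a plain-Java model (the class `BatchWordCount` and method `countWords` are hypothetical) of the `map`, `flatMap`, `mapToPair` and `reduceByKey` chain for a single 2-second batch: it takes the message values of one batch, splits them with the same `SPACE` pattern and adds one per word. Since `reduceByKey` here is not windowed, counts start over with every batch; the `TreeMap` is only there to make the result order predictable.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper: models one batch of the word-count pipeline without Spark.
class BatchWordCount {
    // Same separator as SPACE in JavaKafkaWordCountTest.
    private static final Pattern SPACE = Pattern.compile(" ");

    // lines = the message values of one batch (what messages.map(tuple2._2()) yields).
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only for predictable output
        for (String line : lines) {
            // flatMap: split each line into words
            for (String word : SPACE.split(line)) {
                // mapToPair + reduceByKey: add 1 per occurrence of the word
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For example, `countWords(Arrays.asList("to be or", "not to be"))` returns `{be=2, not=1, or=1, to=2}`.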

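In the `foreachRDD` block, `stringIntegerJavaPairRDD.toString()` produces a description of the RDD object rather than the (word, count) pairs it contains, and the function passed to `foreachRDD` runs on the driver, so one such string is sent per batch. A common way to send the actual results is to loop over the records with `JavaPairRDD.foreachPartition` and send one message per pair. Because `KafkaProducer` is not serializable, the producer is then normally created on the executor side rather than captured from the driver the way `writer` is. The sketch below models only the per-partition loop with the standard library, so it runs without Spark or a broker: `MessageSink` is a hypothetical stand-in for `KafkaProducer.send(new ProducerRecord<>(topic, key, value))`, `Map.Entry` stands in for `Tuple2`, and using the word as key and the count as value is an assumed record layout. Separately, it may be worth confirming that a broker actually listens on `localhost:9081`, since Kafka's default listener port is 9092.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of the body of a JavaPairRDD.foreachPartition(...) function.
class PartitionSender {
    // Hypothetical stand-in for KafkaProducer<String, String>.send(new ProducerRecord<>(topic, key, value)).
    interface MessageSink {
        void send(String topic, String key, String value);
    }

    // Sends one message per (word, count) pair and returns how many were sent.
    static int sendPartition(Iterator<Map.Entry<String, Integer>> records,
                             MessageSink sink, String topic) {
        int sent = 0;
        while (records.hasNext()) {
            Map.Entry<String, Integer> record = records.next();
            // Assumed layout: word as the record key, count as the record value.
            sink.send(topic, record.getKey(), String.valueOf(record.getValue()));
            sent++;
        }
        return sent;
    }
}
```

In a real job the loop body would call the producer's `send`, and the `Iterator<Tuple2<String, Integer>>` that `foreachPartition` passes in would take the place of the `Map.Entry` iterator.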

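Creating a new `KafkaProducer` for every partition of every batch is costly (each one opens its own connections and I/O thread), and `KafkaProducer` is thread-safe, so one option is a lazily created instance per JVM that all tasks on that executor reuse across batches. The sketch below shows only that holder pattern under stated assumptions: `ProducerHolder` is a hypothetical name, `FakeProducer` is a hypothetical stand-in for `KafkaProducer<String, String>` so the code runs with the standard library, and the creation counter exists only to make the reuse visible.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-JVM holder; a real version would build a KafkaProducer<String, String>.
final class ProducerHolder {
    // Counts how many producers were created, only to demonstrate reuse.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for KafkaProducer<String, String>.
    static final class FakeProducer {
        FakeProducer() {
            CREATED.incrementAndGet();
        }
    }

    private static FakeProducer producer;

    private ProducerHolder() {
    }

    // Creates the producer on first use in this JVM and returns the same instance afterwards.
    static synchronized FakeProducer get() {
        if (producer == null) {
            producer = new FakeProducer();
        }
        return producer;
    }
}
```

Because `send` is asynchronous and buffers records, a version holding a real `KafkaProducer` would typically also call its `close()` (for example from a hook registered with `Runtime.getRuntime().addShutdownHook`) so that buffered records are sent before the JVM exits.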