spark-dev mailing list archives

From Ratika Prasad <rpra...@couponsinc.com>
Subject RE: Spark-Kafka Connector issue
Date Mon, 28 Sep 2015 18:20:06 GMT
Thanks for your reply.

I invoked my program with the broker IP and port, and it started as expected, but I now see
the error below:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local
spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic
15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as local[n], n > 1
in local mode if you have receivers to get data, otherwise Spark jobs will not get resources
to process the received data.
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the command below to check the offsets, I got this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic --group test-consumer-group
--zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode
for /consumers/test-consumer-group/offsets/TestTopic /0.

I also just added the configs below to my kafka/config/consumer.properties and restarted
Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=10000
offsets.commit.max.retries=5
dual.commit.enabled=true

From: Cody Koeninger [mailto:cody@koeninger.org]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad <rprasad@couponsinc.com>
Cc: dev@spark.apache.org
Subject: Re: Spark-Kafka Connector issue

This is a user list question not a dev list question.

Looks like your driver is having trouble communicating with the Kafka brokers.  Make sure the
broker host and port are reachable from the driver host (using nc or telnet); make sure that
you're providing the _broker_ host and port to createDirectStream, not the ZooKeeper host;
make sure the topics in question actually exist on Kafka and the names match what you're providing
to createDirectStream.
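
The first two checks above can also be run from the driver host with a few lines of Java. This is only a sketch: the class name BrokerCheck, its method names, and the 3-second timeout are made up for illustration and are not part of Spark or Kafka. It only tests that a TCP connection opens, which is what nc or telnet would tell you, and it prints a hint when the port is 2181, ZooKeeper's default client port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark or Kafka.
public class BrokerCheck {

  // Split "host1:9092,host2:9092" (the metadata.broker.list format) into addresses.
  public static List<InetSocketAddress> parseBrokers(String brokers) {
    List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
    for (String entry : brokers.split(",")) {
      String[] hostPort = entry.trim().split(":");
      if (hostPort.length != 2) {
        throw new IllegalArgumentException("Expected host:port but got: " + entry);
      }
      result.add(InetSocketAddress.createUnresolved(
          hostPort[0], Integer.parseInt(hostPort[1])));
    }
    return result;
  }

  // True if a TCP connection to host:port opens within timeoutMs.
  public static boolean isReachable(String host, int port, int timeoutMs) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMs);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    if (args.length < 1) {
      System.err.println("Usage: BrokerCheck <brokers>");
      System.exit(1);
    }
    for (InetSocketAddress broker : parseBrokers(args[0])) {
      String host = broker.getHostString();
      int port = broker.getPort();
      if (port == 2181) {
        // 2181 is ZooKeeper's default client port, not a broker port.
        System.out.println(host + ":" + port + " looks like a ZooKeeper address");
      }
      boolean ok = isReachable(host, port, 3000);
      System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
  }
}
```

Run it on the driver host with the same broker list you pass to the streaming job, for example `java BrokerCheck 172.28.161.32:9092`. A successful connection does not prove the topic exists; it only rules out the network path as the problem.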

On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rprasad@couponsinc.com> wrote:
Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics that will later be
turned into streams, as shown below. I have Kafka set up on a VM and the topics created.
However, when I run the program below from my Spark VM, I get an error, even though the
Kafka server and ZooKeeper are up and running.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local
spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received
-1 when reading from channel, socket has likely been closed.
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program

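public final class JavaKafkaStreamEventProcessing {
  // The class header and SPACE were not included in the post; this definition is assumed
  // (it matches Spark's JavaDirectKafkaWordCount example) and needs java.util.regex.Pattern.
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {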
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
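
On the consumer.properties change mentioned in the follow-up: as far as I know, that file is read by Kafka's own command-line consumers, not by the Spark driver, and the direct stream takes its settings from the kafkaParams map passed to createDirectStream. A sketch of building that map with the offset reset policy included follows. The class and method names are hypothetical, and the broker address is the one from the follow-up command.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the two keys are real settings read by the
// Spark 1.x direct stream for Kafka 0.8.
public class DirectStreamParams {

  public static Map<String, String> build(String brokers) {
    Map<String, String> kafkaParams = new HashMap<String, String>();
    // Broker host:port pairs, not the ZooKeeper address.
    kafkaParams.put("metadata.broker.list", brokers);
    // Begin at the earliest available offset when no starting offset is given.
    kafkaParams.put("auto.offset.reset", "smallest");
    return kafkaParams;
  }

  public static void main(String[] args) {
    System.out.println(build("172.28.161.32:9092"));
  }
}
```

The resulting map would take the place of the kafkaParams built in main above. Without the reset setting, my understanding is that the direct stream starts from the latest offset, so messages already in the topic would not be read.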
