spark-user mailing list archives

From: Hemanth Yamijala <yhema...@gmail.com>
Subject: Setting Kafka parameters in Spark Streaming
Date: Mon, 08 Sep 2014 16:48:37 GMT
Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the
parameter fetch.message.max.bytes when creating the Kafka DStream. The only
API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](
    typeClass: Class[T],
    decoderClass: Class[D],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel)

I tried to call this as follows:

context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
    StorageLevel.MEMORY_AND_DISK())

However, this call fails with the following exception:

java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
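
In case it is useful, here is a minimal, self-contained sketch of what I am attempting. The ZooKeeper address, group id, topic name, fetch size value and batch interval are placeholders rather than my real settings, and the consumer property names are what I believe Kafka 0.7 expects:

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class KafkaParamsTest {
  public static void main(String[] args) {
    // Local streaming context with a 2 second batch interval (placeholder values).
    JavaStreamingContext context =
        new JavaStreamingContext("local[2]", "KafkaParamsTest", new Duration(2000));

    // Kafka consumer properties, including the fetch size I am trying to raise.
    // zk.connect / groupid are the Kafka 0.7 property names as I understand them;
    // the values here are placeholders.
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zk.connect", "localhost:2181");
    kafkaParams.put("groupid", "test-group");
    kafkaParams.put("fetch.message.max.bytes", "10485760");

    // One entry per topic, mapped to the number of consumer threads for it.
    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test-topic", 1);

    // This is the call that throws the ClassCastException above.
    JavaDStream<String> messages = context.kafkaStream(
        String.class, StringDecoder.class, kafkaParams, topics,
        StorageLevel.MEMORY_AND_DISK());

    messages.print();
    context.start();
  }
}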

Can anyone provide help on how to set these parameters?

Thanks

Hemanth
