spark-issues mailing list archives

From "Saisai Shao (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5953) NoSuchMethodException with a Kafka input stream and custom decoder in Scala
Date Tue, 24 Feb 2015 06:14:11 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334469#comment-14334469 ]

Saisai Shao commented on SPARK-5953:
------------------------------------

I think the decoder is initialized on the executor side, so your class needs to be available on the executors. One way is to add a jar that includes the custom decoder class using SparkContext#addJar.
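Separately, the stack trace below shows KafkaReceiver looking up a constructor that takes kafka.utils.VerifiableProperties via reflection, so the decoder class likely also needs to declare such a constructor. A minimal sketch of what that could look like (assumes Kafka 0.8.x and Kryo on the classpath; UserLocationEvent is the reporter's own class):

```scala
import java.io.ByteArrayInputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// KafkaReceiver calls Class.getConstructor(classOf[VerifiableProperties]),
// so a decoder with only a no-arg constructor triggers the
// NoSuchMethodException reported below. Declaring the parameter in the
// primary constructor satisfies the reflective lookup.
class UserLocationEventDecoder(props: VerifiableProperties)
  extends Decoder[UserLocationEvent] {

  val kryo = new Kryo()

  override def fromBytes(bytes: Array[Byte]): UserLocationEvent = {
    val input = new Input(new ByteArrayInputStream(bytes))
    try kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
    finally input.close()
  }
}
```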

> NoSuchMethodException with a Kafka input stream and custom decoder in Scala
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-5953
>                 URL: https://issues.apache.org/jira/browse/SPARK-5953
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.2.0, 1.2.1
>         Environment: Xubuntu 14.04, Kafka 0.8.2, Scala 2.10.4, Scala 2.11.5
>            Reporter: Aleksandar Stojadinovic
>
> When using a Kafka input stream, and setting a custom Kafka Decoder, Spark throws an exception upon starting:
> {noformat}
> ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 	at java.lang.Class.getConstructor0(Class.java:2971)
> 	at java.lang.Class.getConstructor(Class.java:1812)
> 	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 15/02/23 21:37:31 ERROR ReceiverSupervisorImpl: Stopped executor with error: java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 15/02/23 21:37:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 	at java.lang.Class.getConstructor0(Class.java:2971)
> 	at java.lang.Class.getConstructor(Class.java:1812)
> 	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The stream is initialized with:
> {code:title=Main.scala|borderStyle=solid}
> val locationsAndKeys = KafkaUtils.createStream[String, Array[Byte], kafka.serializer.StringDecoder, UserLocationEventDecoder] (ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK);
> {code}
> The decoder:
> {code:title=UserLocationEventDecoder.scala|borderStyle=solid}
> import java.io.ByteArrayInputStream
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.Input
> import kafka.serializer.Decoder
> class UserLocationEventDecoder extends Decoder[UserLocationEvent] {
>   val kryo = new Kryo()
>   override def fromBytes(bytes: Array[Byte]): UserLocationEvent = {
>     val input: Input = new Input(new ByteArrayInputStream(bytes))
>     val userLocationEvent: UserLocationEvent = kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
>     input.close()
>     return userLocationEvent
>   }
> }
> {code}
> build.sbt:
> {code:borderStyle=solid}
> scalaVersion := "2.10.4"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1"
> libraryDependencies += "com.spatial4j" % "spatial4j" % "0.4.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.1"
> libraryDependencies += "com.twitter" % "chill_2.10" % "0.5.2"
> {code}
> The input stream (and my code overall) works fine if initialized with the kafka.serializer.DefaultDecoder, and content is manually deserialized.
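The workaround the reporter describes (DefaultDecoder plus manual deserialization) might look roughly like the sketch below; this is an illustrative reconstruction, not code from the report, and it assumes the reporter's ssc, kafkaParams, topicMap, and UserLocationEvent are in scope:

```scala
import java.io.ByteArrayInputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// DefaultDecoder hands back raw Array[Byte]; deserialization then happens
// in a map stage on the executors, so no decoder class is instantiated
// reflectively and the constructor lookup is avoided entirely.
val raw = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)

val locations = raw.mapValues { bytes =>
  val kryo = new Kryo() // Kryo is not thread-safe; created per record here for simplicity
  val input = new Input(new ByteArrayInputStream(bytes))
  try kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
  finally input.close()
}
```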



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

