Date: Tue, 24 Feb 2015 06:14:11 +0000 (UTC)
From: "Saisai Shao (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Commented] (SPARK-5953) NoSuchMethodException with a Kafka input stream and custom decoder in Scala

    [ https://issues.apache.org/jira/browse/SPARK-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334469#comment-14334469 ]

Saisai Shao commented on SPARK-5953:
------------------------------------

I think the decoder is instantiated on the executor side, so your class has to be loadable there. One way is to add the jar that contains the custom decoder class using SparkContext#addJar (see the sketches after the quoted issue below).

> NoSuchMethodException with a Kafka input stream and custom decoder in Scala
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-5953
>                 URL: https://issues.apache.org/jira/browse/SPARK-5953
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 1.2.0, 1.2.1
>        Environment: Xubuntu 14.04, Kafka 0.8.2, Scala 2.10.4, Scala 2.11.5
>            Reporter: Aleksandar Stojadinovic
>
> When using a Kafka input stream with a custom Kafka decoder, Spark throws an exception on startup:
> {noformat}
> ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 	at java.lang.Class.getConstructor0(Class.java:2971)
> 	at java.lang.Class.getConstructor(Class.java:1812)
> 	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 15/02/23 21:37:31 ERROR ReceiverSupervisorImpl: Stopped executor with error: java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 15/02/23 21:37:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NoSuchMethodException: UserLocationEventDecoder.<init>(kafka.utils.VerifiableProperties)
> 	at java.lang.Class.getConstructor0(Class.java:2971)
> 	at java.lang.Class.getConstructor(Class.java:1812)
> 	at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> 	at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> 	at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The stream is initialized with:
> {code:title=Main.scala|borderStyle=solid}
> val locationsAndKeys = KafkaUtils.createStream[String, Array[Byte], kafka.serializer.StringDecoder, UserLocationEventDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)
> {code}
> The decoder:
> {code:title=UserLocationEventDecoder.scala|borderStyle=solid}
> import java.io.ByteArrayInputStream
>
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.Input
> import kafka.serializer.Decoder
>
> class UserLocationEventDecoder extends Decoder[UserLocationEvent] {
>   val kryo = new Kryo()
>
>   override def fromBytes(bytes: Array[Byte]): UserLocationEvent = {
>     val input: Input = new Input(new ByteArrayInputStream(bytes))
>     val userLocationEvent = kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
>     input.close()
>     userLocationEvent
>   }
> }
> {code}
> build.sbt:
> {code:borderStyle=solid}
> scalaVersion := "2.10.4"
>
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1"
> libraryDependencies += "com.spatial4j" % "spatial4j" % "0.4.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.1"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.1"
> libraryDependencies += "com.twitter" % "chill_2.10" % "0.5.2"
> {code}
> The input stream (and my code overall) works fine when initialized with kafka.serializer.DefaultDecoder and the content is deserialized manually.
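
Kafka 0.8 instantiates Decoder implementations reflectively through a constructor that takes a kafka.utils.VerifiableProperties argument, which is exactly the signature named in the NoSuchMethodException above (the lookup happens in KafkaReceiver.onStart, per the stack trace). Below is a minimal sketch of the reported decoder with that constructor added. UserLocationEvent and the Kryo deserialization come from the report; the props parameter name and its null default are illustrative assumptions:

{code:title=UserLocationEventDecoder.scala (sketch)|borderStyle=solid}
import java.io.ByteArrayInputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// The single-argument VerifiableProperties constructor is what the
// reflective lookup in the stack trace above fails to find.
class UserLocationEventDecoder(props: VerifiableProperties = null)
  extends Decoder[UserLocationEvent] {

  val kryo = new Kryo()

  override def fromBytes(bytes: Array[Byte]): UserLocationEvent = {
    val input = new Input(new ByteArrayInputStream(bytes))
    val event = kryo.readClassAndObject(input).asInstanceOf[UserLocationEvent]
    input.close()
    event
  }
}
{code}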
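
And a sketch of the SparkContext#addJar route suggested in the comment, so the class is loadable on executors; the app name and jar path are hypothetical:

{code:title=ShipDecoderJar.scala (sketch)|borderStyle=solid}
import org.apache.spark.{SparkConf, SparkContext}

object ShipDecoderJar {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kafka-decoder-demo"))
    // Hypothetical path: this jar must contain UserLocationEventDecoder and
    // UserLocationEvent so executors can load them when the receiver starts.
    sc.addJar("/path/to/app-with-decoder.jar")
  }
}
{code}

Passing the jar via spark-submit --jars achieves the same distribution at submit time.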