spark-user mailing list archives

From Suhas Shekar <suhsheka...@gmail.com>
Subject Re: Setting up Simple Kafka Consumer via Spark Java app
Date Mon, 29 Dec 2014 08:23:34 GMT
I'm very close! So I added that and then I added this:
http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta

and the stream now seems to be working: the log reports that Stream 0
received 1 or 2 blocks as I type messages into my Kafka producer. However,
the Receiver keeps running every 2 seconds (I passed 2000 as the duration
in my Java app). How can I stop the Receiver from consuming messages after
10 seconds and print the word count to the console?

Thanks a lot for all the help! I'm excited to see this word count :)

Suhas Shekar

University of California, Los Angeles
B.A. Economics, Specialization in Computing 2014

On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <akhil@sigmoidanalytics.com>
wrote:

> Add this jar as a dependency:
> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>
> Thanks
> Best Regards
>
> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <suhshekar52@gmail.com>
> wrote:
>
>> Hello Akhil,
>>
>> I changed my Kafka dependency to 2.10 (which is the version of Kafka that
>> was on 10.0.1.232). I am getting a slightly different error, but at the
>> same place as the previous error (pasted below).
>>
>> FYI, when I make these changes to the pom file, I run "mvn clean package"
>> and then cp the new jar files from the repository to my lib directory of
>> jar files, which is passed as an argument in the spark-submit script from
>> my original post.
>>
>> Thanks again for the time and help...much appreciated.
>>
>>
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: com/yammer/metrics/Metrics
>>         at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>         at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>         ... 18 more
>>
>>
>> Suhas Shekar
>>
>> University of California, Los Angeles
>> B.A. Economics, Specialization in Computing 2014
>>
>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <suhshekar52@gmail.com>
>> wrote:
>>
>>> I made both versions 1.1.1 and I got the same error. I then tried making
>>> both 1.1.0 as that is the version of my Spark Core, but I got the same
>>> error.
>>>
>>> I noticed my Kafka dependency is built for Scala 2.9.2, while my Spark
>>> Streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>> but I don't think it will solve the error, as I don't think the
>>> application had got to that level yet.
>>>
>>> Please let me know of any possible next steps.
>>>
>>> Thank you again for the time and the help!
>>>
>>>
>>>
>>> Suhas Shekar
>>>
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <akhil@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> I just looked at the pom file that you are using. Why do you have
>>>> different versions in it?
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.spark</groupId>
>>>>   <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>   <version>1.1.1</version>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.spark</groupId>
>>>>   <artifactId>spark-streaming_2.10</artifactId>
>>>>   <version>1.0.2</version>
>>>> </dependency>
>>>>
>>>> Can you make both the versions the same?
>>>>
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <suhshekar52@gmail.com>
>>>> wrote:
>>>>
>>>>> 1) Could you please clarify on what you mean by checking the Scala
>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same
>>>>> as when I start spark-shell).
>>>>>
>>>>> 2) The spark master URL is definitely correct as I have run other apps
>>>>> with the same script that use Spark (like a word count with a local file)
>>>>>
>>>>> Thank you for the help!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Suhas Shekar
>>>>>
>>>>> University of California, Los Angeles
>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>
>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <
>>>>> akhil@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Make sure you verify the following:
>>>>>>
>>>>>> - Scala version : I think the correct version would be 2.10.x
>>>>>> - SparkMasterURL: Be sure that you copied the one displayed on the
>>>>>> webui's top left corner (running on port 8080)
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <suhshekar52@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> Thank you for the time and the help :).
>>>>>>>
>>>>>>> My goal here is to get this program working:
>>>>>>>
>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>
>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>> pom.xml
>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>>>>>>>
>>>>>>> Background: Have ec2 instances running. The standalone spark is
>>>>>>> running on top of Cloudera Manager 5.2.
>>>>>>>
>>>>>>> Pom file is attached and the same for both clusters.
>>>>>>> pom.xml
>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>>>>>>>
>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>> run into:
>>>>>>>
>>>>>>> *Standalone Mode*
>>>>>>>
>>>>>>> 1) Use spark-submit script to run:
>>>>>>>
>>>>>>>
>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077  --jars $(echo
>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>
>>>>>>> Interesting...I was getting an error like this: Initial job has not
>>>>>>> accepted any resources; check your cluster UI
>>>>>>>
>>>>>>> Now, when I run, it prints out the 3 Hello world statements in my
>>>>>>> code:
>>>>>>> KafkaJavaConsumer.txt
>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt>
>>>>>>>
>>>>>>> and then it seems to try to start the Kafka Stream, but fails:
>>>>>>>
>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with message: Error starting receiver 0: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>> ^C14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
>>>>>>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>         ... 18 more
>>>>>>>
>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>
>>>>>>> I ran into a couple other Class not found errors, and was able to
>>>>>>> solve them by adding dependencies on the pom file, but have not found
>>>>>>> such a solution to this error.
>>>>>>>
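
One way to narrow down errors like the NoClassDefFoundError traces in this thread is to ask the JVM directly whether a class is visible. The sketch below is a minimal, standard-library-only check. The class name ClasspathCheck and its methods are hypothetical helpers, not part of Spark or Kafka, and the check only reports on the classpath of the JVM it runs in; it says nothing about what the executors receive via --jars.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reports which classes the current classloader cannot find. */
final class ClasspathCheck {

    /** Returns true if the named class can be located (it is not initialized). */
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so both failure modes land here.
            return false;
        }
    }

    /** Returns the subset of classNames that could not be located, in input order. */
    static List<String> missing(String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            if (!isLoadable(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Class names taken from the two stack traces quoted in this thread.
        String[] wanted = {"scala.reflect.ClassManifest", "com.yammer.metrics.Metrics"};
        for (String name : missing(wanted)) {
            System.out.println("missing: " + name);
        }
    }
}
```

Running it with the same jars on the classpath as the application (for example via java -cp) would list whichever of the two classes is still absent.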
>>>>>>> On the Kafka side of things, I am simply typing in messages as soon
>>>>>>> as I start the Java app on another console. Is this okay?
>>>>>>>
>>>>>>> I have not set up an advertised host on the kafka side as I was able
>>>>>>> to still receive messages from other consoles by setting up a consumer
>>>>>>> to listen to the private ip:port. Is this okay?
>>>>>>>
>>>>>>> Lastly, is there a command, like --from-beginning for a consumer, in
>>>>>>> the java application to get messages from the beginning?
>>>>>>>
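
On the --from-beginning question: with the Kafka 0.8 high-level (ZooKeeper-based) consumer that this receiver uses, the closest setting I know of is the consumer property auto.offset.reset=smallest, which only applies when the consumer group has no offset stored in ZooKeeper yet. A minimal sketch of building such a property map follows, assuming it would be passed to the KafkaUtils.createStream overload that accepts a Map of Kafka parameters. KafkaConsumerParams and build are hypothetical names, and the ZooKeeper address, group id and timeout are copied from the log output quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles Kafka 0.8 high-level consumer properties. */
final class KafkaConsumerParams {

    /**
     * Builds the consumer properties. When fromBeginning is true, a group with no
     * stored offset starts at the earliest available message ("smallest");
     * otherwise it starts at the latest ("largest").
     */
    static Map<String, String> build(String zkQuorum, String groupId, boolean fromBeginning) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum);
        params.put("group.id", groupId);
        // Same timeout value that appears in the VerifiableProperties log lines.
        params.put("zookeeper.connection.timeout.ms", "10000");
        params.put("auto.offset.reset", fromBeginning ? "smallest" : "largest");
        return params;
    }

    public static void main(String[] args) {
        // Values taken from the thread: ZooKeeper at 10.0.1.232:2181, group "c1".
        Map<String, String> params = build("10.0.1.232:2181", "c1", true);
        System.out.println(params.get("auto.offset.reset"));
    }
}
```

Because the reset policy is ignored once a group has committed offsets, re-reading a topic from the start would typically also need a group id that has not been used before.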
>>>>>>> Thanks a lot for the help and happy holidays!
>>>>>>>
