spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (SPARK-20238) Is the JavaDirectKafkaWordCount example correct for Spark version 2.1?
Date Thu, 06 Apr 2017 08:33:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-20238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-20238.
-------------------------------
    Resolution: Invalid

The example is OK. The error indicates you ran it with an argument that wasn't accepted, like
an unknown path.
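
The NullPointerException in the trace quoted below is thrown from Utils.decodeFileNameInURI while an executor fetches the application's files, which is consistent with a jar or file argument whose URI has no usable path component. A minimal illustration of that failure mode (the URI strings are invented for the example, not taken from this report):

{code}
import java.net.URI;

// An opaque "scheme:rest" URI has no hierarchical path, so getPath()
// returns null; handing Spark a file/jar reference that parses this
// way is the kind of input that breaks the executor's dependency
// fetch with a NullPointerException.
public class UriPathCheck {
    public static void main(String[] args) {
        System.out.println(URI.create("file:///tmp/app.jar").getPath()); // /tmp/app.jar
        System.out.println(URI.create("app:target/app.jar").getPath());  // null (opaque)
    }
}
{code}

So the first things to check are the application jar path and any file arguments passed when the job was launched.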

Don't reopen issues. As I say, pursue the question on the mailing list first.

>  Is the JavaDirectKafkaWordCount example correct for Spark version 2.1?
> -----------------------------------------------------------------------
>
>                 Key: SPARK-20238
>                 URL: https://issues.apache.org/jira/browse/SPARK-20238
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, ML
>    Affects Versions: 2.1.0
>            Reporter: rayu yuan
>
> My question is: is https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java correct?
> I'm pretty new to Spark. I wanted to find an example of Spark Streaming using Java, streaming from Kafka. The JavaDirectKafkaWordCount at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java looked to be perfect.
> I copied the code as below:
> {code}
> SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount")
>         .setMaster("spark://slc:7077");
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put("bootstrap.servers", "10.0.1.2:9092");
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", StringDeserializer.class);
> kafkaParams.put("group.id", "group1");
> kafkaParams.put("auto.offset.reset", "earliest");
> kafkaParams.put("enable.auto.commit", false);
> Collection<String> topics = Collections.singletonList("test");
> final Logger log = LogManager.getLogger(JavaDirectKafkaWordCount.class);
> final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
>         LocationStrategies.PreferConsistent(),
>         ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
> stream.print();
> {code}
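> The snippet above ends at stream.print(). For reference, the linked example continues roughly as follows before starting the context (a sketch based on the official JavaDirectKafkaWordCount; the log below shows a job running, so the real run must also have called jssc.start()):
> {code}
> // Needs org.apache.spark.streaming.api.java.JavaDStream and
> // JavaPairDStream, java.util.Arrays, and scala.Tuple2 in addition
> // to the imports already used above.
> JavaDStream<String> lines = stream.map(ConsumerRecord::value);
> JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
> JavaPairDStream<String, Integer> wordCounts = words
>         .mapToPair(s -> new Tuple2<>(s, 1))
>         .reduceByKey((i1, i2) -> i1 + i2);
> wordCounts.print();
> jssc.start();              // start the streaming computation
> jssc.awaitTermination();   // block until the context is stopped
> {code}
> (The official file precompiles the space pattern; x.split(" ") is an equivalent shortcut here.)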
> It appeared to throw an error around logging:
> {code}
> 17/04/05 22:43:10 INFO SparkContext: Starting job: print at JavaDirectKafkaWordCount.java:47
> 17/04/05 22:43:10 INFO DAGScheduler: Got job 0 (print at JavaDirectKafkaWordCount.java:47) with 1 output partitions
> 17/04/05 22:43:10 INFO DAGScheduler: Final stage: ResultStage 0 (print at JavaDirectKafkaWordCount.java:47)
> 17/04/05 22:43:10 INFO DAGScheduler: Parents of final stage: List()
> 17/04/05 22:43:10 INFO DAGScheduler: Missing parents: List()
> 17/04/05 22:43:10 INFO DAGScheduler: Submitting ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44), which has no missing parents
> 17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.3 KB, free 366.3 MB)
> 17/04/05 22:43:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1529.0 B, free 366.3 MB)
> 17/04/05 22:43:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.245.226.155:15258 (size: 1529.0 B, free: 366.3 MB)
> 17/04/05 22:43:10 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
> 17/04/05 22:43:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[0] at createDirectStream at JavaDirectKafkaWordCount.java:44)
> 17/04/05 22:43:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53448) with ID 0
> 17/04/05 22:43:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0, partition 0, PROCESS_LOCAL, 7295 bytes)
> 17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:14669 with 366.3 MB RAM, BlockManagerId(0, 10.245.226.155, 14669, None)
> 17/04/05 22:43:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.245.226.155:53447) with ID 1
> 17/04/05 22:43:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.245.226.155:33754 with 366.3 MB RAM, BlockManagerId(1, 10.245.226.155, 33754, None)
> 17/04/05 22:43:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.245.226.155, executor 0): java.lang.NullPointerException
>         at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:409)
>         at org.apache.spark.util.Utils$.fetchFile(Utils.scala:434)
>         at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:508)
>         at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:500)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>         at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:500)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:257)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
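> One plausible cause, given the hard-coded setMaster above, is launching the driver straight from an IDE, so that the executors are never given a valid application jar to fetch. A sketch of naming the jar explicitly on the SparkConf (the path below is a placeholder, not taken from this report):
> {code}
> SparkConf sparkConf = new SparkConf()
>         .setAppName("JavaDirectKafkaWordCount")
>         .setMaster("spark://slc:7077")
>         // Tell the executors where to fetch the application jar from;
>         // an unresolvable jar URI is one way to hit the
>         // NullPointerException in Utils.decodeFileNameInURI shown in
>         // the trace above. The path is a placeholder.
>         .setJars(new String[] { "file:///path/to/your-app.jar" });
> {code}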
> So is the example at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java correct, or is there something I could have done differently to get it working?
> And how can I debug Spark jobs, or the logging of the jobs?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
