crunch-user mailing list archives

From Micah Whitacre <mkwhita...@gmail.com>
Subject Re: Spark Scheduler
Date Wed, 30 Sep 2015 20:35:37 GMT
Do you mind logging a bug for the multiple parallel pipelines and done()
call?  done() should wait for all stages to complete before closing the
context, which would avoid that error.

On Wed, Sep 30, 2015 at 3:33 PM, Nithin Asokan <anithin19@gmail.com> wrote:

> Hey Micah,
> I tried the approaches you mentioned (gist updated
> <https://gist.github.com/nasokan/7a0820411656f618f182>). Both MRPipeline
> and SparkPipeline appear to submit and run parallel jobs. SparkPipeline
> supports this only when using *Pipeline#runAsync()*; it also fails when
> *Pipeline#done()* is used, and removing that call makes my pipeline work.
>
> Here is the error I see when I use the following:
>
> pipeline.write(path1);
> pipeline.runAsync();
> pipeline.write(path2);
> pipeline.runAsync();
> pipeline.done();
>
> Exception in thread "Thread-37" java.lang.NullPointerException
>     at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>     at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>     at
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>     at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>     at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>     at
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>     at
> org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>     at
> org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>     at
> org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>     at
> org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>     at
> org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>     at
> org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>     at
> org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>     at
> org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>     at java.lang.Thread.run(Thread.java:745)
> Exception in thread "Thread-38" java.lang.NullPointerException
>     at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>     at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>     at
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>     at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>     at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>     at
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>     at
> org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>     at
> org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>     at
> org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>     at
> org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>     at
> org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>     at
> org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>     at
> org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>     at
> org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>     at java.lang.Thread.run(Thread.java:745)
>
> However, I didn't see the same error with this code:
>
> pipeline.write(path1);
> pipeline.runAsync();
> pipeline.write(path2);
> pipeline.done();
>
> Instead, when one of the jobs completes, it closes the SparkContext, which
> terminates any jobs that are still active.
>
> [2015-09-30 15:18:55,835] [ERROR] [Thread-37]
> [org.apache.crunch.impl.spark.SparkRuntime] - Spark Exception
> org.apache.spark.SparkException: Job cancelled because SparkContext was
> shut down
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>     at
> org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
>     at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
>     at
> org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
>     at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
>     at
> org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:652)
>     at
> org.apache.crunch.impl.spark.SparkPipeline.done(SparkPipeline.java:178)
>     at com.test.ParallelAction.run(ParallelAction.java:47)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>     at com.test.ParallelAction.main(ParallelAction.java:53)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>     at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> So it appears we can still launch parallel jobs using *runAsync()* (as long
> as *done()* is not used), but I'm not sure whether it's feasible for Crunch
> to identify independent SourceTargets and launch them in parallel. It might
> make a good feature :)
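>
> As a possible workaround (just a sketch, untested, exception handling
> omitted, using the same shorthand as above for the writes), the
> PipelineExecution handles returned by *runAsync()* could be waited on with
> *waitUntilDone()* before calling *done()*, so the SparkContext is only
> closed once both runs have finished:
>
> pipeline.write(path1);
> PipelineExecution exec1 = pipeline.runAsync();
> pipeline.write(path2);
> PipelineExecution exec2 = pipeline.runAsync();
> // wait for both async runs before closing the SparkContext
> exec1.waitUntilDone();
> exec2.waitUntilDone();
> pipeline.done();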
>
> Thanks
> Nithin
>
>
>
> On Wed, Sep 30, 2015 at 12:54 PM Micah Whitacre <mkwhitacre@gmail.com>
> wrote:
>
>> Try switching your test around a bit because I believe there are
>> instances even with MRPipeline where Crunch will kick off multiple jobs in
>> parallel.
>>
>> Something like the following:
>>
>> Read Input1 -> Filter -> Write Output1
>> Read Input2 -> Filter -> Write Output2
>> pipeline.done();
>>
>> Try it with MRPipeline and then with Spark to see what runs in parallel
>> vs. what runs serially.
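>>
>> In Crunch code that first test could look roughly like this (a sketch;
>> the input/output paths and MyFilterFn, a FilterFn<String>, are
>> placeholders):
>>
>> PCollection<String> lines1 = pipeline.readTextFile(input1);
>> PCollection<String> lines2 = pipeline.readTextFile(input2);
>> // two independent read -> filter -> write chains in the same pipeline
>> pipeline.writeTextFile(lines1.filter(new MyFilterFn()), output1);
>> pipeline.writeTextFile(lines2.filter(new MyFilterFn()), output2);
>> pipeline.done();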
>>
>> The other, less ideal option is to change your code to:
>>
>> Read Input1 -> Filter -> Write Output1
>> pipeline.runAsync()
>> Read Input2 -> Filter -> Write Output2
>> pipeline.runAsync()
>> pipeline.done();
>>
>> This should kick each one off independently and give you the parallelism.
>> It would be nice, however, if you didn't have to do this splitting yourself
>> and it was instead done for you.
>>
>>
>> On Wed, Sep 30, 2015 at 12:41 PM, Nithin Asokan <anithin19@gmail.com>
>> wrote:
>>
>>> I was reading about the Spark scheduler [1], and this line caught my
>>> attention:
>>>
>>> *Inside a given Spark application (SparkContext instance), multiple
>>> parallel jobs can run simultaneously if they were submitted from separate
>>> threads. By “job”, in this section, we mean a Spark action
>>> (e.g. save, collect) and any tasks that need to run to evaluate that
>>> action. Spark’s scheduler is fully thread-safe and supports this use case
>>> to enable applications that serve multiple requests (e.g. queries for
>>> multiple users).*
>>>
>>> If I understood the above statement correctly, it is possible to have
>>> multiple jobs running in parallel within a single Spark application, as
>>> long as the *actions* are triggered from separate threads.
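>>>
>>> In plain Spark that would look something like this sketch (paths are
>>> placeholders; exception handling omitted):
>>>
>>> JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("parallel-jobs"));
>>> // each action (count) is submitted from its own thread, so the
>>> // scheduler is free to run the two jobs concurrently
>>> Thread t1 = new Thread(() -> System.out.println(jsc.textFile("hdfs:///path1").count()));
>>> Thread t2 = new Thread(() -> System.out.println(jsc.textFile("hdfs:///path2").count()));
>>> t1.start(); t2.start();
>>> t1.join(); t2.join();
>>> jsc.stop();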
>>>
>>> I tried to test this out on my Crunch Spark application (yarn-client),
>>> which reads two independent HDFS sources and performs
>>> *PCollection#getLength()* on each source. The Spark WebUI shows Job1
>>> submitted first; only after Job1 completes is Job2 submitted and finished.
>>> I would like some thoughts on whether it is possible for Crunch to
>>> identify independent sources/targets and create separate threads that
>>> interact with the Spark scheduler. That way we could have some independent
>>> jobs running in parallel.
>>>
>>> Here is the example that I used
>>> https://gist.github.com/nasokan/7a0820411656f618f182
>>>
>>> [1]
>>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>>>
>>>
>>
