crunch-user mailing list archives

From Nithin Asokan <anithi...@gmail.com>
Subject Re: Spark Scheduler
Date Wed, 30 Sep 2015 20:50:15 GMT
Thanks Micah!

Logged https://issues.apache.org/jira/browse/CRUNCH-566


On Wed, Sep 30, 2015 at 3:35 PM Micah Whitacre <mkwhitacre@gmail.com> wrote:

> Do you mind logging a bug for the multiple parallel pipelines and done()
> call?  The done() should wait for all stages to complete before closing the
> context, avoiding that error.
>
> On Wed, Sep 30, 2015 at 3:33 PM, Nithin Asokan <anithin19@gmail.com>
> wrote:
>
>> Hey Micah,
>> I tried the approaches you mentioned (gist updated
>> <https://gist.github.com/nasokan/7a0820411656f618f182>). Both MRPipeline
>> and SparkPipeline appear to submit and run parallel jobs. However,
>> SparkPipeline supports this only when using *Pipeline#runAsync()*, and it
>> fails when *Pipeline#done()* is used; removing that call makes my
>> pipeline work.
>>
>> Here is the error I see when I use this code:
>>
>> pipeline.write(path1);
>> pipeline.runAsync();
>> pipeline.write(path2);
>> pipeline.runAsync();
>> pipeline.done();
>>
>> Exception in thread "Thread-37" java.lang.NullPointerException
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>>     at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>>     at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>>     at org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>>     at org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>>     at org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>>     at org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>>     at java.lang.Thread.run(Thread.java:745)
>> Exception in thread "Thread-38" java.lang.NullPointerException
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>>     at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>>     at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>>     at org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>>     at org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>>     at org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>>     at org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> However, I don't see the same error when I have this code:
>>
>> pipeline.write(path1);
>> pipeline.runAsync();
>> pipeline.write(path2);
>> pipeline.done();
>>
>> Instead, when one of the jobs completes, *done()* closes the SparkContext,
>> which terminates any jobs that are still active.
>>
>> [2015-09-30 15:18:55,835] [ERROR] [Thread-37] [org.apache.crunch.impl.spark.SparkRuntime] - Spark Exception
>> org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
>>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>     at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
>>     at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
>>     at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
>>     at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
>>     at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:652)
>>     at org.apache.crunch.impl.spark.SparkPipeline.done(SparkPipeline.java:178)
>>     at com.test.ParallelAction.run(ParallelAction.java:47)
>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>>     at com.test.ParallelAction.main(ParallelAction.java:53)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> So it appears that we can still launch parallel jobs using *runAsync()*
>> (as long as *done()* is not called), but I'm not sure if it's feasible for
>> Crunch to determine independent SourceTargets and launch them in parallel.
>> It could make a good feature :)
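>>
>> One possible workaround (a rough, untested sketch reusing the shorthand
>> from the snippet above) would be to keep the *PipelineExecution* handles
>> that *runAsync()* returns and wait on them explicitly, so *done()* only
>> tears down the SparkContext once nothing is still running:
>>
>> pipeline.write(path1);
>> PipelineExecution first = pipeline.runAsync();
>> pipeline.write(path2);
>> PipelineExecution second = pipeline.runAsync();
>> // block until both async runs finish before closing the context
>> first.waitUntilDone();
>> second.waitUntilDone();
>> pipeline.done();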
>>
>> Thanks
>> Nithin
>>
>>
>>
>> On Wed, Sep 30, 2015 at 12:54 PM Micah Whitacre <mkwhitacre@gmail.com>
>> wrote:
>>
>>> Try switching your test around a bit because I believe there are
>>> instances even with MRPipeline where Crunch will kick off multiple jobs in
>>> parallel.
>>>
>>> Something like the following:
>>>
>>> Read Input1 -> Filter -> Write Output1
>>> Read Input2 -> Filter -> Write Output2
>>> pipeline.done();
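>>>
>>> In Crunch terms that plan might look roughly like this (a sketch; the
>>> class name, paths, and filter are placeholders, and getConf() assumes the
>>> usual Hadoop Tool setup):
>>>
>>> Pipeline pipeline = new MRPipeline(MyTool.class, getConf());
>>> // hypothetical filter: keep non-empty lines
>>> FilterFn<String> nonEmpty = new FilterFn<String>() {
>>>   @Override
>>>   public boolean accept(String line) { return !line.isEmpty(); }
>>> };
>>> pipeline.writeTextFile(
>>>     pipeline.readTextFile("hdfs:///input1").filter(nonEmpty), "hdfs:///output1");
>>> pipeline.writeTextFile(
>>>     pipeline.readTextFile("hdfs:///input2").filter(nonEmpty), "hdfs:///output2");
>>> // the two branches are independent, so the planner may run them as parallel jobs
>>> pipeline.done();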
>>>
>>> Try it with MRPipeline and then with Spark to see what runs in parallel
>>> vs. what runs serially.
>>>
>>> The other, less ideal option is to change your code to:
>>>
>>> Read Input1 -> Filter -> Write Output1
>>> pipeline.runAsync()
>>> Read Input2 -> Filter -> Write Output2
>>> pipeline.runAsync()
>>> pipeline.done();
>>>
>>> This should kick each of them off independently and give you the
>>> parallelism.  It would be nicer, however, if you didn't have to do this
>>> splitting yourself and it was done for you.
>>>
>>>
>>> On Wed, Sep 30, 2015 at 12:41 PM, Nithin Asokan <anithin19@gmail.com>
>>> wrote:
>>>
>>>> I was reading about the Spark scheduler [1], and this line caught my
>>>> attention:
>>>>
>>>> *Inside a given Spark application (SparkContext instance), multiple
>>>> parallel jobs can run simultaneously if they were submitted from separate
>>>> threads. By “job”, in this section, we mean a Spark action
>>>> (e.g. save, collect) and any tasks that need to run to evaluate that
>>>> action. Spark’s scheduler is fully thread-safe and supports this use case
>>>> to enable applications that serve multiple requests (e.g. queries for
>>>> multiple users).*
>>>>
>>>> If I understood the above statement correctly, it is possible to have
>>>> multiple jobs running in parallel in a Spark application, as long as the
>>>> *actions* are triggered by separate threads.
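>>>>
>>>> In plain Spark terms, the pattern the docs describe might look something
>>>> like this (a sketch; the app name and paths are made up):
>>>>
>>>> import java.util.concurrent.*;
>>>> import org.apache.spark.SparkConf;
>>>> import org.apache.spark.api.java.JavaSparkContext;
>>>>
>>>> // one shared context; Spark's scheduler is thread-safe
>>>> JavaSparkContext jsc =
>>>>     new JavaSparkContext(new SparkConf().setAppName("parallel-jobs"));
>>>> ExecutorService pool = Executors.newFixedThreadPool(2);
>>>> // count() is an action, so each callable submits an independent job
>>>> Future<Long> c1 = pool.submit(() -> jsc.textFile("hdfs:///input1").count());
>>>> Future<Long> c2 = pool.submit(() -> jsc.textFile("hdfs:///input2").count());
>>>> System.out.println(c1.get() + " / " + c2.get());
>>>> pool.shutdown();
>>>> jsc.stop();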
>>>>
>>>> I tried to test this out on my Crunch Spark application (yarn-client),
>>>> which reads two independent HDFS sources and performs
>>>> *PCollection#length()* on each source. The Spark WebUI shows Job1
>>>> submitted first; only after Job1 completes is Job2 submitted and
>>>> finished. I would like to get some thoughts on whether it is possible
>>>> for Crunch to identify independent sources/targets and create separate
>>>> threads that interact with the Spark scheduler. That way, I think we
>>>> could have some independent jobs running in parallel.
>>>>
>>>> Here is the example that I used
>>>> https://gist.github.com/nasokan/7a0820411656f618f182
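>>>>
>>>> In outline, the gist does something like this (a sketch; the app name
>>>> and paths are placeholders):
>>>>
>>>> Pipeline pipeline = new SparkPipeline("yarn-client", "parallel-test");
>>>> PCollection<String> first = pipeline.readTextFile("hdfs:///source1");
>>>> PCollection<String> second = pipeline.readTextFile("hdfs:///source2");
>>>> // each length() is a separate action; getValue() forces a pipeline run
>>>> System.out.println(first.length().getValue());
>>>> System.out.println(second.length().getValue());
>>>> pipeline.done();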
>>>>
>>>> [1]
>>>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>>>>
>>>>
>>>
>
