crunch-user mailing list archives

From Nithin Asokan <anithi...@gmail.com>
Subject Re: Spark Scheduler
Date Wed, 30 Sep 2015 20:33:28 GMT
Hey Micah,
I tried the approaches you mentioned (gist updated:
<https://gist.github.com/nasokan/7a0820411656f618f182>). Both MRPipeline
and SparkPipeline appear to submit and run parallel jobs. SparkPipeline
supports this only when using *Pipeline#runAsync()*; it also fails when
*Pipeline#done()* is called, and removing that call makes my pipeline
work.

Here is the error I see when I use the following:

pipeline.write(path1);
pipeline.runAsync();
pipeline.write(path2);
pipeline.runAsync();
pipeline.done();

Exception in thread "Thread-37" java.lang.NullPointerException
    at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
    at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
    at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
    at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
    at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
    at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
    at
org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
    at
org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
    at
org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
    at
org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
    at
org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
    at
org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
    at
org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "Thread-38" java.lang.NullPointerException
    at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
    at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
    at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
    at
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
    at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
    at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
    at
org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
    at
org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
    at
org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
    at
org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
    at
org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
    at
org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
    at
org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
    at java.lang.Thread.run(Thread.java:745)

However, I don't see the same error with this code:

pipeline.write(path1);
pipeline.runAsync();
pipeline.write(path2);
pipeline.done();

Instead, when one of the jobs completes, *done()* closes the SparkContext,
which terminates any jobs that are still active.

[2015-09-30 15:18:55,835] [ERROR] [Thread-37] [org.apache.crunch.impl.spark.SparkRuntime] - Spark Exception
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
    at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:652)
    at org.apache.crunch.impl.spark.SparkPipeline.done(SparkPipeline.java:178)
    at com.test.ParallelAction.run(ParallelAction.java:47)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.test.ParallelAction.main(ParallelAction.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

So it appears that we can still launch parallel jobs using *runAsync()* (as
long as *done()* is not used), but I'm not sure whether it's feasible for
Crunch to determine independent SourceTargets and launch them in parallel.
It might make a good feature :)
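
One variation I haven't tried yet, as a rough sketch only: hold on to the
PipelineExecution handles returned by *runAsync()* and wait on them explicitly
before calling *done()*, so the SparkContext isn't shut down while jobs are
still active. The SparkPipeline constructor arguments and the NonEmptyFilter
below are placeholders, and I haven't verified whether this avoids the
NullPointerException above.

import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.impl.spark.SparkPipeline;

public class ParallelWrites {

  // Placeholder filter, only here to give each branch some work to do.
  static class NonEmptyFilter extends FilterFn<String> {
    @Override
    public boolean accept(String input) {
      return input != null && !input.isEmpty();
    }
  }

  public static void main(String[] args) throws Exception {
    // Master and app name are placeholders; use whatever fits your setup.
    Pipeline pipeline = new SparkPipeline("yarn-client", "parallel-writes");

    // First independent branch: read -> filter -> write, submitted asynchronously.
    PCollection<String> in1 = pipeline.readTextFile(args[0]);
    pipeline.writeTextFile(in1.filter(new NonEmptyFilter()), args[2]);
    PipelineExecution exec1 = pipeline.runAsync();

    // Second independent branch, submitted asynchronously from the same pipeline.
    PCollection<String> in2 = pipeline.readTextFile(args[1]);
    pipeline.writeTextFile(in2.filter(new NonEmptyFilter()), args[3]);
    PipelineExecution exec2 = pipeline.runAsync();

    // Block on both executions before tearing down the SparkContext.
    exec1.waitUntilDone();
    exec2.waitUntilDone();
    pipeline.done();
  }
}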

Thanks
Nithin



On Wed, Sep 30, 2015 at 12:54 PM Micah Whitacre <mkwhitacre@gmail.com>
wrote:

> Try switching your test around a bit because I believe there are instances
> even with MRPipeline where Crunch will kick off multiple jobs in parallel.
>
> Something like the following:
>
> Read Input1 -> Filter -> Write Output1
> Read Input2 -> Filter -> Write Output2
> pipeline.done();
>
> Try it with MRPipeline and then with Spark to see what runs in parallel and
> what runs serially.
>
> The other option, which is less ideal, is that you could change your code
> to be:
>
> Read Input1 -> Filter -> Write Output1
> pipeline.runAsync()
> Read Input2 -> Filter -> Write Output2
> pipeline.runAsync()
> pipeline.done();
>
> This should kick them each off independently and give you the
> parallelism.  It would be nice, however, if you didn't have to do this
> splitting yourself and it was done for you.
>
>
> On Wed, Sep 30, 2015 at 12:41 PM, Nithin Asokan <anithin19@gmail.com>
> wrote:
>
>> I was reading about Spark scheduler[1], and this line caught my attention
>>
>> *Inside a given Spark application (SparkContext instance), multiple
>> parallel jobs can run simultaneously if they were submitted from separate
>> threads. By “job”, in this section, we mean a Spark action
>> (e.g. save, collect) and any tasks that need to run to evaluate that
>> action. Spark’s scheduler is fully thread-safe and supports this use case
>> to enable applications that serve multiple requests (e.g. queries for
>> multiple users).*
>>
>> If I understood the above statement correctly, it should be possible to
>> have multiple jobs running in parallel in a Spark application, as long as
>> the *actions* are triggered by separate threads.
>>
>> I was trying to test this out on my Crunch Spark application (yarn-client),
>> which reads two independent HDFS sources and performs
>> *PCollection#getLength()* on each source. The Spark WebUI starts with Job1
>> as submitted; after Job1 completes, Job2 is submitted and finished. I would
>> like to get some thoughts on whether it is possible for Crunch to identify
>> independent sources/targets and possibly create separate threads that
>> interact with the Spark scheduler. That way I think we could have some
>> independent jobs running in parallel.
>>
>> Here is the example that I used
>> https://gist.github.com/nasokan/7a0820411656f618f182
>>
>> [1]
>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>>
>>
>
