spark-user mailing list archives

From Ayman Farahat <ayman.fara...@yahoo.com.INVALID>
Subject Re: Failed stages and dropped executors when running implicit matrix factorization/ALS : Same error after the re-partition
Date Sun, 28 Jun 2015 03:52:23 GMT
That's correct, this is YARN
and Spark 1.4.
Also using the Anaconda tarball for NumPy and other libs.


Sent from my iPhone

> On Jun 27, 2015, at 8:50 PM, Sabarish Sasidharan <sabarish.sasidharan@manthan.com> wrote:
> 
> Are you running on top of YARN? Also, please provide your infrastructure details.
> 
> Regards
> Sab
> 
>> On 28-Jun-2015 8:47 am, "Ayman Farahat" <ayman.farahat@yahoo.com.invalid> wrote:
>> Hello;
>> I tried to adjust the number of blocks by repartitioning the input.
>> Here is how I do it (I am partitioning by users):
>> 
>> tot = newrdd.map(lambda l: (l[1],Rating(int(l[1]),int(l[2]),l[4]))).partitionBy(50).cache()
>> ratings = tot.values()
>> numIterations = 8
>> rank = 80
>> model = ALS.trainImplicit(ratings, rank, numIterations)
>> 
>> 
>> I have 20 executors
>> with 5 GB memory per executor.
>> When I use 80 factors I keep getting the following problem:
>> 
>> Traceback (most recent call last):
>>   File "/homes/afarahat/myspark/mm/df4test.py", line 85, in <module>
>>     model = ALS.trainImplicit(ratings, rank, numIterations)
>>   File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/recommendation.py", line 201, in trainImplicit
>>   File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 128, in callMLlibFunc
>>   File "/homes/afarahat/aofspark/share/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 121, in callJavaFunc
>>   File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>>   File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o113.trainImplicitALSModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 36.1 failed 4 times, most recent failure: Lost task 7.3 in stage 36.1 (TID 1841, gsbl52746.blue.ygrid.yahoo.com): java.io.FileNotFoundException: /grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_1027774/blockmgr-0e518470-57d8-472f-8fba-3b593e4dda42/27/rdd_56_24 (No such file or directory)
>>         at java.io.RandomAccessFile.open(Native Method)
>>         at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
>>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
>>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>>         at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
>>         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:617)
>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:722)
>> 
>> Driver stacktrace:
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at scala.Option.foreach(Option.scala:236)
>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> 
>> Jun 28, 2015 2:10:37 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
>>> On Jun 26, 2015, at 12:33 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>> 
>>> So you have 100 partitions (blocks). This might be too many for your dataset.
>>> Try setting a smaller number of blocks, e.g., 32 or 64. When ALS starts iterations,
>>> you can see the shuffle read/write size from the "stages" tab of the Spark Web UI.
>>> Vary the number of blocks and check the numbers there. The Kryo serializer doesn't
>>> help much here. You can try disabling it (though I don't think it caused the
>>> failure). -Xiangrui
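A minimal sketch of that suggestion, assuming the Spark 1.4 PySpark API in which ALS.trainImplicit accepts a blocks argument; blocks=32 is just one of the candidate values mentioned above, not a tuned choice, and ratings, rank, and numIterations refer to the variables from the earlier snippet:

    from pyspark.mllib.recommendation import ALS

    # Pass the block count to ALS directly instead of relying only on
    # partitionBy() on the input RDD. 32 is an assumed starting point;
    # vary it and compare shuffle read/write sizes in the Web UI.
    model = ALS.trainImplicit(ratings, rank, iterations=numIterations, blocks=32)
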
>>> 
>>>> On Fri, Jun 26, 2015 at 11:00 AM, Ayman Farahat <ayman.farahat@yahoo.com> wrote:
>>>> Hello;
>>>> I checked on my partitions/storage and here is what I have.
>>>> 
>>>> I have 80 executors
>>>> with 5 GB per executor.
>>>> 
>>>> Do I need to set additional params,
>>>> say cores?
>>>> 
>>>> spark.serializer                 org.apache.spark.serializer.KryoSerializer
>>>> # spark.driver.memory              5g
>>>> # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
>>>> spark.shuffle.memoryFraction  0.3
>>>> spark.storage.memoryFraction  0.65
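If cores do need to be set, one way they could be configured is through the executor properties on a SparkConf rather than the defaults file alone; a minimal sketch in which the executor count and memory simply echo the numbers reported above, while the 4 cores per executor is an assumed value, not a recommendation:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.executor.instances", "80")  # number of executors on YARN, as reported above
            .set("spark.executor.memory", "5g")     # memory per executor, as reported above
            .set("spark.executor.cores", "4"))      # cores per executor; assumed value
    sc = SparkContext(conf=conf)
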
>>>> 
>>>> 
>>>> 
>>>> RDD Name        Storage Level                       Cached Partitions   Fraction Cached   Size in Memory   Size in Tachyon   Size on Disk
>>>> ratingBlocks    Memory Deserialized 1x Replicated   257                 129%              4.1 GB           0.0 B             0.0 B
>>>> itemOutBlocks   Memory Deserialized 1x Replicated   100                 100%              7.3 MB           0.0 B             0.0 B
>>>> 38              Memory Serialized 1x Replicated     193                 97%               5.6 GB           0.0 B             0.0 B
>>>> userInBlocks    Memory Deserialized 1x Replicated   100                 100%              2.8 GB           0.0 B             0.0 B
>>>> itemFactors-1   Memory Deserialized 1x Replicated   69                  69%               8.4 MB           0.0 B             0.0 B
>>>> itemInBlocks    Memory Deserialized 1x Replicated   69                  69%               1455.3 MB        0.0 B             0.0 B
>>>> userFactors-1   Memory Deserialized 1x Replicated   100                 100%              35.0 GB          0.0 B             0.0 B
>>>> userOutBlocks   Memory Deserialized 1x Replicated   100                 100%              1062.7 MB        0.0 B             0.0 B
>>>> 
>>>>> On Jun 26, 2015, at 8:26 AM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>>>> 
>>>>>  number of CPU cores or less.
