spark-user mailing list archives

From Palash Gupta <spline_pal...@yahoo.com.INVALID>
Subject Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
Date Thu, 05 Jan 2017 11:18:44 GMT
Hi Marco,
Thanks, and please find my responses below for your information:

If it is in same host...It is expected. Afaik u cannot create >1 spark CTX on same host.
Palash>> As per my knowledge, within one Spark application we cannot create two SparkSession or SparkContext objects. But within one host, we should be able to run two separate Spark processes if we can give each a separate amount of resources (CPU and memory); a minimal sketch of this follows below.
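For illustration, a minimal sketch (app name and resource numbers are hypothetical) of how each application could cap its own share of a standalone cluster so that two can run at once:

    # Minimal sketch (hypothetical app name and resource numbers): each PySpark
    # application caps its own cores and memory so the standalone scheduler can
    # run two applications on the same cluster at once. Each script is launched
    # as its own spark-submit process.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("expected_hour_loader")        # hypothetical name
             .config("spark.cores.max", "2")         # cap total cores for this app
             .config("spark.executor.memory", "2g")  # cap memory per executor
             .getOrCreate())

    # The second application would do the same with its own appName and its
    # own slice of cores/memory.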
All I can suggest is to run ur apps outside cluster and on 2 different hosts. If that fails u will need to put logs in ur failing app to determine why it is failing.
Palash>> Running outside the cluster would be expensive for us. I tried on two different hosts, but no luck. I shared a detailed debug trace log when I started this conversation, with a Stack Overflow link. I basically don't understand where it is actually failing, and my knowledge base is not strong enough to dig it down.

If u can send me short snippet for the two I can try to reproduce, provided the app can be reproduced by reading from filesystem instead....
Palash>> Yes, of course, that would be really great for me.

I attached the below things for you:
1. Sample Python function which is common to the two apps (a minimal sketch of this kind of helper follows below)
2. How I'm starting my Spark processing & the development Spark cluster information
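As a reference point, a minimal sketch of the kind of shared CSV-loading helper being discussed (paths and names are hypothetical; the load call mirrors the one in the traceback further down this thread):

    # Minimal sketch (hypothetical paths and names) of a CSV-loading helper
    # shared by both apps: read one hour's CSV file from HDFS into a DataFrame
    # with Spark 2.0's built-in csv reader.
    from pyspark.sql import SparkSession

    HDFS_BASE_URL = "hdfs://master-host:9000"  # assumed HDFS URL

    def load_hour_csv(spark, hdfs_dir, filename):
        """Load one CSV file for the given hour into a DataFrame."""
        return (spark.read
                .format("csv")
                .option("header", "true")
                .load(HDFS_BASE_URL + hdfs_dir + filename))

    spark = SparkSession.builder.appName("csv_loader_demo").getOrCreate()
    df = load_hour_csv(spark, "/data/csv/2017010501/", "sample.csv")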


I highly appreciate your time & support!!!


 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistroni@gmail.com>
 To: Palash Gupta <spline_palash@yahoo.com> 
Cc: User <user@spark.apache.org>; ayan guha <guha.ayan@gmail.com>
 Sent: Thursday, January 5, 2017 2:27 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
   
If it is in same host...It is expected. Afaik u cannot create >1 spark CTX on same host.
All I can suggest is to run ur apps outside cluster and on 2 different hosts. If that fails u will need to put logs in ur failing app to determine why it is failing.
If u can send me short snippet for the two I can try to reproduce, provided the app can be reproduced by reading from filesystem instead....
Hth
On 5 Jan 2017 10:35 am, "Palash Gupta" <spline_palash@yahoo.com> wrote:

Hi Marco,
Yes, both were on the same host when the problem was found.
Even when I tried starting them on different hosts, the problem was still there.
Any hints or suggestions will be appreciated.
 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistroni@gmail.com>
 To: Palash Gupta <spline_palash@yahoo.com> 
Cc: ayan guha <guha.ayan@gmail.com>; User <user@spark.apache.org>
 Sent: Thursday, January 5, 2017 1:01 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
  
Hi
If it only happens when u run 2 apps at the same time, could it be that these 2 apps somehow run on same host?
Kr
On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_palash@yahoo.com> wrote:

Hi Marco and respected members,
I have done all the possible things suggested by the forum but I'm still having the same issue:

1. I will migrate my applications to production environment where I will have more resources
Palash>> I migrated my application to production, where I have more CPU cores, memory & a total of 7 hosts in the Spark cluster.

2. Use Spark 2.0.0 function to load CSV rather than using Databricks api
Palash>> Earlier I was using the Databricks CSV api with Spark 2.0.0. As suggested by one of the mates, now I'm using the Spark 2.0.0 built-in CSV loader (a sketch of this load follows after the logs below).

3. In production I will run multiple Spark applications at a time and try to reproduce this error for both file system and HDFS loading cases
Palash>> Yes, I reproduced it, and it only happens when two Spark applications run at a time. Please see the logs:
17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
        ... 11 more

17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 1]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 2]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 6 hostname: 10.15.187.76.
17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor 10.15.187.76: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 3]
17/01/05 01:50:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
17/01/05 01:50:16 INFO scheduler.DAGScheduler: ResultStage 0 (load at NativeMethodAccessorImpl.java:-2) failed in 2.110 s
17/01/05 01:50:16 INFO scheduler.DAGScheduler: Job 0 failed: load at NativeMethodAccessorImpl.java:-2, took 2.262950 s
Traceback (most recent call last):
  File "/home/hadoop/development/datareloadwithps.py", line 851, in <module>
    datareporcessing(expected_datetime,expected_directory_hdfs)
  File "/home/hadoop/development/datareloadwithps.py", line 204, in datareporcessing
    df_codingsc_raw = sqlContext.read.format("csv").option("header",'true').load(HDFS_BASE_URL + hdfs_dir + filename)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 147, in load
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.
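
The failing call in the traceback is the Spark 2.0.0 built-in CSV reader mentioned in point 2 above; a minimal sketch of that step (variable values are hypothetical):

    # Sketch of the failing load step. Spark 2.0's built-in "csv" format
    # replaces the external Databricks spark-csv package
    # ("com.databricks.spark.csv"). Variable values are hypothetical.
    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.appName("csv_load_repro").getOrCreate()
    sqlContext = SQLContext(spark.sparkContext)  # matches the script's sqlContext

    HDFS_BASE_URL = "hdfs://master-host:9000"
    hdfs_dir = "/data/csv/2017010501/"
    filename = "sample.csv"

    df_codingsc_raw = (sqlContext.read
                       .format("csv")             # built-in loader in 2.0.0
                       .option("header", "true")
                       .load(HDFS_BASE_URL + hdfs_dir + filename))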






 Thanks & Best Regards,
Palash Gupta


      From: Palash Gupta <spline_palash@yahoo.com>
 To: Marco Mistroni <mmistroni@gmail.com> 
Cc: ayan guha <guha.ayan@gmail.com>; User <user@spark.apache.org>
 Sent: Saturday, December 31, 2016 12:43 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
  
Hi Marco,
Thanks!
Please find my responses:
so you have a pyspark application running on spark 2.0
Palash>> Yes

You have python scripts dropping files on HDFS
Palash>> Yes (it is not part of the Spark process, just an independent Python script)

then you have two spark jobs
Palash>> Yes

- 1 load expected hour data (pls explain. How many files on average)
Palash>> 35,000 rows in each file at least, with 150 columns
Number of CSV file types: 7
Number of files for each type: 4
Total number of files: 28

- 1 load delayed data (pls explain. how many files on average)
Palash>> We may or may not get delayed data in each hour. But if, for example, the connection between the CSV generation system and the Spark system has a network issue, then we will get many delayed hour files.
On average:
35,000 rows in each file at least, with 150 columns
Number of CSV file types: 7
Number of files for each type: 2
Total number of files: 14
Do these scripts run continuously (they have a while loop) or do you kick them off via a job scheduler on an hourly basis?
Palash>> No, the scripts are run by a Linux cron schedule (not in a while loop).

Do these scripts run on a cluster?
Palash>> My pyspark application is running in standalone cluster mode, where I have only two VMs (one master, two workers).

So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc then populates mongo
Palash>> Yes


At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional. Presumably these files are not deleted). So your job now loads 5 files, does aggregation and stores data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?
Palash>> No, it will only handle the newly arrived files for the new expected hour. But in delayed data handling there is a possibility to reprocess a specific hour's data, re-calculate the KPIs and update them in mongodb.

You said before that simply parsing csv files via spark in a standalone app works fine.
Palash>> I said that since I stopped the delayed data loading Spark script, the expected hour data loading has been smooth and running well for the last three days.

Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists (this is just to exclude any issues with loading HDFS data).
Palash>> The issue is the same when loading from the file system. When I'm running only a single script it is smooth. When I'm running both scripts at a time in two separate pyspark applications, it sometimes fails with this error while loading a file from the file system.
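A minimal sketch of the isolation test being described, issuing the same read against an hdfs:// path and a local file:// path (paths are hypothetical):

    # Sketch of the HDFS-vs-local isolation test: if the local file:// read
    # also fails when two apps run concurrently, HDFS is ruled out as the
    # cause. Paths are hypothetical.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("fs_isolation_test").getOrCreate()

    reader = spark.read.format("csv").option("header", "true")
    hdfs_df = reader.load("hdfs://master-host:9000/data/csv/sample.csv")
    local_df = reader.load("file:///home/hadoop/data/csv/sample.csv")

    print(hdfs_df.count(), local_df.count())  # force both reads to execute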

Now I'm doing the below things as per the suggestions:
1. I will migrate my applications to the production environment where I will have more resources
2. Use the Spark 2.0.0 function to load CSV rather than using the Databricks api
3. In production I will run multiple Spark applications at a time and try to reproduce this error for both file system and HDFS loading cases
When I'm done I will share the details with you.

If you have any more suggestions from a debugging point of view, you can add them here for me.


 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistroni@gmail.com>
 To: "spline_palash@yahoo.com" <spline_palash@yahoo.com> 
Cc: ayan guha <guha.ayan@gmail.com>; User <user@spark.apache.org>
 Sent: Saturday, December 31, 2016 1:42 AM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
  
Hi Palash

so you have a pyspark application running on spark 2.0
You have python scripts dropping files on HDFS
then you have two spark jobs
- 1 load expected hour data (pls explain. How many files on average)
- 1 load delayed data (pls explain. how many files on average)

Do these scripts run continuously (they have a while loop) or do you kick them off via a job scheduler on an hourly basis?
Do these scripts run on a cluster?


So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc then populates mongo
At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional. Presumably these files are not deleted). So your job now loads 5 files, does aggregation and stores data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?

You said before that simply parsing csv files via spark in a standalone app works fine. Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists (this is just to exclude any issues with loading HDFS data).

hth
   Marco












On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_palash@yahoo.com> wrote:

Hi Marco & Ayan,
I now have a clearer idea about what Marco means by "reduce". I will use it to dig down.
Let me answer your queries:

When you see the broadcast errors, does your job terminate?
Palash>> Yes, it terminated the app.

Or are you assuming that something is wrong just because you see the message in the logs?
Palash>> No, it terminated at the very first step of the Spark processing (in my case, loading csv from hdfs).
Plus...Wrt logic....Who writes the CSV? With what frequency?
Palash>> We parse xml files using Python (not in the Spark scope), make csv files and put them in hdfs.

Does the app run all the time loading CSV from hadoop?
Palash>> Every hour two separate pyspark apps are running:
1. Load the current expected hour's data, prepare KPIs, do aggregation, load into mongodb
2. The same operation runs for the delayed hour data (a minimal skeleton of this pipeline follows below)
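
A minimal skeleton of the hourly pipeline shape described above (column names, paths, and the MongoDB connector options are hypothetical; the write assumes the mongo-spark connector is available):

    # Hypothetical skeleton of one hourly run: load the hour's CSVs, aggregate
    # them into KPIs, then write the result to MongoDB. Column names, paths and
    # connector options are placeholders.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("hourly_kpi").getOrCreate()

    df = (spark.read.format("csv").option("header", "true")
          .load("hdfs://master-host:9000/data/csv/2017010501/*.csv"))

    # Aggregate the raw rows into KPIs.
    kpi = df.groupBy("cell_id").agg(F.avg("throughput").alias("avg_throughput"))

    # Write via the mongo-spark connector (must be on the classpath).
    (kpi.write.format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", "mongodb://mongo-host/kpi_db.hourly_kpi")
        .mode("append")
        .save())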

Are you using spark streaming?
Palash>> No

Does the app run fine with an older version of spark (1.6)?
Palash>> I didn't test with Spark 1.6. My app has been running well since I stopped the second app (delayed data loading) two days ago. Even before that, both were running well in most cases, except a few times...

Sent from Yahoo Mail on Android 
 
On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni <mmistroni@gmail.com> wrote:
Correct, I mean reduce the functionality. Uhm, I realised I didn't ask u a fundamental question: when you see the broadcast errors, does your job terminate? Or are you assuming that something is wrong just because you see the message in the logs?
Plus...Wrt logic....Who writes the CSV? With what frequency? Does the app run all the time loading CSV from hadoop? Are you using spark streaming? Does the app run fine with an older version of spark (1.6)?
Hth
On 30 Dec 2016 12:44 pm, "ayan guha" <guha.ayan@gmail.com> wrote:

@Palash: I think what Marco meant by "reduce functionality" is to reduce the scope of your application's functionality so that you can isolate the issue to certain part(s) of the app... I do not think he meant the "reduce" operation :)
On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_palash@yahoo.com.invalid> wrote:

Hi Marco,
All of your suggestions so far are highly appreciated. I will apply them in my code and let you know.

Let me answer your query:
What does your program do? 
Palash>> In each hour I am loading many CSV files and then I'm making some KPI(s) out
of them. Finally I am doing some aggregation and inserting into mongodb from spark. 

You say it runs for 2-3 hours, what is the logic? Just processing a huge amount of data? Doing ML?
Palash>> Yes, you are right, whatever I'm processing should not take much time. Initially my processing took only 5 minutes, as I was using all cores and running only one application. When I created more separate Spark applications to handle delayed data loading and implement more use cases running in parallel, I started facing the error randomly. And due to the separate resource distribution among four Spark applications running in parallel, some tasks now take longer than usual. But still it should not take 2-3 hours...

Currently the applications are running in a development environment where we have only a two-VM cluster, and I will migrate to the production platform next week. I will let you know if there is any improvement there.

I'd say break down your application... reduce functionality, run and see the outcome. Then add more functionality, run and see again.
Palash>> Marco, as I'm not very good at Spark, it would be helpful for me if you could provide some example of reducing functionality, because I'm using Spark data frames, joining data frames, and using SQL statements to manipulate KPI(s). How could I apply "reduce functionality" here?
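
One hedged illustration, with placeholder names, of what that could look like for this kind of pipeline: strip the job down to the bare CSV load, confirm it runs cleanly, then re-enable the join and SQL stages one at a time until the error reappears.

    # Hypothetical illustration of "reduce functionality": run only the bare
    # CSV load first, then uncomment one stage at a time and re-run until the
    # failure reappears. All names are placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("isolate_failure").getOrCreate()

    df = (spark.read.format("csv").option("header", "true")
          .load("hdfs://master-host:9000/data/csv/sample.csv"))
    df.count()  # stage 1 only: does the bare load succeed?

    # other = (spark.read.format("csv").option("header", "true")
    #          .load("hdfs://master-host:9000/data/csv/dim.csv"))
    # joined = df.join(other, "key")              # stage 2: re-enable the join
    # joined.createOrReplaceTempView("kpi_src")   # stage 3: SQL-based KPIs
    # spark.sql("SELECT key, COUNT(*) AS n FROM kpi_src GROUP BY key").show()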


 Thanks & Best Regards,
Palash Gupta


      From: Marco Mistroni <mmistroni@gmail.com>
 To: "spline_palash@yahoo.com" <spline_palash@yahoo.com> 
Cc: User <user@spark.apache.org>
 Sent: Thursday, December 29, 2016 11:28 PM
 Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
   
Hello
No, sorry, I don't have any further insight into that.... I have seen similar errors but for completely different issues, and in most of the cases it had to do with my data or my processing rather than Spark itself.
What does your program do? You say it runs for 2-3 hours, what is the logic? Just processing a huge amount of data?
Doing ML?
I'd say break down your application... reduce functionality, run and see the outcome. Then add more functionality, run and see again.
I found myself doing these kinds of things when I got errors in my spark apps.

To get concrete help, you will have to trim down the code to a few lines that reproduce the error. That will be a great start.

Sorry for not being of much help

hth
 marco





On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_palash@yahoo.com> wrote:

Hi Marco,
Thanks for your response.
Yes, I tested it before & am able to load from the Linux filesystem, and it also sometimes has a similar issue.
However, in both cases (either from Hadoop or the Linux file system), this error comes up in specific scenarios as per my observations:
1. When two separate parallel Spark applications are initiated from one driver (not all the time, sometimes)
2. If one Spark job runs for longer than the expected hour, let's say 2-3 hours, the second application terminates giving the error.
To debug the problem, it would be good if you could share some possible reasons why the failed-to-get-broadcast error may come.
Or if you need more logs, I can share them.
Thanks again, Spark User Group.
Best Regards
Palash Gupta


Sent from Yahoo Mail on Android 
 
On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni <mmistroni@gmail.com> wrote:
Hi,
Pls try to read a CSV from the filesystem instead of hadoop. If you can read it successfully, then your hadoop file is the issue and you can start debugging from there.
Hth
On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_palash@yahoo.com.invalid> wrote:

Hi Apache Spark User team,


Greetings!
I started developing an application using Apache Hadoop and Spark with Python. My pyspark application randomly terminated saying "Failed to get broadcast_1*", and I have been searching for suggestions and support on Stack Overflow at: Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application

  
[Linked page: "Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application": I was building an application on Apache Spark 2.00 with Python 3.4 and trying to load some CSV files from HDFS (...]

Could you please suggest how to register myself in the Apache user list, or how I can get suggestions or support to debug the problem I am facing?

Your response will be highly appreciated. 


 Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494


   
  




   



-- 
Best Regards,
Ayan Guha

  




   

   


   


   