spark-user mailing list archives

From Ryan <ryan.hd....@gmail.com>
Subject Re: Spark SQL (Pyspark) - Parallel processing of multiple datasets
Date Mon, 17 Apr 2017 06:40:02 GMT
I don't think you can do parallel inserts into a Hive table without
dynamic partitioning; for Hive locking, please refer to
https://cwiki.apache.org/confluence/display/Hive/Locking.
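
For example, a rough sketch of what a dynamic-partition insert looks like
in PySpark (the table name, partition column, and temp table name below are
made up for illustration, and this assumes a HiveContext):

----------------------
# let Hive derive partition values from the data being inserted
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')

# each parallel writer then lands in its own partition of the target table
sqlContext.sql("INSERT INTO TABLE traffic_data PARTITION (ds) "
               "SELECT t.*, t.load_date AS ds FROM temp_traffic_table t")
----------------------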

Other than that, it should work.

On Mon, Apr 17, 2017 at 6:52 AM, Amol Patil <amol4social@gmail.com> wrote:

> Hi All,
>
> I'm writing a generic pyspark program to process multiple datasets using
> Spark SQL, for example Traffic Data, Crime Data, and Weather Data. Each
> dataset will be in csv format & its size may vary from *1 GB* to *10 GB*.
> Each dataset will become available on a different schedule (weekly,
> monthly, quarterly).
>
> My requirement is to process all the datasets in parallel by triggering
> the job only once.
>
> In the current implementation we are using the Spark CSV package for
> reading the csv files & Python's threading mechanism to trigger multiple
> threads:
> ----------------------
> import threading
>
> jobs = []
> for dict_key, dict_val in config_dict.items():
>     # one worker thread per dataset, all sharing the same SQLContext
>     t = threading.Thread(target=task, args=(sqlContext, dict_val))
>     jobs.append(t)
>     t.start()
>
> for x in jobs:
>     x.join()
> -----------------------
> And defined the task method to process each dataset based on its config
> values dict:
>
> -----------------------------------------
> def task(sqlContext, data_source_dict):
>     ...
> -----------------------------------------
>
> The task method:
> 1. creates a dataframe from the csv file,
> 2. then creates a temporary table from that dataframe,
> 3. and finally ingests the data into a Hive table (sketched below).
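>
> A minimal sketch of the task body, for reference (the config keys
> 'csv_path', 'name', and 'hive_table' are hypothetical placeholders, and
> this assumes the Spark 1.x spark-csv API):
>
> -----------------------------------------
> def task(sqlContext, data_source_dict):
>     # 1. create a dataframe from the csv file via the spark-csv package
>     df = (sqlContext.read
>           .format('com.databricks.spark.csv')
>           .options(header='true', inferSchema='true')
>           .load(data_source_dict['csv_path']))           # hypothetical key
>
>     # 2. register a temporary table named after the dataset
>     temp_table_name = data_source_dict['name'] + '_temp'  # hypothetical key
>     df.registerTempTable(temp_table_name)
>
>     # 3. ingest the data into the Hive table
>     sqlContext.sql('INSERT INTO TABLE ' + data_source_dict['hive_table'] +
>                    ' SELECT * FROM ' + temp_table_name)   # hypothetical key
> -----------------------------------------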
>
> *Issue:*
> 1. If I process two datasets in parallel, one dataset goes through
> successfully, but for the other dataset I get the error "*u'temp_table not
> found*" while ingesting data into the Hive table. It happens consistently
> with either dataset A or dataset B, on this statement:
> sqlContext.sql('INSERT INTO TABLE ' + hivetablename +
>                ' SELECT * FROM ' + temp_table_name)
>
> I tried the following things:
> 1. Creating the dataframe name & temporary table name dynamically, based
> on the dataset name.
> 2. Enabled Spark dynamic allocation
> (--conf spark.dynamicAllocation.enabled=true).
> 3. Set spark.scheduler.mode to FAIR (see the sketch after this list).
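>
> For completeness, a sketch of how those two settings could equivalently be
> applied at context-creation time instead of on the spark-submit command
> line (variable names here are illustrative):
>
> -----------------------
> from pyspark import SparkConf, SparkContext
> from pyspark.sql import HiveContext
>
> # same settings as the --conf flags above, set programmatically
> conf = (SparkConf()
>         .set('spark.dynamicAllocation.enabled', 'true')
>         .set('spark.scheduler.mode', 'FAIR'))
> sc = SparkContext(conf=conf)
> sqlContext = HiveContext(sc)  # HiveContext, since we ingest into Hive
> -----------------------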
>
>
> I would appreciate advice on:
> 1. Is anything wrong with the above implementation?
> 2. Is it a good idea to process those big datasets in parallel in one job?
> 3. Is there any other solution for processing multiple datasets in
> parallel?
>
> Thank you,
> Amol Patil
>
