hive-issues mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HIVE-16046) Broadcasting small table for Hive on Spark
Date Wed, 19 Apr 2017 07:51:41 GMT

    [ https://issues.apache.org/jira/browse/HIVE-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974264#comment-15974264 ]

liyunzhang_intel commented on HIVE-16046:
-----------------------------------------

[~xuefuz]:
In [HIVE-8621|https://issues.apache.org/jira/browse/HIVE-8621?focusedCommentId=14189547&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14189547],
you commented:
{quote}
Suhas Satish Thanks for sharing your thoughts and findings. We have been reevaluating Spark's
broadcast variables for the purpose of small tables. Spark's broadcast variable works well
for a small amount of data, but memory issues mount when broadcasting a large amount of data.
For bucket join, the table to be broadcast isn't necessarily small. To make things worse,
Spark needs to keep the variable alive at the driver, even after the variable is broadcast.
For this reason, we are considering using MR's way to broadcast the small tables. I'm working
on a writeup and will create subtasks for this piece. Hopefully, we can reuse or clone quite
some amount of code.
{quote}

So the reason for not using Spark's broadcast variables is that the table in a bucket join
may be large, and broadcasting it would require a lot of memory on the driver? If yes, can we
implement only map join via Spark broadcasting, since
[broadcasting|https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/7-Broadcast.md]
shows a performance advantage over the distributed cache? I'd appreciate any suggestions.
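
To illustrate (a minimal sketch with Spark's Java API, not the patch proposed in this JIRA;
the class name, key/value types, and join shape are my own assumptions), the map join would
build the small table's hash map on the driver, broadcast it once, and let every mapper of
the big table probe the executor-local copy:
{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class BroadcastMapJoinSketch {

  // Hypothetical helper: joins a distributed big table against a small
  // table that has already been collected into a Map on the driver.
  static JavaPairRDD<Integer, Tuple2<String, String>> mapJoin(
      JavaSparkContext sc,
      Map<Integer, String> smallTable,
      JavaPairRDD<Integer, String> bigTable) {

    // One copy is shipped per executor; the driver must keep the variable
    // alive for as long as tasks may still read it.
    Broadcast<Map<Integer, String>> small =
        sc.broadcast(new HashMap<>(smallTable));

    return bigTable
        // Map-side join: each task probes the executor-local hash map,
        // with no shuffle and no per-task file copy.
        .filter(kv -> small.value().containsKey(kv._1()))
        .mapToPair(kv -> new Tuple2<>(
            kv._1(), new Tuple2<>(kv._2(), small.value().get(kv._1()))));
  }
}
{code}
The linked page compares this mechanism with Hadoop's DistributedCache: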
{quote}
Broadcasting shared variables is a very handy feature. In Hadoop we have the DistributedCache,
and it's used in many situations. For example, the parameters of -libjars are sent to all
nodes by using the DistributedCache. However, in Hadoop the broadcast data needs to be uploaded
to HDFS first, and there is no mechanism to share data between tasks on the same node. If a
node needs to run 4 mappers from the same job, the broadcast variable will be stored 4 times
on that node (one copy in each mapper's working directory). An advantage of this approach is
that HDFS already cuts the data into blocks and distributes them across the cluster, so there
is no single-point bottleneck.

For Spark, broadcast is about both sending data to all nodes and letting tasks on the same
node share it. Spark's block manager solves the problem of sharing data between tasks on the
same node: storing the shared data in the local block manager with a memory + disk storage
level guarantees that all local tasks can access it, so multiple copies are avoided. Spark
has 2 broadcast implementations. The traditional HttpBroadcast has a bottleneck problem
around the driver node. TorrentBroadcast solves this problem, but it starts more slowly,
since the broadcast only accelerates after executors have fetched some number of blocks.
Also, in Spark, reconstituting the original data from data blocks needs some extra memory
space.

In fact, Spark also tried an alternative called TreeBroadcast. Interested readers can check
the technical report: Performance and Scalability of Broadcast in Spark.

{quote}
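
To make the "one copy per node" point concrete, here is a toy sketch (my own example under
the same assumptions as above, not from the quoted text): all four tasks scheduled on one
executor read the same block-manager-backed copy through value(), instead of four
per-working-directory copies as with the DistributedCache.
{code}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSharingDemo {
  public static void main(String[] args) {
    JavaSparkContext sc =
        new JavaSparkContext(new SparkConf().setAppName("broadcast-sharing-demo"));

    Map<Integer, String> small = new HashMap<>();
    small.put(1, "a");
    small.put(2, "b");

    // TorrentBroadcast ships the value in blocks; each executor's block
    // manager stores it once at memory + disk storage level.
    Broadcast<Map<Integer, String>> bc = sc.broadcast(small);

    // 4 partitions -> up to 4 concurrent tasks, all sharing one local copy.
    long matched = sc.parallelize(Arrays.asList(1, 2, 3, 4), 4)
        .filter(k -> bc.value().containsKey(k))  // value() hits the local block manager
        .count();

    System.out.println("matched keys: " + matched);
    sc.stop();
  }
}
{code}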

> Broadcasting small table for Hive on Spark
> ------------------------------------------
>
>                 Key: HIVE-16046
>                 URL: https://issues.apache.org/jira/browse/HIVE-16046
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>
> Currently the Spark plan is:
> {code}
> 1. TS(Small table) -> Sel/Fil -> HashTableSink
>
> 2. TS(Small table) -> Sel/Fil -> HashTableSink
>
> 3.                          HashTableDummy --
>                                             |
>                             HashTableDummy --
>                                             |
>       RootTS(Big table) -> Sel/Fil -> MapJoin -> Sel/Fil -> FileSink
> {code}
> 1. Run the small-table SparkWorks on the Spark cluster, which dump the hash tables to files.
> 2. Run the SparkWork for the big table on the Spark cluster. Mappers will look up the
small-table hash map from the file using HashTableDummy's loader.
> The disadvantage of the current implementation is that it takes a long time to distribute
the hash table through the distributed cache when the hash table is large. Here we want to use
sparkContext.broadcast() to hold the small table, although it will keep the broadcast variable
on the driver and bring some performance decline there.
> [~Fred], [~xuefuz], [~lirui] and [~csun], please give some suggestions on it. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
