spark-dev mailing list archives

From Michael Allman <mich...@videoamp.com>
Subject Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided
Date Tue, 17 Jan 2017 18:45:47 GMT
Hi Raju,

I'm sorry this isn't working for you. I helped author this functionality and will try my best
to help.

First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false? Can you link
specifically to the jira issue or spark pr you referred to? The first thing I would try is
setting spark.sql.hive.convertMetastoreParquet to true. Setting that to false might also explain
why you're getting parquet decode errors. If you're writing your table data with Spark's parquet
file writer and reading with Hive's parquet file reader, there may be an incompatibility accounting
for the decode errors you're seeing. 
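
For example, from the spark-shell (a minimal sketch; assumes a Hive-enabled SQLContext named sqlContext):

// Sketch: switch Hive Parquet tables back to Spark's built-in Parquet
// reader/writer. sqlContext here is assumed to be Hive-enabled.
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")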

Can you reply with your table's Hive metastore schema, including partition schema? Where are
the table's files located? If you do a "show partitions <dbname>.<tablename>"
in the spark-sql shell, does it show the partitions you expect to see? If not, run "msck repair
table <dbname>.<tablename>".
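
From Scala, that check is roughly (a sketch; substitute your actual database and table names):

// Sketch: list the partitions Spark sees, then register any that are
// present on disk but missing from the metastore.
sqlContext.sql("show partitions <dbname>.<tablename>").show(100, false)
sqlContext.sql("msck repair table <dbname>.<tablename>")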

Cheers,

Michael


> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <raju@apache.org> wrote:
> 
> I had a high-level look at the code. It seems the getHiveQlPartitions method from HiveMetastoreCatalog is called irrespective of the metastorePartitionPruning conf value.
> 
> It should not fetch all partitions if we set metastorePartitionPruning to true (the default value is false):
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   // Pushes the predicates down to the metastore only when the conf is on;
>   // otherwise every partition of the table is fetched.
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>     table.getPartitions(predicates)
>   } else {
>     allPartitions
>   }
> ...
> // Prunes in the metastore via get_partitions_by_filter.
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
> // Eagerly fetches every partition of the table.
> lazy val allPartitions = table.getAllPartitions
> But somehow getAllPartitions is being called even after setting metastorePartitionPruning to true.
> Am I missing something, or am I looking in the wrong place?
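> 
> One quick sanity check (a sketch; assumes the same sqlContext that runs the query) is to print the effective conf values:
> 
> // Sketch: confirm what the running SQLContext actually sees for the two
> // confs before digging further into the pruning code path.
> println(sqlContext.getConf("spark.sql.hive.metastorePartitionPruning"))
> println(sqlContext.getConf("spark.sql.hive.convertMetastoreParquet"))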
> 
> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <raju@apache.org> wrote:
> Hello,
> 
>    Spark SQL is generating the query plan with all partition information even when we apply filters on partition columns in the query. Due to this, the Spark driver/Hive metastore hits OOM, as each table has lots of partitions.
> 
> We can confirm from the Hive audit logs that it tries to fetch all partitions from the Hive metastore:
> 
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub    ip=/x.x.x.x   cmd=get_partitions : db=xxxx tbl=xxxxx
> 
> 
> Configured the following parameters in the Spark conf to fix the above issue (source: a Spark JIRA & GitHub pull request):
>     spark.sql.hive.convertMetastoreParquet   false
>     spark.sql.hive.metastorePartitionPruning   true
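> 
> For reference, one way these can be set on a job (a minimal sketch using the SparkConf API):
> 
> import org.apache.spark.SparkConf
> 
> // Sketch: the two properties as they would look set programmatically.
> val conf = new SparkConf()
>   .set("spark.sql.hive.convertMetastoreParquet", "false")
>   .set("spark.sql.hive.metastorePartitionPruning", "true")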
> 
>    plan: rdf.explain
>    == Physical Plan ==
>        HiveTableScan [rejection_reason#626], MetastoreRelation dbname, tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 = 2),(venture#318 = DEFAULT)]
> 
>     With these settings, the get_partitions_by_filter method is called and fetches only the required partitions.
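> 
> The read behind the plan above was along these lines (a simplified sketch; table and partition column names as in the plan):
> 
> // Sketch: the filter references only partition columns, so it can be
> // pushed to the metastore as a partition predicate.
> val rdf = sqlContext.table("dbname.tablename")
>   .where("year = 2016 AND month = 12 AND day = 28 AND hour = 2 AND venture = 'DEFAULT'")
> rdf.explain()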
> 
>     But we are seeing Parquet decode errors in our applications frequently after this. These decoding errors look like they are caused by the serde changing from Spark's built-in serde to the Hive serde.
> 
> I feel that fixing query plan generation in Spark SQL is the right approach, instead of forcing users to use the Hive serde.
> 
> Is there any workaround/way to fix this issue? I would like to hear more thoughts on this :)
> 
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <raju@apache.org> wrote:
> Waiting for suggestions/help on this... 
> 
> ------
> Thanks,
> Raju Bairishetti,
> www.lazada.com
