Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D81B200C0E for ; Wed, 18 Jan 2017 03:52:46 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3C2B8160B52; Wed, 18 Jan 2017 02:52:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C2BAF160B46 for ; Wed, 18 Jan 2017 03:52:44 +0100 (CET) Received: (qmail 93529 invoked by uid 500); 18 Jan 2017 02:52:38 -0000 Mailing-List: contact dev-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list dev@spark.apache.org Received: (qmail 93519 invoked by uid 99); 18 Jan 2017 02:52:38 -0000 Received: from mail-relay.apache.org (HELO mail-relay.apache.org) (140.211.11.15) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jan 2017 02:52:38 +0000 Received: from mail-it0-f47.google.com (mail-it0-f47.google.com [209.85.214.47]) by mail-relay.apache.org (ASF Mail Server at mail-relay.apache.org) with ESMTPSA id 1A0631A07C2 for ; Wed, 18 Jan 2017 02:52:37 +0000 (UTC) Received: by mail-it0-f47.google.com with SMTP id r185so2057005ita.0 for ; Tue, 17 Jan 2017 18:52:37 -0800 (PST) X-Gm-Message-State: AIkVDXId3q3QpO8eVKvBOMLh99sXkJWAmDJ53bPgD4FnYKiY3fAqBIK3XHU7hCZlVDjNBpR+cSSxanGdQ2JvBQ== X-Received: by 10.36.17.7 with SMTP id 7mr1468549itf.113.1484707957341; Tue, 17 Jan 2017 18:52:37 -0800 (PST) MIME-Version: 1.0 Received: by 10.36.69.33 with HTTP; Tue, 17 Jan 2017 18:51:56 -0800 (PST) In-Reply-To: <370FC482-EBE4-4E69-AD7D-713A12075377@videoamp.com> References: <370FC482-EBE4-4E69-AD7D-713A12075377@videoamp.com> From: Raju Bairishetti Date: Wed, 18 Jan 2017 10:51:56 +0800 X-Gmail-Original-Message-ID: Message-ID: Subject: Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided To: Michael Allman Cc: dev@spark.apache.org Content-Type: multipart/alternative; boundary=001a11442fb26488b80546558293 archived-at: Wed, 18 Jan 2017 02:52:46 -0000 --001a11442fb26488b80546558293 Content-Type: text/plain; charset=UTF-8 Thanks Michael for the respopnse. On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman wrote: > Hi Raju, > > I'm sorry this isn't working for you. I helped author this functionality > and will try my best to help. > > First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to > false? > I had set as suggested in SPARK-6910 and corresponsing pull reqs. It did not work for me without setting *spark.sql.hive.convertMetastoreParquet* property. Can you link specifically to the jira issue or spark pr you referred to? > The first thing I would try is setting spark.sql.hive.convertMetastoreParquet > to true. Setting that to false might also explain why you're getting > parquet decode errors. If you're writing your table data with Spark's > parquet file writer and reading with Hive's parquet file reader, there may > be an incompatibility accounting for the decode errors you're seeing. > > https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is to avoid fetching all the partitions. We reverted spark.sql.hive.convertMetastoreParquet setting to true to decoding errors. After reverting this it is fetching all partiitons from the table. Can you reply with your table's Hive metastore schema, including partition > schema? > col1 string col2 string year int month int day int hour int # Partition Information # col_name data_type comment year int month int day int hour int venture string > > Where are the table's files located? > In hadoop. Under some user directory. > If you do a "show partitions ." in the spark-sql shell, > does it show the partitions you expect to see? If not, run "msck repair > table .". > Yes. It is listing the partitions > Cheers, > > Michael > > > On Jan 17, 2017, at 12:02 AM, Raju Bairishetti wrote: > > Had a high level look into the code. Seems getHiveQlPartitions method > from HiveMetastoreCatalog is getting called irrespective of metastorePartitionPruning > conf value. > > It should not fetch all partitions if we set metastorePartitionPruning to > true (Default value for this is false) > > def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { > val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { > table.getPartitions(predicates) > } else { > allPartitions > } > > ... > > def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = > client.getPartitionsByFilter(this, predicates) > > lazy val allPartitions = table.getAllPartitions > > But somehow getAllPartitions is getting called eventough after setting metastorePartitionPruning to true. > > Am I missing something or looking at wrong place? > > > On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti wrote: > >> Hello, >> >> Spark sql is generating query plan with all partitions information >> even though if we apply filters on partitions in the query. Due to >> this, sparkdriver/hive metastore is hitting with OOM as each table is >> with lots of partitions. >> >> We can confirm from hive audit logs that it tries to >> *fetch all partitions* from hive metastore. >> >> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: HiveMetaStore.audit >> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub ip=/x.x.x.x >> cmd=get_partitions : db=xxxx tbl=xxxxx >> >> >> Configured the following parameters in the spark conf to fix the above >> issue(source: from spark-jira & github pullreq): >> >> *spark.sql.hive.convertMetastoreParquet false* >> * spark.sql.hive.metastorePartitionPruning true* >> >> >> * plan: rdf.explain* >> * == Physical Plan ==* >> HiveTableScan [rejection_reason#626], MetastoreRelation dbname, >> tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 = >> 28),(hour#317 = 2),(venture#318 = DEFAULT)] >> >> * get_partitions_by_filter* method is called and fetching only >> required partitions. >> >> But we are seeing parquetDecode errors in our applications frequently >> after this. Looks like these decoding errors were because of changing >> serde fromspark-builtin to hive serde. >> >> I feel like,* fixing query plan generation in the spark-sql* is the >> right approach instead of forcing users to use hive serde. >> >> Is there any workaround/way to fix this issue? I would like to hear more >> thoughts on this :) >> >> >> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti >> wrote: >> >>> Had a high level look into the code. Seems getHiveQlPartitions method >>> from HiveMetastoreCatalog is getting called irrespective of metastorePartitionPruning >>> conf value. >>> >>> It should not fetch all partitions if we set metastorePartitionPruning to >>> true (Default value for this is false) >>> >>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { >>> val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { >>> table.getPartitions(predicates) >>> } else { >>> allPartitions >>> } >>> >>> ... >>> >>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = >>> client.getPartitionsByFilter(this, predicates) >>> >>> lazy val allPartitions = table.getAllPartitions >>> >>> But somehow getAllPartitions is getting called eventough after setting metastorePartitionPruning to true. >>> >>> Am I missing something or looking at wrong place? >>> >>> >>> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti >>> wrote: >>> >>>> Waiting for suggestions/help on this... >>>> >>>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti >>>> wrote: >>>> >>>>> Hello, >>>>> >>>>> Spark sql is generating query plan with all partitions information >>>>> even though if we apply filters on partitions in the query. Due to this, >>>>> spark driver/hive metastore is hitting with OOM as each table is with lots >>>>> of partitions. >>>>> >>>>> We can confirm from hive audit logs that it tries to *fetch all >>>>> partitions* from hive metastore. >>>>> >>>>> 2016-12-28 07:18:33,749 INFO [pool-4-thread-184]: >>>>> HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) - >>>>> ugi=rajub ip=/x.x.x.x cmd=get_partitions : db=xxxx tbl=xxxxx >>>>> >>>>> >>>>> Configured the following parameters in the spark conf to fix the above >>>>> issue(source: from spark-jira & github pullreq): >>>>> >>>>> *spark.sql.hive.convertMetastoreParquet false* >>>>> * spark.sql.hive.metastorePartitionPruning true* >>>>> >>>>> >>>>> * plan: rdf.explain* >>>>> * == Physical Plan ==* >>>>> HiveTableScan [rejection_reason#626], MetastoreRelation dbname, >>>>> tablename, None, [(year#314 = 2016),(month#315 = 12),(day#316 = >>>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)] >>>>> >>>>> * get_partitions_by_filter* method is called and fetching only >>>>> required partitions. >>>>> >>>>> But we are seeing parquetDecode errors in our applications >>>>> frequently after this. Looks like these decoding errors were because of >>>>> changing serde from spark-builtin to hive serde. >>>>> >>>>> I feel like,* fixing query plan generation in the spark-sql* is the >>>>> right approach instead of forcing users to use hive serde. >>>>> >>>>> Is there any workaround/way to fix this issue? I would like to hear >>>>> more thoughts on this :) >>>>> >>>>> ------ >>>>> Thanks, >>>>> Raju Bairishetti, >>>>> www.lazada.com >>>>> >>>> >>>> >>>> >>>> -- >>>> >>>> ------ >>>> Thanks, >>>> Raju Bairishetti, >>>> www.lazada.com >>>> >>> >>> >>> >>> -- >>> >>> ------ >>> Thanks, >>> Raju Bairishetti, >>> www.lazada.com >>> >> >> >> >> -- >> >> ------ >> Thanks, >> Raju Bairishetti, >> www.lazada.com >> > > > > -- > > ------ > Thanks, > Raju Bairishetti, > www.lazada.com > > > -- ------ Thanks, Raju Bairishetti, www.lazada.com --001a11442fb26488b80546558293 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Thanks Michael for the respopnse.


On Wed, Jan 18, 2017 at 2:4= 5 AM, Michael Allman <michael@videoamp.com> wrote:
Hi Raju,

I'm sorry this isn't working fo= r you. I helped author this functionality and will try my best to help.
=

First, I'm curious why you set spark.sql.hive.= convertMetastoreParquet to false?
I had= set as suggested in=C2=A0SPARK-6910 and correspon= sing pull reqs.=C2=A0It did not work for me=C2=A0without =C2=A0setting=C2=A0spark.sql.hive.convertMetasto= reParquet property.=C2=A0

= Can you link specifically to the jira issue or spark pr you referred to? Th= e first thing I would try is setting=C2=A0spark.sql.hive.convertMetast= oreParquet to true. Setting that to false might also explain why you're= getting parquet decode errors. If you're writing your table data with = Spark's parquet file writer and reading with Hive's parquet file re= ader, there may be an incompatibility accounting for the decode errors you&= #39;re seeing.=C2=A0

=C2= =A0https://iss= ues.apache.org/jira/browse/SPARK-6910 . My main motivation is to avoid = fetching all the partitions. We reverted spark.sql.hive.convertMetasto= reParquet =C2=A0setting to true to decoding errors. After reverting this it= is fetching all partiitons from the table.

=
Can you reply with your table's Hive metastore sch= ema, including partition schema?
=C2=A0 = =C2=A0 =C2=A0col1 string
=C2=A0 =C2=A0 =C2=A0col2 string
=C2=A0 =C2=A0 =C2=A0year int
=C2=A0 =C2=A0 =C2=A0month int
=C2=A0 =C2=A0 =C2=A0day int
=C2=A0 =C2=A0 =C2=A0hour int = =C2=A0=C2=A0

# Partition Information =C2=A0

# col_name=C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 data_= type =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 comment =C2=A0 =C2=A0

year =C2=A0int

month int

day int

hour in= t

venture string

=C2=A0
Where are the table's files located?
In hadoop. Under some user directory.=C2=A0
If you do a "show partitions <dbname>.<tablen= ame>" in the spark-sql shell, does it show the partitions you expec= t to see? If not, run "msck repair table <dbname>.<tablename&= gt;".
Yes. It is listing the partit= ions
Cheers,

Michael


On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <raju@apache.org> wrote= :

=
Had a high level loo= k into the code. Seems=C2=A0getHiveQlPartitions=C2=A0 method from HiveMetastoreCatalog is getti= ng called irrespective of=C2=A0metastorePartitionPruning c= onf value.

=C2=A0It should not fetch=C2= =A0all=C2=A0partitions=C2=A0if= we set=C2=A0metastorePartitionPruning=C2=A0to true (= Default value for this is false)=C2=A0
def getHiveQlPartitio=
ns(predicates: Seq[Expres=
sion] =3D Nil): Seq[Partition] =3D {
= val rawPartition= s =3D if (sqlCon= text.conf.metastorePartitionPruning) {
table.getPartitions(pred= icates)
} els= e {
a= llPartitions
}
...
def g=
etPartitions(predicates: Seq[E=
xpression]): Seq[HivePartition=
] =3D
client.getPartitionsByFilter(this, predicates)
lazy val allPartitions =3D table.getAllPartitions
But somehow getAllPartitions is getting called eventough after setting metastorePartitionPruning to true.
Am I missing something or looking at wrong place?
=


=C2=A0 =C2=A0 get_partitions_by_filter<= /i>=C2=A0method is called and fetching only required=C2=A0partitions.

=C2=A0 =C2=A0 But we are seeing parquetDecode errors = in our applications frequently after this. Looks like these decoding errors= were because of changing serde=C2=A0fromspark-builtin to hive serde.<= /div>

I feel like,=C2=A0fixing=C2=A0query=C2=A0plan=C2=A0g= eneration in the=C2=A0spark-sql=C2=A0is the right approa= ch instead of forcing users to use hive serde.

Is there any workaro= und/way to fix this issue? I would like to hear more thoughts on this :)


On Tue, Jan 17, 2017 at 4:00 PM, Raju Ba= irishetti <raju@apache.org> wrote:
Had a high level look into the= code. Seems=C2=A0getHiveQlPartitions=C2=A0 method from Hive= MetastoreCatalog is getting called irrespective of=C2=A0metastore= PartitionPruning conf value.

=C2=A0It should = not fetch all partitions if we set=C2=A0metastorePartitionPruning= =C2=A0to true (Default value for this is false)=C2=A0
def getHiveQlPartitions(predicates: <=
span style=3D"color:rgb(32,153,157)">Seq[Expression] =3D Nil): Seq[Partition] =3D {
val rawPartitions =3D if (sqlContext.conf.metastorePa= rtitionPruning) {
table.getPartitions(predicates)
} <= span style=3D"color:rgb(0,0,128);font-weight:bold">else {
allPartitions
}
<= pre style=3D"font-family:menlo;font-size:12pt">...
def getPartiti=
ons(predicates: Seq[Expression=
]): Seq[HivePartition] =3D
= client.get= PartitionsByFilter(this, predicates)
lazy val allPartitions =3D table.getAllPartitions
But somehow getAllPartitions is getting called eventough after setting=
 metastorePartitionPruning to true.
Am I missing something or looking at wrong place?
=

On Mon, Jan 16, 2017 at 1= 2:53 PM, Raju Bairishetti <raju@apache.org> wrote:
Waiting for sugg= estions/help on this...=C2=A0

On Wed, Jan 11, 2017 at 12:14 PM, Raju = Bairishetti <raju@apache.org> wrote:
Hello,
=C2=A0 =C2=A0 =C2= =A0=C2=A0
=C2=A0 =C2=A0Spark sql=C2=A0is generating query plan wi= th all partitions information even though if we apply filters on partitions= in the query.=C2=A0 Due to this, spark driver/hive=C2=A0metastore=C2=A0is = hitting with OOM as each table is with lots of partitions.
=

We can confirm from hive audit logs that it tries to fetch all partitions from hive metastore.

<= div>=C2=A02016-12-28 07:18:33,749 INFO =C2=A0[pool-4-thread-184]: HiveMetaS= tore.audit (HiveMetaStore.java:logAuditEvent(371)) - ugi=3Drajub =C2= =A0 =C2=A0ip=3D/x.x.x.x =C2=A0 cmd=3Dget_partitions : db=3Dxxxx tbl=3Dxxxxx=


Configured the following parameter= s in the spark conf to fix the above issue(source: from spark-jira & gi= thub pullreq):
=C2=A0 =C2=A0 spark.sql.hive.convertMetastoreParquet =C2=A0 false
=C2=A0 =C2=A0 spark.sql.h= ive.metastorePartitionPruning =C2=A0 true

=C2=A0 =C2=A0plan: =C2=A0rdf.explain
=C2=A0 = =C2=A0=3D=3D Physical Plan =3D=3D
=C2=A0 =C2=A0 =C2=A0 =C2=A0= HiveTableScan [rejection_reason#626], MetastoreRelation dbname, tablename, = None, =C2=A0 [(year#314 =3D 2016),(month#315 =3D 12),(day#316 =3D 28),(hour= #317 =3D 2),(venture#318 =3D DEFAULT)]

= =C2=A0 =C2=A0 get_partitions_by_filter method is called and fetching on= ly required partitions.

=C2=A0 =C2=A0 But we a= re seeing parquetDecode errors in our applications frequently after this. L= ooks like these decoding errors were because of changing serde=C2=A0from sp= ark-builtin to hive serde.

I feel like, fixing = query plan generation in the spark-sql is the right approach instead of= forcing users to use hive serde.

Is there any= workaround/way to fix this issue? I would like to hear more thoughts on th= is :)

------
Thanks,
Raju Bairishetti,



--
=
=

------
Thanks,
Raju Bairishet= ti,



--

------
Thanks,
Raju Bairishetti,



--
=

------
Thanks= ,
Raju Bairishetti,



--
=

------
Thanks,
Raju Bai= rishetti,




--

------
Thanks,=
Raju Bairishetti,
--001a11442fb26488b80546558293--