spark-user mailing list archives

From Nirav Patel <npa...@xactlycorp.com>
Subject Re: Saving dataframes with partitionBy: append partitions, overwrite within each
Date Thu, 02 Aug 2018 18:50:48 GMT
I tried the following to explicitly specify the partition columns in the SQL
statement, and also tried different cases (upper and lower) for the partition
columns:

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2
from updateTable

Still getting:

Caused by:
org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns
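For reference, these are the Hive session settings that dynamic-partition
inserts generally need; I'm not certain they resolve the
ValidationFailureSemanticException above, so treat this as a sketch:

```scala
// Sketch only: Hive session settings commonly required before a
// dynamic-partition INSERT OVERWRITE (assumes the SparkSession was
// built with .enableHiveSupport()).
spark.sql("SET hive.exec.dynamic.partition = true")
spark.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

// Then the insert, listing the partition columns LAST in the select,
// in the same order as in the PARTITION clause:
spark.sql(
  s"INSERT OVERWRITE TABLE $tableName PARTITION(P1, P2) " +
   "SELECT A, B, C, P1, P2 FROM updateTable")
```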



On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel <npatel@xactlycorp.com> wrote:

> Thanks Koert. I'll check that out when we can update to 2.3.
>
> Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
> overwrite multiple partitions (without losing the existing ones).
>
> It's giving me issues around the partition columns.
>
>     dataFrame.createOrReplaceTempView("updateTable") // here the dataframe
> contains values from multiple partitions.
>
> The dataframe also has the partition columns, but I can't get any of the
> following to execute:
>
> insert overwrite table $tableName PARTITION(P1, P2) select * from
> updateTable
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
> metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
> p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;
>
>
> Is the above the right approach to update multiple partitions? Or should I
> be more specific and update each partition with a separate command, like
> the following:
>
> // Pseudo code; yet to try. spark.sql can only run on the driver, so
> // collect the distinct partition values first instead of calling it
> // inside an RDD transformation.
> df.createOrReplaceTempView("updateTable")
> df.select("P1", "P2").distinct().collect().foreach { row =>
>   val (p1, p2) = (row.get(0), row.get(1))
>   spark.sql(s"INSERT OVERWRITE TABLE stats PARTITION(P1 = $p1, P2 = $p2) " +
>     s"SELECT A, B, C FROM updateTable WHERE P1 = $p1 AND P2 = $p2")
> }
>
> Regards,
> Nirav
>
>
> On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> this works for dataframes with spark 2.3 by changing a global setting,
>> and will be configurable per write in 2.4
>> see:
>> https://issues.apache.org/jira/browse/SPARK-20236
>> https://issues.apache.org/jira/browse/SPARK-24860
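>> a minimal sketch of the 2.3 global setting (the table name "stats" is
>> illustrative):

```scala
// Sketch, Spark 2.3+: with dynamic partition overwrite enabled,
// mode("overwrite") replaces only the partitions present in df,
// not the whole table. Table name "stats" is illustrative.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite").insertInto("stats")
```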
>>
>> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel <npatel@xactlycorp.com>
>> wrote:
>>
>>> Hi Peay,
>>>
>>> Have you found a better solution yet? I am having the same issue.
>>>
>>> The following says it works with Spark 2.1 onward, but only when you use
>>> sqlContext and not the DataFrame writer:
>>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a
>>>
>>> Thanks,
>>> Nirav
>>>
>>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh <knoblokh@gmail.com>
>>> wrote:
>>>
>>>> If your processing task inherently processes input data by month you
>>>> may want to "manually" partition the output data by month as well as
>>>> by day, that is to save it with a file name including the given month,
>>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>>> overwrite mode with each month partition. Hope this could be of some
>>>> help.
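>>>> A sketch of what I mean (the path and column name are illustrative):

```scala
// Sketch: write one month's data directly into its own partition
// directory with overwrite mode; the other months' directories are
// left untouched.
import org.apache.spark.sql.functions.col

df.filter(col("month") === "01")
  .drop("month")  // the directory name month=01 already encodes it
  .write
  .mode("overwrite")
  .parquet("dataset.parquet/month=01")
```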
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> On Fri, Sep 29, 2017 at 5:31 PM, peay <peay@protonmail.com> wrote:
>>>> > Hello,
>>>> >
>>>> > I am trying to use
>>>> > data_frame.write.partitionBy("day").save("dataset.parquet")
>>>> > to write a dataset while splitting by day.
>>>> >
>>>> > I would like to run a Spark job  to process, e.g., a month:
>>>> > dataset.parquet/day=2017-01-01/...
>>>> > ...
>>>> >
>>>> > and then run another Spark job to add another month using the same
>>>> folder
>>>> > structure, getting me
>>>> > dataset.parquet/day=2017-01-01/
>>>> > ...
>>>> > dataset.parquet/day=2017-02-01/
>>>> > ...
>>>> >
>>>> > However:
>>>> > - with save mode "overwrite", when I process the second month, all of
>>>> > dataset.parquet/ gets removed and I lose whatever was already
>>>> computed for
>>>> > the previous month.
>>>> > - with save mode "append", then I can't get idempotence: if I run the
>>>> job to
>>>> > process a given month twice, I'll get duplicate data in all the
>>>> subfolders
>>>> > for that month.
>>>> >
>>>> > Is there a way to "append" in terms of the subfolders from
>>>> > partitionBy, but overwrite within each such partition? Any help
>>>> > would be appreciated.
>>>> >
>>>> > Thanks!
>>>>
>>>>
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>

