spark-issues mailing list archives

From "Zoltan Ivanfi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-12297) Add work-around for Parquet/Hive int96 timestamp bug.
Date Wed, 10 May 2017 12:34:04 GMT

    [ https://issues.apache.org/jira/browse/SPARK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004607#comment-16004607 ]

Zoltan Ivanfi commented on SPARK-12297:
---------------------------------------

bq. It'd be great to consider this more holistically and think about alternatives in fixing
them

As Ryan mentioned, the Parquet community discussed this timestamp incompatibility problem with
the aim of avoiding similar problems in the future. It was decided that the specification
needs to include two separate types with well-defined semantics: one for timezone-agnostic
(a.k.a. TIMESTAMP WITHOUT TIMEZONE) and one for UTC-normalized (a.k.a. TIMESTAMP WITH TIMEZONE)
timestamps. (Otherwise implementors would be tempted to misuse the single existing type for
storing timestamps of different semantics, as has already happened with the int96 timestamp
type.)

While this is a nice and clean long-term solution, a short-term fix is also desirable until
the new types become widely supported, and to allow dealing with existing data. The commit
in question is part of this short-term fix: it allows reading correct values from int96
timestamps, even for data written by other components.

bq. it completely changes the behavior of one of the most important data types.

A very important aspect of this fix is that it does not change SparkSQL's behavior unless
the user sets a table property, so it's a completely safe and non-breaking change.

bq. One of the fundamental problem is that Spark treats timestamp as timestamp with timezone,
whereas impala treats timestamp as timestamp without timezone. The parquet storage is only
a small piece here.

Indeed, the fix only addresses Parquet timestamps. This, however, is intentional and is
neither a limitation nor an inconsistency: the problem is in fact specific to Parquet. For other
file formats (for example CSV or Avro), SparkSQL follows timezone-agnostic (TIMESTAMP WITHOUT
TIMEZONE) semantics. So using UTC-normalized (TIMESTAMP WITH TIMEZONE) semantics in Parquet
is not only incompatible with Impala but is also inconsistent within SparkSQL itself.
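
To make the difference between the two semantics concrete, here is a minimal sketch using the
plain Java time API (not Spark itself), reusing the zones from the reproduction further below:

{code}
import java.time.{LocalDateTime, ZoneId}

// The wall-clock value as written by a process in America/Los_Angeles.
val wallClock = LocalDateTime.parse("2016-01-01T00:39:59.123")

// TIMESTAMP WITHOUT TIMEZONE ("floating" semantics): the wall-clock value is
// stored and read back verbatim, regardless of the reader's zone.
val floating = wallClock

// TIMESTAMP WITH TIMEZONE (UTC-normalized semantics): the writer's zone is applied
// on write, the instant is stored normalized to UTC, and the reader's local zone is
// applied on read, so a reader in another zone sees a different wall-clock value.
val instant = wallClock.atZone(ZoneId.of("America/Los_Angeles")).toInstant
val seenInNewYork = LocalDateTime.ofInstant(instant, ZoneId.of("America/New_York"))

println(floating)       // 2016-01-01T00:39:59.123
println(seenInNewYork)  // 2016-01-01T03:39:59.123
{code}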

bq. Also this is not just a Parquet issue. The same issue could happen to all data formats.
It is going to be really confusing to have something that only works for Parquet

In fact, the current behavior of SparkSQL is different for Parquet than for other formats.
The fix allows the user to choose a consistent and less confusing behavior instead. It also
makes Impala, Hive and SparkSQL compatible with each other regarding int96 timestamps.

bq. It seems like the purpose of this patch can be accomplished by just setting the session
local timezone to UTC?

Unfortunately that would not suffice. The problem has to be addressed in all SQL engines. As
of today, Hive and Impala already contain the changes that allow interoperability using the
parquet.mr.int96.write.zone table property (a sketch of how a table could opt in follows the
commit list below):

* Hive:
** https://github.com/apache/hive/commit/84fdc1c7c8ff0922aa44f829dbfa9659935c503e
** https://github.com/apache/hive/commit/a1cbccb8dad1824f978205a1e93ec01e87ed8ed5
** https://github.com/apache/hive/commit/2dfcea5a95b7d623484b8be50755b817fbc91ce0
** https://github.com/apache/hive/commit/78e29fc70dacec498c35dc556dd7403e4c9f48fe
* Impala:
** https://github.com/apache/incubator-impala/commit/5803a0b0744ddaee6830d4a1bc8dba8d3f2caa26
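
For illustration only, here is a hypothetical sketch of how a table could opt in to the
work-around from SparkSQL once the corresponding change is in place; the table and zone names
are made up for the example, and the exact property handling on the Spark side may differ:

{code}
// Hypothetical opt-in: record the writer zone of existing int96 data on the table, so
// that engines honoring parquet.mr.int96.write.zone can adjust the values on read/write.
spark.sql(
  """ALTER TABLE la_parquet
    | SET TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'America/Los_Angeles')
  """.stripMargin)

// Newly created tables could declare the property up front instead:
spark.sql(
  """CREATE TABLE events_parquet (ts timestamp)
    | STORED AS parquet
    | TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'UTC')
  """.stripMargin)
{code}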


> Add work-around for Parquet/Hive int96 timestamp bug.
> -----------------------------------------------------
>
>                 Key: SPARK-12297
>                 URL: https://issues.apache.org/jira/browse/SPARK-12297
>             Project: Spark
>          Issue Type: Task
>          Components: Spark Core
>            Reporter: Ryan Blue
>
> Spark copied Hive's behavior for parquet, but this was inconsistent with other file formats,
and inconsistent with Impala (which is the original source of putting a timestamp as an int96
in parquet, I believe).  This made timestamps in parquet act more like timestamps with timezones,
while in other file formats timestamps have no time zone; they are a "floating time".
> The easiest way to see this issue is to write out a table with timestamps in multiple
different formats from one timezone, then try to read them back in another timezone.  E.g.,
here I write out a few timestamps to parquet and textfile hive tables, and also just as a
json file, all in the "America/Los_Angeles" timezone:
> {code}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val tblPrefix = args(0)
> val schema = new StructType().add("ts", TimestampType)
> val rows = sc.parallelize(Seq(
>   "2015-12-31 23:50:59.123",
>   "2015-12-31 22:49:59.123",
>   "2016-01-01 00:39:59.123",
>   "2016-01-01 01:29:59.123"
> ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
> val rawData = spark.createDataFrame(rows, schema).toDF()
> rawData.show()
> Seq("parquet", "textfile").foreach { format =>
>   val tblName = s"${tblPrefix}_$format"
>   spark.sql(s"DROP TABLE IF EXISTS $tblName")
>   spark.sql(
>     raw"""CREATE TABLE $tblName (
>           |  ts timestamp
>           | )
>           | STORED AS $format
>      """.stripMargin)
>   rawData.write.insertInto(tblName)
> }
> rawData.write.json(s"${tblPrefix}_json")
> {code}
> Then I start a spark-shell in "America/New_York" timezone, and read the data back from
each table:
> {code}
> scala> spark.sql("select * from la_parquet").collect().foreach{println}
> [2016-01-01 02:50:59.123]
> [2016-01-01 01:49:59.123]
> [2016-01-01 03:39:59.123]
> [2016-01-01 04:29:59.123]
> scala> spark.sql("select * from la_textfile").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").collect().foreach{println}
> [2015-12-31 23:50:59.123]
> [2015-12-31 22:49:59.123]
> [2016-01-01 00:39:59.123]
> [2016-01-01 01:29:59.123]
> scala> spark.read.json("la_json").join(spark.sql("select * from la_textfile"), "ts").show()
> +--------------------+
> |                  ts|
> +--------------------+
> |2015-12-31 23:50:...|
> |2015-12-31 22:49:...|
> |2016-01-01 00:39:...|
> |2016-01-01 01:29:...|
> +--------------------+
> scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), "ts").show()
> +---+
> | ts|
> +---+
> +---+
> {code}
> The textfile and json based data show the same times and can be joined against each
other, while the times from the parquet data have changed (and the joins obviously fail).
> This is a big problem for any organization that may try to read the same data (say, in
S3) with clusters in multiple timezones.  It can also be a nasty surprise as an organization
tries to migrate file formats.  Finally, it's a source of incompatibility between Hive, Impala,
and Spark.
> HIVE-12767 aims to fix this by introducing a table property which indicates the "storage
timezone" for the table.  Spark should add the same to ensure consistency between file formats,
and with Hive & Impala.





