impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taras...@apache.org
Subject [2/5] incubator-impala git commit: IMPALA-2716: Hive/Impala incompatibility for timestamp data in Parquet
Date Thu, 04 May 2017 02:10:36 GMT
IMPALA-2716: Hive/Impala incompatibility for timestamp data in Parquet

Before this change:
Hive adjusts timestamps by subtracting the local time zone's offset
from all values when writing data to Parquet files. Hive is internally
inconsistent because it behaves differently for other file formats. As
a result of this adjustment, Impala may read "incorrect" timestamp
values from Parquet files written by Hive.

After this change:
Impala reads Parquet MR timestamp data and adjusts values using a time
zone from a table property (parquet.mr.int96.write.zone), if set, and
will not adjust it if the property is absent. No adjustment will be
applied to data written by Impala.

New HDFS tables created by Impala using CREATE TABLE and CREATE TABLE
LIKE <file> will set the table property to UTC if the global flag
--set_parquet_mr_int96_write_zone_to_utc_on_new_tables is set to true.

HDFS tables created by Impala using CREATE TABLE LIKE <other table>
will copy the property of the table that is copied.

This change also affects the way Impala deals with
--convert_legacy_hive_parquet_utc_timestamps global flag (introduced
in IMPALA-1658). The flag will be taken into account only if
parquet.mr.int96.write.zone table property is not set and ignored
otherwise.

Change-Id: I3f24525ef45a2814f476bdee76655b30081079d6
Reviewed-on: http://gerrit.cloudera.org:8080/5939
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5803a0b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5803a0b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5803a0b0

Branch: refs/heads/master
Commit: 5803a0b0744ddaee6830d4a1bc8dba8d3f2caa26
Parents: f62e6b2
Author: Attila Jeges <attilaj@cloudera.com>
Authored: Wed Feb 8 19:44:16 2017 +0100
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue May 2 20:24:08 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 +
 .../benchmarks/convert-timestamp-benchmark.cc   | 163 ++++++++++++++++++
 be/src/exec/hdfs-scan-node-base.cc              |   2 +
 be/src/exec/hdfs-scan-node-base.h               |   6 +
 be/src/exec/parquet-column-readers.cc           |  98 +++++++++--
 be/src/exprs/timestamp-functions.cc             |  43 +++--
 be/src/exprs/timezone_db.h                      |   9 +
 be/src/runtime/timestamp-value.cc               |  34 +++-
 be/src/runtime/timestamp-value.h                |  14 +-
 be/src/service/fe-support.cc                    |  19 +++
 be/src/service/impala-server.cc                 |   4 +
 be/src/util/backend-gflag-util.cc               |   3 +
 common/thrift/BackendGflags.thrift              |   4 +
 common/thrift/PlanNodes.thrift                  |   5 +
 common/thrift/generate_error_codes.py           |   3 +
 .../analysis/AlterTableSetTblProperties.java    |  27 +++
 .../apache/impala/analysis/BaseTableRef.java    |  19 +++
 .../apache/impala/analysis/CreateTableStmt.java |  21 +++
 .../org/apache/impala/catalog/HdfsTable.java    |  16 ++
 .../org/apache/impala/planner/HdfsScanNode.java |   5 +-
 .../apache/impala/service/BackendConfig.java    |   3 +
 .../org/apache/impala/service/FeSupport.java    |  14 ++
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  52 ++++++
 tests/common/impala_test_suite.py               |  23 +++
 .../test_hive_parquet_timestamp_conversion.py   | 170 +++++++++++++++++--
 tests/metadata/test_ddl.py                      |   8 +-
 tests/metadata/test_ddl_base.py                 |  23 ---
 .../test_parquet_timestamp_compatibility.py     | 135 +++++++++++++++
 28 files changed, 850 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index e954583..1739375 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -56,5 +56,6 @@ ADD_BE_BENCHMARK(string-compare-benchmark)
 ADD_BE_BENCHMARK(string-search-benchmark)
 ADD_BE_BENCHMARK(thread-create-benchmark)
 ADD_BE_BENCHMARK(tuple-layout-benchmark)
+ADD_BE_BENCHMARK(convert-timestamp-benchmark)
 
 target_link_libraries(hash-benchmark Experiments)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/benchmarks/convert-timestamp-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/convert-timestamp-benchmark.cc b/be/src/benchmarks/convert-timestamp-benchmark.cc
new file mode 100644
index 0000000..6ba7183
--- /dev/null
+++ b/be/src/benchmarks/convert-timestamp-benchmark.cc
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <time.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+#include <vector>
+#include <sstream>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/date_time/local_time/local_time.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include "exprs/timezone_db.h"
+#include "runtime/timestamp-parse-util.h"
+#include "runtime/timestamp-value.h"
+#include "util/benchmark.h"
+#include "util/cpu-info.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
+
+#include "common/names.h"
+
+namespace gregorian = boost::gregorian;
+using boost::posix_time::duration_from_string;
+using boost::posix_time::hours;
+using boost::posix_time::nanoseconds;
+using boost::posix_time::ptime;
+using boost::posix_time::time_duration;
+using boost::posix_time::to_iso_extended_string;
+using boost::posix_time::to_simple_string;
+using boost::local_time::time_zone_ptr;
+using boost::local_time::posix_time_zone;
+using namespace impala;
+
+// Benchmark for converting timestamps from UTC to local time and from UTC to a given time
+// zone.
+// Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz
+// ConvertTimestamp:  Function  10%ile  50%ile  90%ile     10%ile     50%ile     90%ile
+//                                                     (relative) (relative) (relative)
+// ------------------------------------------------------------------------------------
+//                     FromUtc  0.0147  0.0152  0.0155         1X         1X         1X
+//                  UtcToLocal  0.0216  0.0228  0.0234      1.47X       1.5X      1.51X
+
+time_zone_ptr LOCAL_TZ;
+
+struct TestData {
+  vector<TimestampValue> data;
+  vector<TimestampValue> result;
+};
+
+void AddTestDataDateTimes(TestData* data, int n, const string& startstr) {
+  DateTimeFormatContext dt_ctx;
+  dt_ctx.Reset("yyyy-MMM-dd HH:mm:ss", 19);
+  TimestampParser::ParseFormatTokens(&dt_ctx);
+
+  ptime start(boost::posix_time::time_from_string(startstr));
+  for (int i = 0; i < n; ++i) {
+    int val = rand();
+    start += gregorian::date_duration(rand() % 100);
+    start += nanoseconds(val);
+    stringstream ss;
+    ss << to_simple_string(start);
+    string ts = ss.str();
+    data->data.push_back(TimestampValue(ts.c_str(), ts.size(), dt_ctx));
+  }
+}
+
+void TestFromUtc(int batch_size, void* d) {
+  TestData* data = reinterpret_cast<TestData*>(d);
+  for (int i = 0; i < batch_size; ++i) {
+    int n = data->data.size();
+    for (int j = 0; j < n; ++j) {
+      TimestampValue ts = data->data[j];
+      ts.FromUtc(LOCAL_TZ);
+      data->result[j] = ts;
+    }
+  }
+}
+
+void TestUtcToLocal(int batch_size, void* d) {
+  TestData* data = reinterpret_cast<TestData*>(d);
+  for (int i = 0; i < batch_size; ++i) {
+    int n = data->data.size();
+    for (int j = 0; j < n; ++j) {
+      TimestampValue ts = data->data[j];
+      ts.UtcToLocal();
+      data->result[j] = ts;
+    }
+  }
+}
+
+int main(int argc, char **argv) {
+  CpuInfo::Init();
+  cout << Benchmark::GetMachineInfo() << endl;
+
+  TimestampParser::Init();
+  ABORT_IF_ERROR(TimezoneDatabase::Initialize());
+
+  // Initialize LOCAL_TZ to local time zone
+  tzset();
+  time_t now = time(0);
+  LOCAL_TZ = time_zone_ptr(new posix_time_zone(tzname[localtime(&now)->tm_isdst]));
+
+  TestData datetimes;
+  AddTestDataDateTimes(&datetimes, 10000, "1953-04-22 01:02:03");
+  datetimes.result.resize(datetimes.data.size());
+
+  Benchmark timestamp_suite("ConvertTimestamp");
+  timestamp_suite.AddBenchmark("FromUtc", TestFromUtc, &datetimes);
+  timestamp_suite.AddBenchmark("UtcToLocal", TestUtcToLocal, &datetimes);
+
+  cout << timestamp_suite.Measure() << endl;
+
+  // If number of threads is specified, run multithreaded tests.
+  int num_of_threads = (argc < 2) ? 0 : atoi(argv[1]);
+  if (num_of_threads >= 1) {
+    vector<boost::shared_ptr<boost::thread> > threads(num_of_threads);
+    StopWatch sw;
+    // Test UtcToLocal()
+    sw.Start();
+    for (auto& t: threads) {
+      t = boost::shared_ptr<boost::thread>(
+          new boost::thread(TestUtcToLocal, 100, &datetimes));
+    }
+    for (auto& t: threads) t->join();
+    uint64_t utc_to_local_elapsed_time = sw.ElapsedTime();
+    sw.Stop();
+
+    // Test FromUtc()
+    sw.Start();
+    for (auto& t: threads) {
+      t = boost::shared_ptr<boost::thread>(
+          new boost::thread(TestFromUtc, 100, &datetimes));
+    }
+    for (auto& t: threads) t->join();
+    uint64_t from_utc_elapsed_time = sw.ElapsedTime();
+    sw.Stop();
+
+    cout << "Number of threads: " << num_of_threads << endl
+         << "TestFromUtc: "
+         << PrettyPrinter::Print(from_utc_elapsed_time, TUnit::CPU_TICKS) << endl
+         << "TestUtcToLocal: "
+         << PrettyPrinter::Print(utc_to_local_elapsed_time, TUnit::CPU_TICKS) << endl;
+  }
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 24802e3..b22e67c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -88,6 +88,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
       min_max_tuple_desc_(nullptr),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
+      parquet_mr_write_zone_(tnode.hdfs_scan_node.__isset.parquet_mr_write_zone ?
+          tnode.hdfs_scan_node.parquet_mr_write_zone : ""),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
       reader_context_(NULL),
       tuple_desc_(NULL),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 1711bb5..3d97e2e 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -153,6 +153,7 @@ class HdfsScanNodeBase : public ScanNode {
   const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
   RuntimeState* runtime_state() { return runtime_state_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
+  const std::string& parquet_mr_write_zone() const { return parquet_mr_write_zone_; }
   DiskIoRequestContext* reader_context() { return reader_context_; }
 
   typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
@@ -312,6 +313,11 @@ class HdfsScanNodeBase : public ScanNode {
   // to values > 0 for hdfs text files.
   const int skip_header_line_count_;
 
+  // Time zone for adjusting timestamp values read from Parquet files written by
+  // parquet-mr. If conversion should not occur, this is set to an empty string. Otherwise
+  // FE guarantees that this is a valid time zone.
+  const std::string parquet_mr_write_zone_;
+
   /// Tuple id resolved in Prepare() to set tuple_desc_
   const int tuple_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index f92de48..53848e2 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -27,6 +27,7 @@
 #include "exec/parquet-metadata-utils.h"
 #include "exec/parquet-scratch-tuple-batch.h"
 #include "exec/read-write-util.h"
+#include "exprs/timezone_db.h"
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/tuple-row.h"
@@ -206,7 +207,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc),
-      dict_decoder_init_(false) {
+      dict_decoder_init_(false),
+      timezone_(NULL),
+      is_timestamp_dependent_timezone_(false) {
     if (!MATERIALIZED) {
       // We're not materializing any values, just counting them. No need (or ability) to
       // initialize state used to materialize values.
@@ -223,12 +226,25 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     } else {
       fixed_len_size_ = -1;
     }
-    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
-        // TODO: Add logic to detect file versions that have unconverted TIMESTAMP
-        // values. Currently all versions have converted values.
-        (FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
-        slot_desc_->type().type == TYPE_TIMESTAMP &&
-        parent->file_version_.application == "parquet-mr");
+    needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || (
+          slot_desc_->type().type == TYPE_TIMESTAMP &&
+          parent->file_version_.application == "parquet-mr" &&
+          parent_->scan_node_->parquet_mr_write_zone() != "UTC" && (
+            !parent_->scan_node_->parquet_mr_write_zone().empty() ||
+            FLAGS_convert_legacy_hive_parquet_utc_timestamps
+          )
+        );
+
+    if (needs_conversion_ && slot_desc_->type().type == TYPE_TIMESTAMP &&
+        !parent_->scan_node_->parquet_mr_write_zone().empty()) {
+      is_timestamp_dependent_timezone_ = TimezoneDatabase::IsTimestampDependentTimezone(
+          parent_->scan_node_->parquet_mr_write_zone());
+      if (!is_timestamp_dependent_timezone_) {
+        timezone_ = TimezoneDatabase::FindTimezone(
+            parent_->scan_node_->parquet_mr_write_zone());
+      }
+      DCHECK_EQ(is_timestamp_dependent_timezone_, (timezone_ == NULL));
+    }
   }
 
   virtual ~ScalarColumnReader() { }
@@ -536,6 +552,16 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// true if decoded values must be converted before being written to an output tuple.
   bool needs_conversion_;
 
+  /// Used to cache the timezone object corresponding to the "parquet.mr.int96.write.zone"
+  /// table property to avoid repeated calls to TimezoneDatabase::FindTimezone(). Set to
+  /// NULL if the table property is not set, or if it is set to UTC or to a timestamp
+  /// dependent timezone.
+  boost::local_time::time_zone_ptr timezone_;
+
+  /// true if "parquet.mr.int96.write.zone" table property is set to a timestamp dependent
+  /// timezone.
+  bool is_timestamp_dependent_timezone_;
+
   /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
@@ -579,13 +605,63 @@ inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversionInline() co
   return needs_conversion_;
 }
 
+/// Sets timestamp conversion error message in 'scanner_status'. Returns false if the
+/// execution should be aborted, otherwise returns true.
+bool __attribute__((noinline)) SetTimestampConversionError(HdfsScanNodeBase* scan_node,
+    RuntimeState* scanner_state, const TimestampValue* tv, const string& timezone,
+    const string& detail, Status* scanner_status) {
+  ErrorMsg msg(TErrorCode::PARQUET_MR_TIMESTAMP_CONVERSION_FAILED, tv->DebugString(),
+      timezone, scan_node->hdfs_table()->fully_qualified_name());
+  if (!detail.empty()) msg.AddDetail(detail);
+  Status status = scanner_state->LogOrReturnError(msg);
+  if (!status.ok()) {
+    *scanner_status = status;
+    return false;
+  }
+  return true;
+}
+
 template<>
 bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
-    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
+  // Conversion should only happen when "parquet.mr.int96.write.zone" table property is
+  // not set to "UTC"
+  DCHECK_NE(parent_->scan_node_->parquet_mr_write_zone(), "UTC");
+
   *dst = *src;
-  if (dst->HasDateAndTime()) dst->UtcToLocal();
+  if (LIKELY(dst->HasDateAndTime())) {
+    if (LIKELY(timezone_ != NULL)) {
+      // Not a timestamp specific timezone. Convert timestamp to the timezone object
+      // cached in timezone_.
+      if (UNLIKELY(!dst->FromUtc(timezone_))) {
+        if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_,
+            src, parent_->scan_node_->parquet_mr_write_zone(), "",
+            &parent_->parse_status_)) {
+          return false;
+        }
+      }
+    } else if (UNLIKELY(is_timestamp_dependent_timezone_)) {
+      // Timestamp specific timezone (such as Moscow pre 2011).
+      // Call timestamp conversion function with the timezone string.
+      if (UNLIKELY(!dst->FromUtc(parent_->scan_node_->parquet_mr_write_zone()))) {
+        if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_,
+            src, parent_->scan_node_->parquet_mr_write_zone(), "",
+            &parent_->parse_status_)) {
+          return false;
+        }
+      }
+    } else {
+      DCHECK(parent_->scan_node_->parquet_mr_write_zone().empty());
+      DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+      Status s = dst->UtcToLocal();
+      if (UNLIKELY(!s.ok())) {
+        if (!SetTimestampConversionError(parent_->scan_node_, parent_->state_,
+            src, "localtime", s.GetDetail(), &parent_->parse_status_)) {
+          return false;
+        }
+      }
+    }
+  }
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/exprs/timestamp-functions.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/timestamp-functions.cc b/be/src/exprs/timestamp-functions.cc
index 89bb498..b2a33d4 100644
--- a/be/src/exprs/timestamp-functions.cc
+++ b/be/src/exprs/timestamp-functions.cc
@@ -196,8 +196,32 @@ void TimestampFunctions::UnixAndFromUnixClose(FunctionContext* context,
   }
 }
 
+time_zone_ptr TimezoneDatabase::FindTimezone(const string& tz) {
+  if (tz.empty()) return NULL;
+
+  // Look up time zone in 'tz_database' by region.
+  time_zone_ptr tzp = tz_database_.time_zone_from_region(tz);
+  if (tzp != NULL) return tzp;
+
+  // Look up time zone in 'tz_database' by name variations. The following name variations
+  // are considered:
+  // - daylight savings abbreviation
+  // - standard abbreviation
+  // - daylight savings name
+  // - standard name
+  for (const string& tz_region: tz_region_list_) {
+    time_zone_ptr tzp = tz_database_.time_zone_from_region(tz_region);
+    DCHECK(tzp != NULL);
+    if (tzp->dst_zone_abbrev() == tz) return tzp;
+    if (tzp->std_zone_abbrev() == tz) return tzp;
+    if (tzp->dst_zone_name() == tz) return tzp;
+    if (tzp->std_zone_name() == tz) return tzp;
+  }
+  return NULL;
+}
+
 time_zone_ptr TimezoneDatabase::FindTimezone(
-    const string& tz, const TimestampValue& tv, bool tv_in_utc) {
+        const string& tz, const TimestampValue& tv, bool tv_in_utc) {
   // The backing database does not handle timezone rule changes.
   if (iequals("Europe/Moscow", tz) || iequals("Moscow", tz) || iequals("MSK", tz)) {
     if (tv.date().year() < 2011 || (tv.date().year() == 2011 && tv.date().month() < 4)) {
@@ -228,20 +252,11 @@ time_zone_ptr TimezoneDatabase::FindTimezone(
     }
   }
 
-  // See if they specified a zone id
-  time_zone_ptr tzp = tz_database_.time_zone_from_region(tz);
-  if (tzp != NULL) return tzp;
+  return FindTimezone(tz);
+}
 
-  for (vector<string>::const_iterator iter = tz_region_list_.begin();
-       iter != tz_region_list_.end(); ++iter) {
-    time_zone_ptr tzp = tz_database_.time_zone_from_region(*iter);
-    DCHECK(tzp != NULL);
-    if (tzp->dst_zone_abbrev() == tz) return tzp;
-    if (tzp->std_zone_abbrev() == tz) return tzp;
-    if (tzp->dst_zone_name() == tz) return tzp;
-    if (tzp->std_zone_name() == tz) return tzp;
-  }
-  return time_zone_ptr();
+bool TimezoneDatabase::IsTimestampDependentTimezone(const string& tz) {
+    return iequals("Europe/Moscow", tz) || iequals("Moscow", tz) || iequals("MSK", tz);
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/exprs/timezone_db.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/timezone_db.h b/be/src/exprs/timezone_db.h
index 3a1178a..9a9b14f 100644
--- a/be/src/exprs/timezone_db.h
+++ b/be/src/exprs/timezone_db.h
@@ -42,6 +42,15 @@ class TimezoneDatabase {
   static boost::local_time::time_zone_ptr FindTimezone(const std::string& tz,
       const TimestampValue& tv, bool tv_in_utc);
 
+  /// Converts the name of a timezone to a boost timezone object without taking into
+  /// account the timestamp. May not work correctly when IsTimestampDependentTimezone(tz)
+  /// is true, e.g. Moscow timezone.
+  /// If 'tz' is not found in the database, nullptr is returned.
+  static boost::local_time::time_zone_ptr FindTimezone(const std::string& tz);
+
+  /// Returns true if 'tz' specifies a timezone that was changed in the past.
+  static bool IsTimestampDependentTimezone(const std::string& tz);
+
   /// Moscow timezone UTC+3 with DST, for use before March 27, 2011.
   static const boost::local_time::time_zone_ptr TIMEZONE_MSK_PRE_2011_DST;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/runtime/timestamp-value.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.cc b/be/src/runtime/timestamp-value.cc
index 72be7fd..5d1c8b9 100644
--- a/be/src/runtime/timestamp-value.cc
+++ b/be/src/runtime/timestamp-value.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "runtime/timestamp-value.h"
+#include "exprs/timestamp-functions.h"
+#include "exprs/timezone_db.h"
 
 #include <boost/date_time/posix_time/posix_time.hpp>
 
@@ -26,6 +28,8 @@
 using boost::date_time::not_a_date_time;
 using boost::gregorian::date;
 using boost::gregorian::date_duration;
+using boost::local_time::local_date_time;
+using boost::local_time::time_zone_ptr;
 using boost::posix_time::nanoseconds;
 using boost::posix_time::ptime;
 using boost::posix_time::ptime_from_tm;
@@ -68,7 +72,7 @@ int TimestampValue::Format(const DateTimeFormatContext& dt_ctx, int len, char* b
   return TimestampParser::Format(dt_ctx, date_, time_, len, buff);
 }
 
-void TimestampValue::UtcToLocal() {
+Status TimestampValue::UtcToLocal() {
   DCHECK(HasDateAndTime());
   // Previously, conversion was done using boost functions but it was found to be
   // too slow. Doing the conversion without function calls (which also avoids some
@@ -83,7 +87,7 @@ void TimestampValue::UtcToLocal() {
     tm temp;
     if (UNLIKELY(NULL == localtime_r(&utc, &temp))) {
       *this = ptime(not_a_date_time);
-      return;
+      return Status("Failed to convert timestamp to local time.");
     }
     // Unlikely but a time zone conversion may push the value over the min/max
     // boundary resulting in an exception.
@@ -93,9 +97,33 @@ void TimestampValue::UtcToLocal() {
         static_cast<unsigned short>(temp.tm_mday));
     time_ = time_duration(temp.tm_hour, temp.tm_min, temp.tm_sec,
         time().fractional_seconds());
-  } catch (std::exception& /* from Boost */) {
+  } catch (std::exception& from_boost) {
+    Status s("Failed to convert timestamp to local time.");
+    s.AddDetail(from_boost.what());
     *this = ptime(not_a_date_time);
+    return s;
   }
+  return Status::OK();
+}
+
+bool TimestampValue::FromUtc(const std::string& timezone_str) {
+  DCHECK(HasDateAndTime());
+  time_zone_ptr timezone = TimezoneDatabase::FindTimezone(timezone_str, *this, true);
+  if (UNLIKELY(timezone == NULL)) {
+    *this = ptime(not_a_date_time);
+    return false;
+  }
+  return FromUtc(timezone);
+}
+
+bool TimestampValue::FromUtc(time_zone_ptr timezone) {
+  DCHECK(HasDateAndTime());
+  DCHECK(timezone != NULL);
+  ptime temp;
+  ToPtime(&temp);
+  local_date_time lt(temp, timezone);
+  *this = lt.local_time();
+  return true;
 }
 
 ostream& operator<<(ostream& os, const TimestampValue& timestamp_value) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/runtime/timestamp-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 5882928..bdd92d5 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -21,12 +21,14 @@
 
 #include <boost/date_time/compiler_config.hpp>
 #include <boost/date_time/gregorian/gregorian.hpp>
+#include <boost/date_time/local_time/local_time.hpp>
 #include <boost/date_time/posix_time/conversion.hpp>
 #include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <ctime>
 #include <gflags/gflags.h>
 #include <string>
 
+#include "common/status.h"
 #include "udf/udf.h"
 #include "util/hash-util.h"
 
@@ -194,8 +196,16 @@ class TimestampValue {
   }
 
   /// Converts from UTC to local time in-place. The caller must ensure the TimestampValue
-  /// this function is called upon has both a valid date and time.
-  void UtcToLocal();
+  /// this function is called upon has both a valid date and time. Returns Status::OK() if
+  /// conversion was successfull and an error Status otherwise. If conversion failed *this
+  /// is set to a ptime object initialized to not_a_date_time.
+  Status UtcToLocal();
+
+  /// Converts from UTC to given timezone in-place. Returns true if conversion was
+  /// successfull and false otherwise. If conversion failed *this is set to a ptime object
+  /// initialized to not_a_date_time.
+  bool FromUtc(const std::string& timezone_str);
+  bool FromUtc(boost::local_time::time_zone_ptr timezone);
 
   void set_date(const boost::gregorian::date d) { date_ = d; }
   void set_time(const boost::posix_time::time_duration t) { time_ = t; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 925a59c..8f52352 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -28,6 +28,8 @@
 #include "exec/catalog-op-executor.h"
 #include "exprs/expr-context.h"
 #include "exprs/expr.h"
+#include "exprs/timestamp-functions.h"
+#include "exprs/timezone_db.h"
 #include "gen-cpp/Data_types.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/jni-thrift-util.h"
@@ -37,6 +39,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
 #include "runtime/runtime-state.h"
+#include "runtime/timestamp-value.h"
 #include "service/impala-server.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -66,6 +69,7 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit(
   // Init the JVM to load the classes in JniUtil that are needed for returning
   // exceptions to the FE.
   InitCommonRuntime(1, &name, true, TestInfo::FE_TEST);
+  ABORT_IF_ERROR(TimezoneDatabase::Initialize());
   LlvmCodeGen::InitializeLlvm(true);
   ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process.
   exec_env->InitForFeTests();
@@ -346,6 +350,17 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
   return result_bytes;
 }
 
+// Used to call native code from the FE to check if a timezone string is valid or not.
+extern "C"
+JNIEXPORT jboolean JNICALL
+Java_org_apache_impala_service_FeSupport_NativeCheckIsValidTimeZone(
+    JNIEnv* env, jclass caller_class, jstring timezone) {
+  const char *tz = env->GetStringUTFChars(timezone, NULL);
+  jboolean tz_found = tz != NULL && TimezoneDatabase::FindTimezone(tz) != NULL;
+  env->ReleaseStringUTFChars(timezone, tz);
+  return tz_found;
+}
+
 namespace impala {
 
 static JNINativeMethod native_methods[] = {
@@ -369,6 +384,10 @@ static JNINativeMethod native_methods[] = {
     (char*)"NativePrioritizeLoad", (char*)"([B)[B",
     (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
+  {
+    (char*)"NativeCheckIsValidTimeZone", (char*)"(Ljava/lang/String;)Z",
+    (void*)::Java_org_apache_impala_service_FeSupport_NativeCheckIsValidTimeZone
+  }
 };
 
 void InitFeSupport(bool disable_codegen) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 86cb0c9..6132918 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -120,6 +120,10 @@ DEFINE_string(default_query_options, "", "key=value pair of default query option
 DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, "
     "the query log has unbounded size.");
 DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
+DEFINE_bool(set_parquet_mr_int96_write_zone_to_utc_on_new_tables, false, "if true, sets "
+    "the parquet.mr.int96.write.zone table property to UTC for new tables created using "
+    "CREATE TABLE and CREATE TABLE LIKE <file>. You can find details in the "
+    "documentation.");
 
 DEFINE_int64(max_result_cache_size, 100000L, "Maximum number of query results a client "
     "may request to be cached on a per-query basis to support restarting fetches. This "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 0781393..7b2cdd9 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -26,6 +26,7 @@
 // Configs for the Frontend and the Catalog.
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
+DECLARE_bool(set_parquet_mr_int96_write_zone_to_utc_on_new_tables);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(read_size);
 DECLARE_int32(num_metadata_loading_threads);
@@ -51,6 +52,8 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   TBackendGflags cfg;
   cfg.__set_authorization_policy_file(FLAGS_authorization_policy_file);
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
+  cfg.__set_set_parquet_mr_int96_write_zone_to_utc_on_new_tables(
+      FLAGS_set_parquet_mr_int96_write_zone_to_utc_on_new_tables);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 09cf6f7..edf7422 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -54,4 +54,8 @@ struct TBackendGflags {
   16: required i32 kudu_operation_timeout_ms
 
   17: required i32 initial_hms_cnxn_timeout_s
+
+  // If true, new HDFS tables created using CREATE TABLE and CREATE TABLE LIKE <file>
+  // regardless of format will have the "parquet.mr.int96.write.zone" property set to UTC.
+  18: required bool set_parquet_mr_int96_write_zone_to_utc_on_new_tables
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index b83b84d..1baffc0 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -215,6 +215,11 @@ struct THdfsScanNode {
   // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible
   // for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
+
+  // Specifies a time zone for adjusting timestamp values read from Parquet files written
+  // by parquet-mr. The actual value comes from "parquet.mr.int96.write.zone" table
+  // property. This is used for a Hive compatibilty fix.
+  10: optional string parquet_mr_write_zone
 }
 
 struct TDataSourceScanNode {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 33bae25..84a0dad 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -315,6 +315,9 @@ error_codes = (
 
   ("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' at "
    "offset $2: could only read $3 bytes"),
+
+  ("PARQUET_MR_TIMESTAMP_CONVERSION_FAILED", 103, "Failed to convert timestamp '$0' to "
+   "timezone '$1' for a Parquet file in table '$2'."),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 08007b3..f0e8f11 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
@@ -99,6 +100,9 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
 
     // Analyze 'skip.header.line.format' property.
     analyzeSkipHeaderLineCount(getTargetTable(), tblProperties_);
+
+    // Analyze 'parquet.mr.int96.write.zone'
+    analyzeParquetMrWriteZone(getTargetTable(), tblProperties_);
   }
 
   /**
@@ -155,4 +159,27 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
       if (error.length() > 0) throw new AnalysisException(error.toString());
     }
   }
+
+  /**
+   * Analyze the 'parquet.mr.int96.write.zone' property to make sure it is set to a valid
+   * value. It is looked up in 'tblProperties', which must not be null. If 'table' is not
+   * null, then the method ensures that 'parquet.mr.int96.write.zone' is supported for its
+   * table type. If it is null, then this check is omitted.
+   */
+  private static void analyzeParquetMrWriteZone(Table table,
+      Map<String, String> tblProperties) throws AnalysisException {
+    if (tblProperties.containsKey(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)) {
+      if (table != null && !(table instanceof HdfsTable)) {
+        throw new AnalysisException(String.format(
+            "Table property '%s' is only supported for HDFS tables.",
+            HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE));
+      }
+      String timezone = tblProperties.get(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE);
+      if (!FeSupport.CheckIsValidTimeZone(timezone)) {
+        throw new AnalysisException(String.format(
+            "Invalid time zone in the '%s' table property: %s",
+            HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
index 1691315..2350398 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
@@ -20,6 +20,8 @@ package org.apache.impala.analysis;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.service.FeSupport;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -64,6 +66,7 @@ public class BaseTableRef extends TableRef {
     analyzeHints(analyzer);
     analyzeJoin(analyzer);
     analyzeSkipHeaderLineCount();
+    analyzeParquetMrWriteZone();
   }
 
   @Override
@@ -95,4 +98,20 @@ public class BaseTableRef extends TableRef {
     hdfsTable.parseSkipHeaderLineCount(error);
     if (error.length() > 0) throw new AnalysisException(error.toString());
   }
+
+  /**
+   * Analyze the 'parquet.mr.int96.write.zone' property.
+   */
+  private void analyzeParquetMrWriteZone() throws AnalysisException {
+    Table table = getTable();
+    if (!(table instanceof HdfsTable)) return;
+    HdfsTable hdfsTable = (HdfsTable)table;
+
+    String timezone = hdfsTable.getParquetMrWriteZone();
+    if (timezone != null && !FeSupport.CheckIsValidTimeZone(timezone)) {
+      throw new AnalysisException(String.format(
+          "Invalid time zone in the '%s' table property: %s",
+          HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 1139005..8ee7927 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -23,10 +23,13 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.impala.authorization.PrivilegeRequestBuilder;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TTableName;
@@ -176,6 +179,24 @@ public class CreateTableStmt extends StatementBase {
       }
       AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
+
+    if (getTblProperties().containsKey(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE)) {
+      if (getFileFormat() == THdfsFileFormat.KUDU) {
+        throw new AnalysisException(String.format(
+            "Table property '%s' is only supported for HDFS tables.",
+            HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE));
+      }
+      String timezone = getTblProperties().get(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE);
+      if (!FeSupport.CheckIsValidTimeZone(timezone)) {
+        throw new AnalysisException(String.format(
+            "Invalid time zone in the '%s' table property: %s",
+            HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, timezone));
+      }
+    } else if (BackendConfig.INSTANCE.isSetParquetMrWriteZoneToUtcOnNewTables()) {
+      if (getFileFormat() != THdfsFileFormat.KUDU) {
+        getTblProperties().put(HdfsTable.TBL_PROP_PARQUET_MR_WRITE_ZONE, "UTC");
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 143e2b1..b6df167 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -115,6 +115,10 @@ public class HdfsTable extends Table {
   // Table property key for skip.header.line.count
   public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
+  // Table property key for parquet.mr.int96.write.zone
+  public static final String TBL_PROP_PARQUET_MR_WRITE_ZONE =
+      "parquet.mr.int96.write.zone";
+
   // An invalid network address, which will always be treated as remote.
   private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
       new TNetworkAddress("remote*addr", 0);
@@ -1377,6 +1381,18 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Returns the value of the 'parquet.mr.int96.write.zone' table property. If the value
+   * is not set for the table, returns null.
+   */
+  public String getParquetMrWriteZone() {
+    org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
+    if (msTbl == null) return null;
+    Map<String, String> tblProperties = msTbl.getParameters();
+    if (tblProperties == null) return null;
+    return tblProperties.get(TBL_PROP_PARQUET_MR_WRITE_ZONE);
+  }
+
+  /**
    * Sets avroSchema_ if the table or any of the partitions in the table are stored
    * as Avro. Additionally, this method also reconciles the schema if the column
    * definitions from the metastore differ from the Avro schema.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9545828..2a9c84c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -151,7 +151,6 @@ public class HdfsScanNode extends ScanNode {
 
   private static final Configuration CONF = new Configuration();
 
-
   // List of conjuncts for min/max values of parquet::Statistics, that are used to skip
   // data when scanning Parquet files.
   private List<Expr> minMaxConjuncts_ = Lists.newArrayList();
@@ -719,6 +718,10 @@ public class HdfsScanNode extends ScanNode {
     if (skipHeaderLineCount_ > 0) {
       msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
     }
+    String parquetMrWriteZone = tbl_.getParquetMrWriteZone();
+    if (parquetMrWriteZone != null) {
+      msg.hdfs_scan_node.setParquet_mr_write_zone(parquetMrWriteZone);
+    }
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
     if (!minMaxConjuncts_.isEmpty()) {
       for (Expr e: minMaxConjuncts_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 5a4a440..04de238 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -54,6 +54,9 @@ public class BackendConfig {
         !Strings.isNullOrEmpty(backendCfg_.principal);
   }
   public int getKuduClientTimeoutMs() { return backendCfg_.kudu_operation_timeout_ms; }
+  public boolean isSetParquetMrWriteZoneToUtcOnNewTables() {
+    return backendCfg_.set_parquet_mr_int96_write_zone_to_utc_on_new_tables;
+  }
 
   public int getImpalaLogLevel() { return backendCfg_.impala_log_lvl; }
   public int getNonImpalaJavaVlogLevel() { return backendCfg_.non_impala_java_vlog; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 8b87962..f712752 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -84,6 +84,10 @@ public class FeSupport {
   // using Java Thrift bindings.
   public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
 
+  // Returns true if timezone String is valid according to the BE timezone database, false
+  // otherwise.
+  public native static boolean NativeCheckIsValidTimeZone(String timezone);
+
   /**
    * Locally caches the jar at the specified HDFS location.
    *
@@ -261,6 +265,16 @@ public class FeSupport {
     }
   }
 
+  public static boolean CheckIsValidTimeZone(String timezone) {
+    if (timezone == null) return false;
+    try {
+      return NativeCheckIsValidTimeZone(timezone);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return NativeCheckIsValidTimeZone(timezone);
+  }
+
   /**
    * This function should only be called explicitly by the FeSupport to ensure that
    * native functions are loaded.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 1278372..4a662f3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -664,6 +664,58 @@ public class AnalyzeDDLTest extends FrontendTestBase {
   }
 
   @Test
+  public void TestParquetMrInt96WriteZone() {
+    // Attempt to set 'parquet.mr.int96.write.zone' when creating a table. Positive cases.
+    AnalyzesOk("create table tbl (i int) tblproperties " +
+        "('parquet.mr.int96.write.zone'='EST')");
+    AnalyzesOk("create table tbl tblproperties " +
+        "('parquet.mr.int96.write.zone'='EST') " +
+        "as select * from functional.alltypesnopart");
+    AnalyzesOk("create external table tbl like parquet " +
+        "'/test-warehouse/alltypesagg_hive_13_1_parquet/" +
+        "alltypesagg_hive_13_1.parquet' " +
+        "stored as parquet " +
+        "tblproperties ('parquet.mr.int96.write.zone'='EST')");
+    // Cannot set 'parquet.mr.int96.write.zone' table property when creating a non-HDFS
+    // table.
+    AnalysisError("create external table tbl stored as kudu tblproperties (" +
+        "'kudu.table_name'='tab'," +
+        "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081'," +
+        "'parquet.mr.int96.write.zone'='EST')",
+        "Table property 'parquet.mr.int96.write.zone' is only supported for HDFS " +
+        "tables.");
+    // Cannot set 'parquet.mr.int96.write.zone' table property to an invalid time zone
+    // when creating a table.
+    AnalysisError("create table tbl (i int) tblproperties" +
+        "('parquet.mr.int96.write.zone'='garbage')",
+        "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage");
+    AnalysisError("create table tbl tblproperties " +
+        "('parquet.mr.int96.write.zone'='garbage') " +
+        "as select * from functional.alltypesnopart",
+        "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage");
+    AnalysisError("create external table tbl like parquet " +
+        "'/test-warehouse/alltypesagg_hive_13_1_parquet/" +
+        "alltypesagg_hive_13_1.parquet' " +
+        "stored as parquet " +
+        "tblproperties ('parquet.mr.int96.write.zone'='garbage')",
+        "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage");
+
+    // Attempt to set 'parquet.mr.int96.write.zone' table property. Positive case.
+    AnalyzesOk("alter table functional.alltypes set tblproperties" +
+        "('parquet.mr.int96.write.zone'='EST')");
+    // Cannot set 'parquet.mr.int96.write.zone' table property on a table not backed by
+    // HDFS.
+    AnalysisError("alter table functional_kudu.alltypes set tblproperties" +
+        "('parquet.mr.int96.write.zone'='EST')",
+        "Table property 'parquet.mr.int96.write.zone' is only supported for HDFS " +
+        "tables.");
+    // Cannot set 'parquet.mr.int96.write.zone' table property to an invalid time zone.
+    AnalysisError("alter table functional.alltypes set tblproperties" +
+        "('parquet.mr.int96.write.zone'='garbage')",
+        "Invalid time zone in the 'parquet.mr.int96.write.zone' table property: garbage");
+  }
+
+  @Test
   public void TestAlterTableSetCached() {
     // Positive cases
     AnalyzesOk("alter table functional.alltypesnopart set cached in 'testPool'");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 2972cb8..af0ed1d 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -495,6 +495,29 @@ class ImpalaTestSuite(BaseTestSuite):
     assert not result.success, "No failure encountered for query %s" % query
     return result
 
+  def _get_properties(self, section_name, table_name):
+    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
+    result = self.client.execute("describe formatted " + table_name)
+    match = False
+    properties = dict();
+    for row in result.data:
+      if section_name in row:
+        match = True
+      elif match:
+        row = row.split('\t')
+        if (row[1] == 'NULL'):
+          break
+        properties[row[1].rstrip()] = row[2].rstrip()
+    return properties
+
+  def get_table_properties(self, table_name):
+    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
+    return self._get_properties('Table Parameters:', table_name)
+
+  def get_serde_properties(self, table_name):
+    """Extracts the serde properties mapping from the output of DESCRIBE FORMATTED"""
+    return self._get_properties('Storage Desc Params:', table_name)
+
   @execute_wrapper
   def execute_query(self, query, query_options=None):
     return self.__execute_query(self.client, query, query_options)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
index 4d7c202..fd631fd 100644
--- a/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
+++ b/tests/custom_cluster/test_hive_parquet_timestamp_conversion.py
@@ -21,12 +21,41 @@ import pytest
 import time
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.filesystem_utils import get_fs_path
 
 class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
-  '''Hive writes timestamps in parquet files by first converting values from local time
+  '''Hive writes timestamps in Parquet files by first converting values from local time
      to UTC. The conversion was not expected (other file formats don't convert) and a
-     startup flag was later added to adjust for this (IMPALA-1658). This file tests that
-     the conversion and flag behave as expected.
+     startup flag (-convert_legacy_hive_parquet_utc_timestamps) was later added to adjust
+     for this (IMPALA-1658). IMPALA-2716 solves the issue in a more general way by
+     introducing a table property ('parquet.mr.int96.write.zone') that specifies the time
+     zone to convert the timestamp values to.
+
+     This file tests that the table property and the startup option behave as expected in
+     the following scenarios:
+     1. If the 'parquet.mr.int96.write.zone' table property is set, Impala ignores the
+        -convert_legacy_hive_parquet_utc_timestamps startup option. It reads Parquet
+        timestamp data written by Hive and adjusts values using the time zone from the
+        table property.
+     2. If the 'parquet.mr.int96.write.zone' table property is not set, the
+        -convert_legacy_hive_parquet_utc_timestamps startup option is taken into account.
+        a. If the startup option is set to true, Impala reads Parquet timestamp data
+           created by Hive and adjusts values using the local time zone.
+        b. If the startup option is absent or set to false, no adjustment will be applied
+           to timestamp values.
+
+     IMPALA-2716 also introduces a startup option
+     (-set_parquet_mr_int96_write_zone_to_utc_on_new_tables) that determines if the table
+     property will be set on newly created tables. This file tests the basic behavior of the
+     startup option:
+     1. Tables created with the 'parquet.mr.int96.write.zone' table property explicitly
+        set, will keep the value the property is set to.
+     2. If -set_parquet_mr_int96_write_zone_to_utc_on_new_tables is set to true, tables
+        created using CREATE TABLE, CREATE TABLE AS SELECT and CREATE TABLE LIKE <FILE>
+        will set the table property to UTC.
+     3. Tables created using CREATE TABLE LIKE <OTHER TABLE> will ignore the value of
+        -set_parquet_mr_int96_write_zone_to_utc_on_new_tables and copy the property of
+        the table that is copied.
   '''
 
   @classmethod
@@ -36,11 +65,12 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
         v.get_value('table_format').file_format == 'parquet' and
         v.get_value('table_format').compression_codec == 'none')
 
-  def check_sanity(self, expect_converted_result):
+  def check_sanity(self, expect_converted_result,
+      tbl_name='functional_parquet.alltypesagg_hive_13_1'):
     data = self.execute_query_expect_success(self.client, """
         SELECT COUNT(timestamp_col), COUNT(DISTINCT timestamp_col),
                MIN(timestamp_col), MAX(timestamp_col)
-        FROM functional_parquet.alltypesagg_hive_13_1""")\
+        FROM {0}""".format(tbl_name))\
         .get_data()
     assert len(data) > 0
     rows = data.split("\n")
@@ -58,22 +88,63 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
       assert values[2] == "2010-01-01 00:00:00"
       assert values[3] == "2010-01-10 18:02:05.100000000"
 
+  def get_parquet_mr_write_zone_tbl_prop(self,
+      tbl_name='functional_parquet.alltypesagg_hive_13_1'):
+    tbl_prop = self.get_table_properties(tbl_name)
+    if 'parquet.mr.int96.write.zone' not in tbl_prop:
+      return None
+    return tbl_prop['parquet.mr.int96.write.zone']
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true")
-  def test_conversion(self, vector):
+  def test_conversion_to_tbl_prop_timezone(self, vector, unique_database):
+    # Create table with 'parquet.mr.int96.write.zone' property set to China Standard Time.
+    # The undelying parquet file has been written by Hive.
+    hive_tbl = '%s.hive_tbl' % unique_database
+    parquet_loc = get_fs_path('/test-warehouse/alltypesagg_hive_13_1_parquet')
+    parquet_path = get_fs_path(
+        '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet')
+    self.client.execute('''CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}"
+        STORED AS PARQUET LOCATION "{2}"
+        TBLPROPERTIES ('parquet.mr.int96.write.zone'='China Standard Time')
+        '''.format(hive_tbl, parquet_path, parquet_loc))
+    # Make sure that the table property has been properly set.
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=hive_tbl) ==\
+        'China Standard Time'
+    # Even though -convert_legacy_hive_parquet_utc_timestamps is set to true it is ignored
+    # because the 'parquet.mr.int06.write.zone' table property is also set. The value read
+    # from the Hive table should be the same as the corresponding Impala timestamp value
+    # converted from UTC to China Standard Time.
+    self.check_sanity(True, tbl_name=hive_tbl)
+    data = self.execute_query_expect_success(self.client, """
+        SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
+        FROM {0} h
+        JOIN functional_parquet.alltypesagg i
+          ON i.id = h.id AND i.day = h.day  -- serves as a unique key
+        WHERE
+          (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL)
+          OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, 'China Standard Time')
+        """.format(hive_tbl))\
+        .get_data()
+    assert len(data) == 0
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true")
+  def test_conversion_to_localtime(self, vector):
     tz_name = time.tzname[time.localtime().tm_isdst]
     self.check_sanity(tz_name not in ("UTC", "GMT"))
+    # Make sure that the table property is not set
+    assert self.get_parquet_mr_write_zone_tbl_prop() == None
     # The value read from the Hive table should be the same as reading a UTC converted
     # value from the Impala table.
     tz_name = time.tzname[time.localtime().tm_isdst]
     data = self.execute_query_expect_success(self.client, """
         SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
         FROM functional_parquet.alltypesagg_hive_13_1 h
-        JOIN functional_parquet.alltypesagg
-          i ON i.id = h.id AND i.day = h.day  -- serves as a unique key
+        JOIN functional_parquet.alltypesagg i
+          ON i.id = h.id AND i.day = h.day  -- serves as a unique key
         WHERE
-          (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
-          OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
+          (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL)
           OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
         """ % tz_name)\
         .get_data()
@@ -83,13 +154,15 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=false")
   def test_no_conversion(self, vector):
     self.check_sanity(False)
+    # Make sure that the table property is not set
+    assert self.get_parquet_mr_write_zone_tbl_prop() == None
     # Without conversion all the values will be different.
     tz_name = time.tzname[time.localtime().tm_isdst]
     data = self.execute_query_expect_success(self.client, """
         SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
         FROM functional_parquet.alltypesagg_hive_13_1 h
-        JOIN functional_parquet.alltypesagg
-          i ON i.id = h.id AND i.day = h.day  -- serves as a unique key
+        JOIN functional_parquet.alltypesagg i
+          ON i.id = h.id AND i.day = h.day  -- serves as a unique key
         WHERE h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
         """ % tz_name)\
         .get_data()
@@ -101,11 +174,76 @@ class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
     data = self.execute_query_expect_success(self.client, """
         SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
         FROM functional_parquet.alltypesagg_hive_13_1 h
-        JOIN functional_parquet.alltypesagg
-          i ON i.id = h.id AND i.day = h.day  -- serves as a unique key
+        JOIN functional_parquet.alltypesagg i
+          ON i.id = h.id AND i.day = h.day  -- serves as a unique key
         WHERE
-          (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
-          OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
+          (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL)
         """)\
         .get_data()
     assert len(data) == 0
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      "-set_parquet_mr_int96_write_zone_to_utc_on_new_tables=true")
+  def test_new_table_enable_set_tbl_prop_to_utc(self, unique_database):
+    # Table created with CREATE TABLE will set the table property to UTC.
+    tbl1_name = '%s.table1' % unique_database
+    self.client.execute('CREATE TABLE {0} (id int)'.format(tbl1_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl1_name) == 'UTC'
+    # Table created with CREATE TABLE will honor the explicitly set property.
+    tbl_est_name = '%s.table_est' % unique_database
+    self.client.execute('''CREATE TABLE {0} (id int)
+        TBLPROPERTIES ('parquet.mr.int96.write.zone'='EST')
+        '''.format(tbl_est_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl_est_name) == 'EST'
+    # Table created with CREATE TABLE AS SELECT will set the table property to UTC. Table
+    # property is not copied from the other table.
+    tbl2_name = '%s.table2' % unique_database
+    self.client.execute('CREATE TABLE {0} AS SELECT * FROM {1}'.format(
+        tbl2_name, tbl_est_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl2_name) == 'UTC'
+    # Table created with CREATE TABLE LIKE <FILE> will set the table property to UTC.
+    tbl3_name = '%s.tbl3_name' % unique_database
+    parquet_path = get_fs_path(
+        '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet')
+    self.client.execute('CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}"'.format(
+        tbl3_name, parquet_path))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl3_name) == 'UTC'
+    # Table created with CREATE TABLE LIKE <OTHER TABLE> will copy the property from the
+    # other table.
+    tbl4_name = '%s.tbl4_name' % unique_database
+    self.client.execute('CREATE TABLE {0} LIKE {1}'.format(tbl4_name, tbl_est_name));
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl4_name) == 'EST'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      "-set_parquet_mr_int96_write_zone_to_utc_on_new_tables=false")
+  def test_new_table_disable_set_tbl_prop_to_utc(self, unique_database):
+    # Table created with CREATE TABLE will not set the table property.
+    tbl1_name = '%s.table1' % unique_database
+    self.client.execute('CREATE TABLE {0} (id int)'.format(tbl1_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl1_name) == None
+    # Table created with CREATE TABLE will honor the explicitly set property.
+    tbl_est_name = '%s.table_est' % unique_database
+    self.client.execute('''CREATE TABLE {0} (id int)
+        TBLPROPERTIES ('parquet.mr.int96.write.zone'='EST')
+        '''.format(tbl_est_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl_est_name) == 'EST'
+    # Table created with CREATE TABLE AS SELECT will not set the table property. Table
+    # property is not copied from the other table.
+    tbl2_name = '%s.table2' % unique_database
+    self.client.execute('CREATE TABLE {0} AS SELECT * FROM {1}'.format(
+        tbl2_name, tbl_est_name))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl2_name) == None
+    # Table created with CREATE TABLE LIKE <FILE> will not set the table property.
+    tbl3_name = '%s.tbl3_name' % unique_database
+    parquet_path = get_fs_path(
+        '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet')
+    self.client.execute('CREATE EXTERNAL TABLE {0} LIKE PARQUET "{1}"'.format(
+        tbl3_name, parquet_path))
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl3_name) == None
+    # Table created with CREATE TABLE LIKE <OTHER TABLE> will copy the property from the
+    # other table.
+    tbl4_name = '%s.tbl4_name' % unique_database
+    self.client.execute('CREATE TABLE {0} LIKE {1}'.format(tbl4_name, tbl_est_name));
+    assert self.get_parquet_mr_write_zone_tbl_prop(tbl_name=tbl4_name) == 'EST'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 37ca7cd..2dc950b 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -368,7 +368,7 @@ class TestDdlStatements(TestDdlBase):
     self.client.execute("""create table {0} (i int)
     with serdeproperties ('s1'='s2', 's3'='s4')
     tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
-    properties = self._get_tbl_properties(fq_tbl_name)
+    properties = self.get_table_properties(fq_tbl_name)
 
     assert len(properties) == 2
     # The transient_lastDdlTime is variable, so don't verify the value.
@@ -376,19 +376,19 @@ class TestDdlStatements(TestDdlBase):
     del properties['transient_lastDdlTime']
     assert {'p1': 'v1'} == properties
 
-    properties = self._get_serde_properties(fq_tbl_name)
+    properties = self.get_serde_properties(fq_tbl_name)
     assert {'s1': 's2', 's3': 's4'} == properties
 
     # Modify the SERDEPROPERTIES using ALTER TABLE SET.
     self.client.execute("alter table {0} set serdeproperties "
         "('s1'='new', 's5'='s6')".format(fq_tbl_name))
-    properties = self._get_serde_properties(fq_tbl_name)
+    properties = self.get_serde_properties(fq_tbl_name)
     assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties
 
     # Modify the TBLPROPERTIES using ALTER TABLE SET.
     self.client.execute("alter table {0} set tblproperties "
         "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
-    properties = self._get_tbl_properties(fq_tbl_name)
+    properties = self.get_table_properties(fq_tbl_name)
 
     assert 'transient_lastDdlTime' in properties
     assert properties['p1'] == 'v1'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/tests/metadata/test_ddl_base.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl_base.py b/tests/metadata/test_ddl_base.py
index 3044ef0..acda99f 100644
--- a/tests/metadata/test_ddl_base.py
+++ b/tests/metadata/test_ddl_base.py
@@ -64,26 +64,3 @@ class TestDdlBase(ImpalaTestSuite):
         db_name, comment, WAREHOUSE)
     impala_client.execute(ddl)
     impala_client.close()
-
-  def _get_tbl_properties(self, table_name):
-    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
-    return self._get_properties('Table Parameters:', table_name)
-
-  def _get_serde_properties(self, table_name):
-    """Extracts the serde properties mapping from the output of DESCRIBE FORMATTED"""
-    return self._get_properties('Storage Desc Params:', table_name)
-
-  def _get_properties(self, section_name, table_name):
-    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
-    result = self.client.execute("describe formatted " + table_name)
-    match = False
-    properties = dict();
-    for row in result.data:
-      if section_name in row:
-        match = True
-      elif match:
-        row = row.split('\t')
-        if (row[1] == 'NULL'):
-          break
-        properties[row[1].rstrip()] = row[2].rstrip()
-    return properties

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5803a0b0/tests/query_test/test_parquet_timestamp_compatibility.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_timestamp_compatibility.py b/tests/query_test/test_parquet_timestamp_compatibility.py
new file mode 100644
index 0000000..37e6398
--- /dev/null
+++ b/tests/query_test/test_parquet_timestamp_compatibility.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import pytest
+import time
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+
+class TestParquetTimestampCompatibility(ImpalaTestSuite):
+  '''Hive adjusts timestamps by subtracting the local time zone's offset from all values
+     when writing data to Parquet files. As a result of this adjustment, Impala may read
+     "incorrect" timestamp values from Parquet files written by Hive. To fix the problem
+     a table property ('parquet.mr.int96.write.zone') was introduced in IMPALA-2716 that
+     specifies the time zone to convert the timesamp values to.
+
+     This file tests the following scenarios:
+     1. If the 'parquet.mr.int96.write.zone' table property is set to an invalid time zone
+        (by Hive), Impala throws an error when analyzing a query against the table.
+     2. If the 'parquet.mr.int96.write.zone' table property is set to a valid time zone:
+        a. Impala adjusts timestamp values read from Parquet files created by Hive using
+           the time zone from the table property.
+        b. Impala does not adjust timestamp values read from Parquet files created by
+           Impala.
+  '''
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetTimestampCompatibility, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def _setup_env(self, hive_tbl_name, impala_tbl_name=None):
+    parquet_loc = get_fs_path('/test-warehouse/alltypesagg_hive_13_1_parquet')
+    parquet_fn = get_fs_path(
+        '/test-warehouse/alltypesagg_hive_13_1_parquet/alltypesagg_hive_13_1.parquet')
+    self.client.execute('''CREATE EXTERNAL TABLE {0}
+        LIKE PARQUET "{1}"
+        STORED AS PARQUET LOCATION "{2}"
+        '''.format(hive_tbl_name, parquet_fn, parquet_loc))
+    if impala_tbl_name:
+      self.client.execute('''CREATE TABLE {0}
+          STORED AS PARQUET AS
+          SELECT * FROM {1}
+          '''.format(impala_tbl_name, hive_tbl_name))
+
+  def _set_tbl_timezone(self, tbl_name, tz_name):
+    self.client.execute('''ALTER TABLE {0}
+        SET TBLPROPERTIES ('parquet.mr.int96.write.zone'='{1}')
+        '''.format(tbl_name, tz_name))
+
+  def _get_parquet_mr_write_zone_tbl_prop(self, tbl_name):
+    tbl_prop = self.get_table_properties(tbl_name)
+    if 'parquet.mr.int96.write.zone' not in tbl_prop:
+      return None
+    return tbl_prop['parquet.mr.int96.write.zone']
+
+  def test_invalid_parquet_mr_write_zone(self, vector, unique_database):
+    # Hive doesn't allow setting 'parquet.mr.int96.write.zone' table property to an
+    # invalid time zone anymore.
+    pytest.skip()
+
+    hive_tbl_name = '%s.hive_table' % unique_database
+    self._setup_env(hive_tbl_name)
+    # Hive sets the table property to an invalid time zone
+    self.run_stmt_in_hive('''ALTER TABLE {0}
+        SET TBLPROPERTIES ('parquet.mr.int96.write.zone'='garbage')
+        '''.format(hive_tbl_name))
+    self.client.execute('REFRESH %s' % hive_tbl_name)
+    # Impala throws an error when the table is queried
+    try:
+      self.client.execute('SELECT timestamp_col FROM %s' % hive_tbl_name)
+    except ImpalaBeeswaxException, e:
+      if "Invalid time zone" not in str(e):
+        raise e
+    else:
+      assert False, "Query was expected to fail"
+
+  def test_parquet_timestamp_conversion(self, vector, unique_database):
+    hive_tbl_name = '%s.hive_table' % unique_database
+    impala_tbl_name = '%s.impala_table' % unique_database
+    self._setup_env(hive_tbl_name, impala_tbl_name)
+    for tz_name in ['UTC', 'EST', 'China Standard Time', 'CET']:
+      # impala_table's underlying Parquet file was written by Impala. No conversion is
+      # performed on the timestamp values, no matter what value
+      # 'parquet.mr.int96.write.zone' is set to.
+      self._set_tbl_timezone(impala_tbl_name, tz_name)
+      data = self.execute_query_expect_success(self.client, """
+          SELECT i2.id, i2.day, i2.timestamp_col, i1.timestamp_col
+          FROM functional.alltypesagg i1
+          JOIN {0} i2
+            ON i1.id = i2.id AND i1.day = i2.day  -- serves as a unique key
+          WHERE
+            (i1.timestamp_col IS NULL) != (i2.timestamp_col IS NULL)
+            OR i1.timestamp_col != i2.timestamp_col
+          """.format(impala_tbl_name))\
+          .get_data()
+      assert len(data) == 0
+      assert self._get_parquet_mr_write_zone_tbl_prop(impala_tbl_name) == tz_name
+      # hive_table's underlying Parquet file was written by Hive. Setting the
+      # 'parquet.mr.int96.write.zone' table property to tz_name triggers a 'UTC' ->
+      # tz_name conversion on the timestamp values.
+      self._set_tbl_timezone(hive_tbl_name, tz_name)
+      data = self.execute_query_expect_success(self.client, """
+          SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
+          FROM functional.alltypesagg i
+          JOIN {0} h
+            ON i.id = h.id AND i.day = h.day  -- serves as a unique key
+          WHERE
+            (h.timestamp_col IS NULL) != (i.timestamp_col IS NULL)
+            OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '{1}')
+          """.format(hive_tbl_name, tz_name))\
+          .get_data()
+      assert len(data) == 0
+      assert self._get_parquet_mr_write_zone_tbl_prop(hive_tbl_name) == tz_name


Mime
View raw message