impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mi...@apache.org
Subject [5/5] incubator-impala git commit: IMPALA-4734: Set parquet::RowGroup::sorting_columns
Date Tue, 07 Mar 2017 15:08:13 GMT
IMPALA-4734: Set parquet::RowGroup::sorting_columns

This changes the HdfsParquetTableWriter to populate the
parquet::RowGroup::sorting_columns list with all columns mentioned in a
'sortby()' hint within INSERT statements. The columns are added to the
list in the order in which they appear inside the hint.

The change also adds backports.tempfile to the python requirements to
provide 'tempfile.TemporaryDirectory' on python 2.7.

The change also changes the default ordering for columns mentioned in
'sortby()' hints from descending to ascending.

To test this change, we write a table with a 'sortby()' hint and verify
that the sorting_columns get populated correctly.

Change-Id: Ib42aab585e9e627796e9510e783652d49d74b56c
Reviewed-on: http://gerrit.cloudera.org:8080/6219
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/768fc0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/768fc0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/768fc0ea

Branch: refs/heads/master
Commit: 768fc0ea2773289b88256ea16090c0cfcf2d0a97
Parents: 5d306ef
Author: Lars Volker <lv@cloudera.com>
Authored: Mon Feb 27 11:21:40 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue Mar 7 09:07:05 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        | 16 +++++++--
 be/src/exec/hdfs-parquet-table-writer.h         |  2 +-
 be/src/exec/hdfs-table-sink.cc                  |  1 +
 be/src/exec/hdfs-table-sink.h                   |  7 +++-
 common/thrift/DataSinks.thrift                  |  5 +++
 .../org/apache/impala/analysis/DeleteStmt.java  |  3 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  9 ++++-
 .../org/apache/impala/analysis/UpdateStmt.java  |  3 +-
 .../apache/impala/planner/HdfsTableSink.java    | 10 +++++-
 .../java/org/apache/impala/planner/Planner.java |  2 +-
 .../org/apache/impala/planner/TableSink.java    | 14 ++++++--
 infra/python/deps/requirements.txt              |  1 +
 .../queries/PlannerTest/insert.test             | 24 ++++++-------
 .../queries/PlannerTest/kudu-upsert.test        |  2 +-
 .../queries/PlannerTest/kudu.test               |  8 ++---
 tests/query_test/test_insert_parquet.py         | 38 ++++++++++++++++++++
 16 files changed, 117 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 164e921..5c2d24c 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -998,7 +998,7 @@ Status HdfsParquetTableWriter::Finalize() {
   file_metadata_.num_rows = row_count_;
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WriteFileFooter());
-  stats_.__set_parquet_stats(parquet_stats_);
+  stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
   return Status::OK();
 }
@@ -1046,7 +1046,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     current_row_group_->num_rows = col_writer->num_values();
     current_row_group_->columns[i].file_offset = file_pos_;
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    parquet_stats_.per_column_size[col_name] += col_writer->total_compressed_size();
+    parquet_insert_stats_.per_column_size[col_name] +=
+        col_writer->total_compressed_size();
 
     // Write encodings and encoding stats for this column
     col_metadata.encodings.clear();
@@ -1094,6 +1095,17 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     col_writer->Reset();
   }
 
+  // Populate RowGroup::sorting_columns with all columns specified by the Frontend.
+  for (int col_idx : parent_->sort_by_columns()) {
+    current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
+    parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
+    sorting_column.column_idx = col_idx;
+    sorting_column.descending = false;
+    sorting_column.nulls_first = false;
+  }
+  current_row_group_->__isset.sorting_columns =
+      !current_row_group_->sorting_columns.empty();
+
   current_row_group_ = nullptr;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index d4fbd94..8cbd456 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -187,7 +187,7 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   std::vector<uint8_t> compression_staging_buffer_;
 
   /// For each column, the on disk size written.
-  TParquetInsertStats parquet_stats_;
+  TParquetInsertStats parquet_insert_stats_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 8d64683..4aa09cd 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -69,6 +69,7 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
     partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
     overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
     input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
+    sort_by_columns_(tsink.table_sink .hdfs_table_sink.sort_by_columns),
     current_clustered_partition_(nullptr) {
   DCHECK(tsink.__isset.table_sink);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 65c1798..45d64c5 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -152,7 +152,7 @@ class HdfsTableSink : public DataSink {
   virtual void Close(RuntimeState* state);
 
   int skip_header_line_count() const { return skip_header_line_count_; }
-
+  const vector<int32_t>& sort_by_columns() const { return sort_by_columns_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
@@ -270,6 +270,11 @@ class HdfsTableSink : public DataSink {
   /// be opened, written, and closed one by one.
   bool input_is_clustered_;
 
+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is used in the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  const std::vector<int32_t>& sort_by_columns_;
+
   /// Stores the current partition during clustered inserts across subsequent row batches.
   /// Only set if 'input_is_clustered_' is true.
   PartitionPair* current_clustered_partition_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 0b136b2..6e3224e 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -70,6 +70,11 @@ struct THdfsTableSink {
   // This property indicates to the table sink whether the input is ordered by the
   // partition keys, meaning partitions can be opened, written, and closed one by one.
   4: required bool input_is_clustered
+
+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is used in the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  5: optional list<i32> sort_by_columns
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 2f7f670..52d58a7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -56,7 +56,8 @@ public class DeleteStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false,
+        ImmutableList.<Integer>of());
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 916f97f..b7172da 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -130,6 +130,11 @@ public class InsertStmt extends StatementBase {
   // contain primary key columns.
   private List<Expr> sortByExprs_ = Lists.newArrayList();
 
+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  private List<Integer> sortByColumns_ = Lists.newArrayList();
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
   // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -804,6 +809,7 @@ public class InsertStmt extends StatementBase {
       for (int i = 0; i < columns.size(); ++i) {
         if (columns.get(i).getName().equals(columnName)) {
           sortByExprs_.add(resultExprs_.get(i));
+          sortByColumns_.add(i);
           foundColumn = true;
           break;
         }
@@ -854,7 +860,8 @@ public class InsertStmt extends StatementBase {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_);
+        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_,
+        sortByColumns_);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index de74bd8..ddce618 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -65,7 +65,8 @@ public class UpdateStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false,
+        ImmutableList.<Integer>of());
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index fc7f9b1..996f981 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -32,6 +32,7 @@ import org.apache.impala.thrift.THdfsTableSink;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Base class for Hdfs data sinks such as HdfsTextTableSink.
@@ -50,13 +51,19 @@ public class HdfsTableSink extends TableSink {
   // be opened, written, and closed one by one.
   protected final boolean inputIsClustered_;
 
+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  private List<Integer> sortByColumns_ = Lists.newArrayList();
+
   public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs,
-      boolean overwrite, boolean inputIsClustered) {
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) {
     super(targetTable, Op.INSERT);
     Preconditions.checkState(targetTable instanceof HdfsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
     inputIsClustered_ = inputIsClustered;
+    sortByColumns_ = sortByColumns;
   }
 
   @Override
@@ -154,6 +161,7 @@ public class HdfsTableSink extends TableSink {
     if (skipHeaderLineCount > 0) {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
+    hdfsTableSink.setSort_by_columns(sortByColumns_);
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.hdfs_table_sink = hdfsTableSink;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index cc8b39b..8842c9c 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -548,7 +548,7 @@ public class Planner {
     if (orderingExprs.isEmpty()) return;
 
     // Build sortinfo to sort by the ordering exprs.
-    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), false);
+    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
     List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
     SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index fb3cea2..8595eea 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -87,16 +87,22 @@ public abstract class TableSink extends DataSink {
    * Not all Ops are supported for all tables.
    * All parameters must be non-null, the lists in particular need to be empty if they
    * don't make sense for a certain table type.
+   * For HDFS tables 'sortByColumns' specifies the indices into the list of non-clustering
+   * columns of the target table that are mentioned in the 'sortby()' hint.
    */
   public static TableSink create(Table table, Op sinkAction,
       List<Expr> partitionKeyExprs,  List<Integer> referencedColumns,
-      boolean overwrite, boolean inputIsClustered) {
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) {
+    Preconditions.checkNotNull(partitionKeyExprs);
+    Preconditions.checkNotNull(referencedColumns);
+    Preconditions.checkNotNull(sortByColumns);
     if (table instanceof HdfsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
-      return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered);
+      return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered,
+          sortByColumns);
     } else if (table instanceof HBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
@@ -106,6 +112,8 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Referenced columns don't make sense for an HBase table.
       Preconditions.checkState(referencedColumns.isEmpty());
+      // sortby() hint is not supported for HBase tables.
+      Preconditions.checkState(sortByColumns.isEmpty());
       // Create the HBaseTableSink and return it.
       return new HBaseTableSink(table);
     } else if (table instanceof KuduTable) {
@@ -113,6 +121,8 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Partition clauses don't make sense for Kudu inserts.
       Preconditions.checkState(partitionKeyExprs.isEmpty());
+      // sortby() hint is not supported for Kudu tables.
+      Preconditions.checkState(sortByColumns.isEmpty());
       return new KuduTableSink(table, sinkAction, referencedColumns);
     } else {
       throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 7d9d484..c068c83 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -23,6 +23,7 @@
 # multiple times (though maybe they could be).
 
 allpairs == 2.0.1
+backports.tempfile == 1.0rc1
 boto3 == 1.2.3
   simplejson == 3.3.0 # For python version 2.6
   botocore == 1.3.30

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index 8e3adb9..5f54f86 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -578,7 +578,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -587,7 +587,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -602,7 +602,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -611,7 +611,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -626,7 +626,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 04:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: int_col = max(int_col)
@@ -646,7 +646,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 08:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 07:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -714,7 +714,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -723,7 +723,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -739,7 +739,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -748,7 +748,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -762,7 +762,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST, int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -771,7 +771,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST, int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index b106b02..bbcb014 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -86,7 +86,7 @@ select * from functional_kudu.testtbl
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 1549ec7..6b5291a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -225,14 +225,14 @@ select * from functional_kudu.alltypes
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ====
@@ -247,7 +247,7 @@ from functional_kudu.testtbl group by id, name
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
 02:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 01:AGGREGATE [FINALIZE]
 |  output: max(zip)
@@ -258,7 +258,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
 04:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 03:AGGREGATE [FINALIZE]
 |  output: max:merge(zip)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index be2704d..cca5827 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -23,6 +23,8 @@ from collections import namedtuple
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
+from backports.tempfile import TemporaryDirectory
+from parquet.ttypes import SortingColumn
 
 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -213,6 +215,42 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
       self.execute_query("drop table %s" % qualified_table_name)
       rmtree(tmp_dir)
 
+  def test_sorting_columns(self, vector, unique_database):
+    """Tests that RowGroup::sorting_columns gets populated when specifying a sortby()
+    insert hint."""
+    source_table = "functional_parquet.alltypessmall"
+    target_table = "test_write_sorting_columns"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
+        target_table))
+
+    # Create table
+    # TODO: Simplify once IMPALA-4167 (insert hints in CTAS) has been fixed.
+    query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+        source_table)
+    self.execute_query(query)
+
+    # Insert data
+    query = ("insert into {0} partition(year, month) /* +sortby(int_col, id) */ "
+        "select * from {1}").format(qualified_target_table, source_table)
+    self.execute_query(query)
+
+    # Download hdfs files and extract rowgroup metadata
+    row_groups = []
+    with TemporaryDirectory() as tmp_dir:
+      check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+
+      for root, subdirs, files in os.walk(tmp_dir):
+        for f in files:
+          parquet_file = os.path.join(root, str(f))
+          file_meta_data = get_parquet_metadata(parquet_file)
+          row_groups.extend(file_meta_data.row_groups)
+
+    # Verify that the files have the sorted_columns set
+    expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)]
+    for row_group in row_groups:
+      assert row_group.sorting_columns == expected
+
 
 @SkipIfIsilon.hive
 @SkipIfLocal.hive


Mime
View raw message