impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [2/3] incubator-impala git commit: IMPALA-3346: DeepCopy() Kudu rows into Impala tuples.
Date Sun, 30 Oct 2016 21:10:15 GMT
IMPALA-3346: DeepCopy() Kudu rows into Impala tuples.

Implements additional changes to make the memory layout
of Kudu rows identical to Impala tuples.
In particular, Kudu rows allocate a null bit even for
non-nullable columns, and Impala now does the same
for Kudu scan tuples.

This change exploits the now-identical Kudu and Impala
tuple layouts to avoid the expensive translation.

Perf: Mostafa reported a 50% efficiency gain on full
table scans.

Testing: A private core/hdfs run passed.

TODO:
1) Test cases with nullable/nonnullable non-PK slots.
2) Specify mem layout to client (depends on KUDU-1694)
3) Avoid mem copies (depends on KUDU-1695)

Change-Id: Ic911e4eff9fe98bf28d8a1bab5c9d7e9ab66d9cb
Reviewed-on: http://gerrit.cloudera.org:8080/4862
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Marcel Kornacker <marcel@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f3f4b71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f3f4b71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f3f4b71

Branch: refs/heads/master
Commit: 9f3f4b713d78e8dcf02c02def447195a04f408e6
Parents: 12e34b4
Author: Alex Behm <alex.behm@cloudera.com>
Authored: Mon Oct 24 20:46:34 2016 -0700
Committer: Marcel Kornacker <marcel@cloudera.com>
Committed: Sun Oct 30 19:36:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc                     | 183 +++----------------
 be/src/exec/kudu-scanner.h                      |  28 ---
 .../apache/impala/analysis/SlotDescriptor.java  |   4 +-
 .../apache/impala/analysis/TupleDescriptor.java |  41 +++--
 .../org/apache/impala/planner/KuduScanNode.java |  13 +-
 5 files changed, 67 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index ca4ee9a..d230985 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -58,19 +58,10 @@ KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
     cur_kudu_batch_num_read_(0),
-    last_alive_time_micros_(0),
-    num_string_slots_(0) {
+    last_alive_time_micros_(0) {
 }
 
 Status KuduScanner::Open() {
-  // Store columns that need relocation when materialized into the
-  // destination row batch.
-  for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) {
-    if (scan_node_->tuple_desc_->slots()[i]->type().IsStringType()) {
-      string_slots_.push_back(scan_node_->tuple_desc_->slots()[i]);
-      ++num_string_slots_;
-    }
-  }
   return scan_node_->GetConjunctCtxs(&conjunct_ctxs_);
 }
 
@@ -109,7 +100,7 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
     RETURN_IF_CANCELLED(state_);
 
     if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) {
-      bool batch_done = false;
+      bool batch_done;
       RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done));
       if (batch_done) break;
     }
@@ -172,54 +163,43 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done)
   return Status::OK();
 }
 
-Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch,
-    Tuple** tuple_mem, bool* batch_done) {
+Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem,
+    bool* batch_done) {
+  *batch_done = false;
 
   // Short-circuit the count(*) case.
   if (scan_node_->tuple_desc_->slots().empty()) {
     return HandleEmptyProjection(row_batch, batch_done);
   }
 
-  // TODO consider consolidating the tuple creation/initialization here with the version
-  // that happens inside the loop.
-  int idx = row_batch->AddRow();
-  TupleRow* row = row_batch->GetRow(idx);
-  (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-  row->SetTuple(tuple_idx(), *tuple_mem);
-
+  // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into
+  // 'row_batch'.
+  bool has_conjuncts = !conjunct_ctxs_.empty();
   int num_rows = cur_kudu_batch_.NumRows();
-  // Now iterate through the Kudu rows.
   for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
-    // Clear any NULL indicators set by a previous iteration.
-    (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-
-    // Transform a Kudu row into an Impala row.
+    // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation
+    // is performed directly on the Kudu tuple because its memory layout is identical to
+    // Impala's. We only copy the surviving tuples to Impala's output row batch.
     KuduScanBatch::RowPtr krow = cur_kudu_batch_.Row(krow_idx);
-    RETURN_IF_ERROR(KuduRowToImpalaTuple(krow, row_batch, *tuple_mem));
+    Tuple* kudu_tuple = reinterpret_cast<Tuple*>(const_cast<void*>(krow.cell(0)));
     ++cur_kudu_batch_num_read_;
-
-    // Evaluate the conjuncts that haven't been pushed down to Kudu.
-    if (conjunct_ctxs_.empty() ||
-        ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
-      // Materialize those slots that require auxiliary memory
-      RETURN_IF_ERROR(RelocateValuesFromKudu(*tuple_mem, row_batch->tuple_data_pool()));
-      // If the conjuncts pass on the row commit it.
-      row_batch->CommitLastRow();
-      // If we've reached the capacity, or the LIMIT for the scan, return.
-      if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
-        *batch_done = true;
-        break;
-      }
-      // Add another row.
-      idx = row_batch->AddRow();
-
-      // Move to the next tuple in the tuple buffer.
-      *tuple_mem = next_tuple(*tuple_mem);
-      (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-      // Make 'row' point to the new row.
-      row = row_batch->GetRow(idx);
-      row->SetTuple(tuple_idx(), *tuple_mem);
+    if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0],
+        conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) {
+      continue;
+    }
+    // Deep copy the tuple, set it in a new row, and commit the row.
+    kudu_tuple->DeepCopy(*tuple_mem, *scan_node_->tuple_desc(),
+        row_batch->tuple_data_pool());
+    TupleRow* row = row_batch->GetRow(row_batch->AddRow());
+    row->SetTuple(0, *tuple_mem);
+    row_batch->CommitLastRow();
+    // If we've reached the capacity, or the LIMIT for the scan, return.
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
+      *batch_done = true;
+      break;
     }
+    // Move to the next tuple in the tuple buffer.
+    *tuple_mem = next_tuple(*tuple_mem);
   }
   ExprContext::FreeLocalAllocations(conjunct_ctxs_);
 
@@ -227,113 +207,6 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch,
   return state_->GetQueryStatus();
 }
 
-void KuduScanner::SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot) {
-  DCHECK(slot.is_nullable());
-  tuple->SetNull(slot.null_indicator_offset());
-}
-
-bool KuduScanner::IsSlotNull(Tuple* tuple, const SlotDescriptor& slot) {
-  return slot.is_nullable() && tuple->IsNull(slot.null_indicator_offset());
-}
-
-Status KuduScanner::RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool) {
-  for (int i = 0; i < num_string_slots_; ++i) {
-    const SlotDescriptor* slot = string_slots_[i];
-    // NULL handling was done in KuduRowToImpalaTuple.
-    if (IsSlotNull(tuple, *slot)) continue;
-
-    // Extract the string value.
-    void* slot_ptr = tuple->GetSlot(slot->tuple_offset());
-    DCHECK(slot->type().IsVarLenStringType());
-
-    // The string value of the slot has a pointer to memory from the Kudu row.
-    StringValue* val = reinterpret_cast<StringValue*>(slot_ptr);
-    char* old_buf = val->ptr;
-    // Kudu never returns values larger than 8MB
-    DCHECK_LE(val->len, 8 * (1 << 20));
-    val->ptr = reinterpret_cast<char*>(mem_pool->TryAllocate(val->len));
-    if (LIKELY(val->len > 0)) {
-      // The allocator returns a NULL ptr when out of memory.
-      if (UNLIKELY(val->ptr == NULL)) {
-        return mem_pool->mem_tracker()->MemLimitExceeded(state_,
-            "Kudu scanner could not allocate memory for string", val->len);
-      }
-      memcpy(val->ptr, old_buf, val->len);
-    }
-  }
-  return Status::OK();
-}
-
-
-Status KuduScanner::KuduRowToImpalaTuple(const KuduScanBatch::RowPtr& row,
-    RowBatch* row_batch, Tuple* tuple) {
-  for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) {
-    const SlotDescriptor* info = scan_node_->tuple_desc_->slots()[i];
-    void* slot = tuple->GetSlot(info->tuple_offset());
-
-    if (row.IsNull(i)) {
-      SetSlotToNull(tuple, *info);
-      continue;
-    }
-
-    int max_len = -1;
-    switch (info->type().type) {
-      case TYPE_VARCHAR:
-        max_len = info->type().len;
-        DCHECK_GT(max_len, 0);
-        // Fallthrough intended.
-      case TYPE_STRING: {
-        // For types with auxiliary memory (String, Binary,...) store the original memory
-        // location in the tuple to avoid the copy when the conjuncts do not pass. Relocate
-        // the memory into the row batch's memory in a later step.
-        kudu::Slice slice;
-        KUDU_RETURN_IF_ERROR(row.GetString(i, &slice),
-            "Error getting column value from Kudu.");
-        StringValue* sv = reinterpret_cast<StringValue*>(slot);
-        sv->ptr = const_cast<char*>(reinterpret_cast<const char*>(slice.data()));
-        sv->len = static_cast<int>(slice.size());
-        if (max_len > 0) sv->len = std::min(sv->len, max_len);
-        break;
-      }
-      case TYPE_TINYINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt8(i, reinterpret_cast<int8_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_SMALLINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt16(i, reinterpret_cast<int16_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_INT:
-        KUDU_RETURN_IF_ERROR(row.GetInt32(i, reinterpret_cast<int32_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_BIGINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt64(i, reinterpret_cast<int64_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_FLOAT:
-        KUDU_RETURN_IF_ERROR(row.GetFloat(i, reinterpret_cast<float*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_DOUBLE:
-        KUDU_RETURN_IF_ERROR(row.GetDouble(i, reinterpret_cast<double*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_BOOLEAN:
-        KUDU_RETURN_IF_ERROR(row.GetBool(i, reinterpret_cast<bool*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      default:
-        DCHECK(false) << "Impala type unsupported in Kudu: "
-            << TypeToString(info->type().type);
-        return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING,
-            TypeToString(info->type().type));
-    }
-  }
-  return Status::OK();
-}
-
-
 Status KuduScanner::GetNextScannerBatch() {
   SCOPED_TIMER(state_->total_storage_wait_timer());
   int64_t now = MonotonicMicros();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 0ed5221..bf84b08 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -63,12 +63,6 @@ class KuduScanner {
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
   Status HandleEmptyProjection(RowBatch* row_batch, bool* batch_done);
 
-  /// Set 'slot' to Null in 'tuple'.
-  void SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot);
-
-  /// Returns true if 'slot' is Null in 'tuple'.
-  bool IsSlotNull(Tuple* tuple, const SlotDescriptor& slot);
-
   /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch.
   ///  - 'batch' is the batch that will point to the new tuples.
   ///  - *tuple_mem should be the location to output tuples.
@@ -82,26 +76,11 @@ class KuduScanner {
   /// Closes the current kudu::client::KuduScanner.
   void CloseCurrentClientScanner();
 
-  /// Given a tuple, copies the values of those columns that require additional memory
-  /// from memory owned by the kudu::client::KuduScanner into memory owned by the
-  /// RowBatch. Assumes that the other columns are already materialized.
-  Status RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool);
-
-  /// Transforms a kudu row into an Impala row. Columns that don't require auxiliary
-  /// memory are copied to the tuple directly. String columns are stored as a reference to
-  /// the memory of the RowPtr and need to be relocated later.
-  Status KuduRowToImpalaTuple(const kudu::client::KuduScanBatch::RowPtr& row,
-      RowBatch* row_batch, Tuple* tuple);
-
   inline Tuple* next_tuple(Tuple* t) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(t);
     return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size());
   }
 
-  /// Returns the tuple idx into the row for this scan node to output to.
-  /// Currently this is always 0.
-  int tuple_idx() const { return 0; }
-
   KuduScanNode* scan_node_;
   RuntimeState* state_;
 
@@ -120,13 +99,6 @@ class KuduScanner {
 
   /// The scanner's cloned copy of the conjuncts to apply.
   vector<ExprContext*> conjunct_ctxs_;
-
-  /// List of string slots that need relocation for their auxiliary memory.
-  std::vector<SlotDescriptor*> string_slots_;
-
-  /// Number of string slots that need relocation (i.e. size of string_slots_), stored
-  /// separately to avoid calling vector::size() in the hot path (IMPALA-3348).
-  int num_string_slots_;
 };
 
 } /// namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 3a0fc06..9a9c058 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.thrift.TSlotDescriptor;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -140,9 +141,8 @@ public class SlotDescriptor {
   }
 
   public Path getPath() { return path_; }
-
   public boolean isScanSlot() { return path_ != null && path_.isRootedAtTable(); }
-
+  public boolean isKuduScanSlot() { return getColumn() instanceof KuduColumn; }
   public Column getColumn() { return !isScanSlot() ? null : path_.destColumn(); }
 
   public ColumnStats getStats() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 5fbe5f6..bf6b93a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.impala.analysis;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,6 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.View;
 import org.apache.impala.thrift.TTupleDescriptor;
 
 import com.google.common.base.Joiner;
@@ -59,8 +59,9 @@ import com.google.common.collect.Lists;
  *
  * Memory Layout
  * Slots are placed in descending order by size with trailing bytes to store null flags.
- * Null flags are omitted for non-nullable slots. There is no padding between tuples when
- * stored back-to-back in a row batch.
+ * Null flags are omitted for non-nullable slots, except for Kudu scan slots which always
+ * have a null flag to match Kudu's client row format. There is no padding between tuples
+ * when stored back-to-back in a row batch.
  *
  * Example: select bool_col, int_col, string_col, smallint_col from functional.alltypes
  * Slots:   string_col|int_col|smallint_col|bool_col|null_byte
@@ -118,6 +119,21 @@ public class TupleDescriptor {
     return result;
   }
 
+  /**
+   * Returns all materialized slots ordered by their offset. Valid to call after the
+   * mem layout has been computed.
+   */
+  public ArrayList<SlotDescriptor> getSlotsOrderedByOffset() {
+    Preconditions.checkState(hasMemLayout_);
+    ArrayList<SlotDescriptor> result = getMaterializedSlots();
+    Collections.sort(result, new Comparator<SlotDescriptor> () {
+      public int compare(SlotDescriptor a, SlotDescriptor b) {
+        return Integer.compare(a.getByteOffset(), b.getByteOffset());
+      }
+    });
+    return result;
+  }
+
   public Table getTable() {
     if (path_ == null) return null;
     return path_.getRootTable();
@@ -199,9 +215,7 @@ public class TupleDescriptor {
    * Materialize all slots.
    */
   public void materializeSlots() {
-    for (SlotDescriptor slot: slots_) {
-      slot.setIsMaterialized(true);
-    }
+    for (SlotDescriptor slot: slots_) slot.setIsMaterialized(true);
   }
 
   public TTupleDescriptor toThrift(Integer tableId) {
@@ -223,7 +237,7 @@ public class TupleDescriptor {
         new HashMap<Integer, List<SlotDescriptor>>();
 
     // populate slotsBySize
-    int numNullableSlots = 0;
+    int numNullBits = 0;
     int totalSlotSize = 0;
     for (SlotDescriptor d: slots_) {
       if (!d.isMaterialized()) continue;
@@ -239,14 +253,14 @@ public class TupleDescriptor {
       }
       totalSlotSize += d.getType().getSlotSize();
       slotsBySize.get(d.getType().getSlotSize()).add(d);
-      if (d.getIsNullable()) ++numNullableSlots;
+      if (d.getIsNullable() || d.isKuduScanSlot()) ++numNullBits;
     }
     // we shouldn't have anything of size <= 0
     Preconditions.checkState(!slotsBySize.containsKey(0));
     Preconditions.checkState(!slotsBySize.containsKey(-1));
 
     // assign offsets to slots in order of descending size
-    numNullBytes_ = (numNullableSlots + 7) / 8;
+    numNullBytes_ = (numNullBits + 7) / 8;
     int slotOffset = 0;
     int nullIndicatorByte = totalSlotSize;
     int nullIndicatorBit = 0;
@@ -266,13 +280,16 @@ public class TupleDescriptor {
         slotOffset += slotSize;
 
         // assign null indicator
-        if (d.getIsNullable()) {
+        if (d.getIsNullable() || d.isKuduScanSlot()) {
           d.setNullIndicatorByte(nullIndicatorByte);
           d.setNullIndicatorBit(nullIndicatorBit);
           nullIndicatorBit = (nullIndicatorBit + 1) % 8;
           if (nullIndicatorBit == 0) ++nullIndicatorByte;
-        } else {
-          // non-nullable slots will have 0 for the byte offset and -1 for the bit mask
+        }
+        // non-nullable slots have 0 for the byte offset and -1 for the bit mask
+        // to make sure IS NULL always evaluates to false in the BE without having
+        // to check nullability explicitly
+        if (!d.getIsNullable()) {
           d.setNullIndicatorBit(-1);
           d.setNullIndicatorByte(0);
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 9434801..d338608 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -118,13 +118,16 @@ public class KuduScanNode extends ScanNode {
       // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu)
       analyzer.materializeSlots(conjuncts_);
 
+      // Compute mem layout before the scan range locations because creation of the Kudu
+      // scan tokens depends on having a mem layout.
+      computeMemLayout(analyzer);
+
       // Creates Kudu scan tokens and sets the scan range locations.
       computeScanRangeLocations(analyzer, client, rpcTable);
     } catch (Exception e) {
       throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
     }
 
-    computeMemLayout(analyzer);
     computeStats(analyzer);
   }
 
@@ -189,15 +192,15 @@ public class KuduScanNode extends ScanNode {
 
   /**
    * Returns KuduScanTokens for this scan given the projected columns and predicates that
-   * will be pushed to Kudu.
+   * will be pushed to Kudu. The projected Kudu columns are ordered by offset in an
+   * Impala tuple to make the Impala and Kudu tuple layouts identical.
    */
   private List<KuduScanToken> createScanTokens(KuduClient client,
       org.apache.kudu.client.KuduTable rpcTable) {
     List<String> projectedCols = Lists.newArrayList();
-    for (SlotDescriptor desc: getTupleDesc().getSlots()) {
-      if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName());
+    for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
+      projectedCols.add(desc.getColumn().getName());
     }
-
     KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
     tokenBuilder.setProjectedColumnNames(projectedCols);
     for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);


Mime
View raw message