impala-reviews mailing list archives

From "Matthew Jacobs (Code Review)" <>
Subject [Impala-ASF-CR] PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables
Date Thu, 16 Feb 2017 22:33:14 GMT
Matthew Jacobs has posted comments on this change.

Change subject: PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables

Patch Set 1:


Just some high-level comments for now, since this is WIP.

Would you mind updating some of the planner tests so I can see what that looks like? Presumably
they will fail, and you can grab the updated test output from /tmp/PlannerTest/.
File be/src/runtime/

PS1, Line 457:     kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
             :     Status s = CreateKuduClient(table_desc_->kudu_master_addresses(), &client_);
             :     kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
             :     KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
             :         "Unable to open Kudu table");
             :     kudu::client::KuduPartitionerBuilder b(table_);
             :     kudu::client::KuduPartitioner* partitioner;
We'll need to find a way to avoid doing this for every row batch.
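
A rough sketch of the idea, using stand-in types rather than the real Kudu client classes: do the expensive setup once when the sender is opened, then reuse it for each batch. FakePartitioner, FakeSender, Open() and Send() are hypothetical names for illustration only, and the modulo routing is just a placeholder for whatever the real partitioner does.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for the expensive per-table state (client, table,
// partitioner) that the quoted code rebuilds for every row batch.
struct FakePartitioner {
  explicit FakePartitioner(int num_partitions) : num_partitions_(num_partitions) {
    ++build_count;
  }
  // Placeholder routing; assumes non-negative keys.
  int PartitionRow(int key) const { return key % num_partitions_; }

  static int build_count;  // Counts how often the expensive setup ran.
  int num_partitions_;
};
int FakePartitioner::build_count = 0;

// Hypothetical sender: Open() performs the setup once, Send() only reuses it.
class FakeSender {
 public:
  void Open() { partitioner_ = std::make_unique<FakePartitioner>(4); }

  // Returns the target partition for every key in the batch.
  std::vector<int> Send(const std::vector<int>& batch) const {
    std::vector<int> targets;
    targets.reserve(batch.size());
    for (int key : batch) targets.push_back(partitioner_->PartitionRow(key));
    return targets;
  }

 private:
  std::unique_ptr<FakePartitioner> partitioner_;
};
```

With this shape, sending any number of batches after a single Open() leaves FakePartitioner::build_count at 1.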

PS1, Line 466:     unique_ptr<kudu::KuduPartialRow> row(table_->schema().NewRow());
             :     for (int i = 0; i < batch->num_rows(); ++i) {
             :       TupleRow* current_row = batch->GetRow(i);
             :       for (int j = 0; j < partition_expr_ctxs_.size(); ++j) {
             :         ExprContext* ctx = partition_expr_ctxs_[j];
             :         void* value = ctx->GetValue(current_row);
             :         PrimitiveType type = ctx->root()->type().type;
             :         switch (type) {
             :           case TYPE_VARCHAR:
             :           case TYPE_STRING: {
             :             StringValue* sv = reinterpret_cast<StringValue*>(value);
             :             kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
             :             KUDU_RETURN_IF_ERROR(row->SetString(j, slice),
             :                 "Could not set Kudu row value.");
             :             break;
             :           }
             :           case TYPE_FLOAT:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetFloat(j, *reinterpret_cast<float*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_DOUBLE:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetDouble(j, *reinterpret_cast<double*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_BOOLEAN:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetBool(j, *reinterpret_cast<bool*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_TINYINT:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetInt8(j, *reinterpret_cast<int8_t*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_SMALLINT:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetInt16(j, *reinterpret_cast<int16_t*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_INT:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetInt32(j, *reinterpret_cast<int32_t*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           case TYPE_BIGINT:
             :             KUDU_RETURN_IF_ERROR(
             :                 row->SetInt64(j, *reinterpret_cast<int64_t*>(value)),
             :                 "Could not set Kudu row value.");
             :             break;
             :           default:
             :             return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
             :         }
             :       }
Let's see if we can share some code with kudu-table-sink, at least the switch statement. We
could put the shared pieces in kudu-util.h/cc.
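
As a sketch of what the shared piece might look like: a single helper that takes the row, column index, type and raw value, so both callers just loop and call it. Everything here is a simplified stand-in so the example compiles on its own. FakeRow and WriteValue are hypothetical names, the enum is a cut-down copy of the type list, and the setters return bool where the real Kudu row setters return a status.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Cut-down stand-ins so the sketch builds without Impala or Kudu headers.
enum PrimitiveType { TYPE_BOOLEAN, TYPE_INT, TYPE_BIGINT, TYPE_DOUBLE, TYPE_STRING };
struct StringValue { char* ptr; int len; };

// Hypothetical row that records each column value as text.
struct FakeRow {
  std::map<int, std::string> cols;
  bool SetBool(int c, bool v) { cols[c] = v ? "true" : "false"; return true; }
  bool SetInt32(int c, int32_t v) { cols[c] = std::to_string(v); return true; }
  bool SetInt64(int c, int64_t v) { cols[c] = std::to_string(v); return true; }
  bool SetString(int c, const std::string& v) { cols[c] = v; return true; }
};

// Hypothetical shared helper: the one place that maps a type to a setter.
// Returns false for types this sketch does not handle (e.g. TYPE_DOUBLE).
bool WriteValue(FakeRow* row, int col, PrimitiveType type, const void* value) {
  switch (type) {
    case TYPE_BOOLEAN:
      return row->SetBool(col, *reinterpret_cast<const bool*>(value));
    case TYPE_INT:
      return row->SetInt32(col, *reinterpret_cast<const int32_t*>(value));
    case TYPE_BIGINT:
      return row->SetInt64(col, *reinterpret_cast<const int64_t*>(value));
    case TYPE_STRING: {
      const StringValue* sv = reinterpret_cast<const StringValue*>(value);
      return row->SetString(col, std::string(sv->ptr, sv->len));
    }
    default:
      return false;
  }
}
```

The sink and the sender would then each keep only their own loop over columns and call the helper per value.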
File common/thrift/Partitions.thrift:

PS1, Line 37: 
            :   // partitioning determined by Kudu
            :   KUDU
I think we'll need to find a way to avoid a new partition type, i.e. to treat this as hash

Line 50: }
If it's possible to encapsulate the hash fn as an expr, this might be a good place to put
the Expr, e.g.
4: optional TExpr hash_partition_fn

Otherwise, I wonder if we can at least distinguish Kudu from regular hash partitioning in some
way other than a new TPartitionType. For example, we know the target table, so maybe we can
inspect that at runtime.
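
To illustrate the second option only: the partition type stays "hash", and the sender picks the row-to-channel function once from what it knows about the target table. All names below (TableKind, PartitionFn, GenericHash, KuduStylePartition, ChoosePartitionFn) are hypothetical, and both mapping functions are arbitrary placeholders, not Impala's hash or Kudu's partitioning scheme.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical description of the target table, known at runtime.
enum class TableKind { kHdfs, kKudu };

using PartitionFn = std::function<int(int64_t key, int num_channels)>;

// Placeholder for the existing generic hash partitioning.
int GenericHash(int64_t key, int num_channels) {
  return static_cast<int>(static_cast<uint64_t>(key) % num_channels);
}

// Placeholder for asking Kudu which partition a row belongs to.
int KuduStylePartition(int64_t key, int num_channels) {
  return static_cast<int>((static_cast<uint64_t>(key) / 10) % num_channels);
}

// Chooses the routing function from the table kind instead of from a new
// partition type in the plan.
PartitionFn ChoosePartitionFn(TableKind kind) {
  return kind == TableKind::kKudu ? PartitionFn(KuduStylePartition)
                                  : PartitionFn(GenericHash);
}
```

The choice would be made once at setup, so the per-row path is just a call through the selected function.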
File fe/src/main/java/org/apache/impala/analysis/

Line 625:   private void prepareExpressions(List<Column> selectExprTargetColumns,
I assume we'll have to do something similar for update/delete. Upsert is handled here too,

PS1, Line 634:     List<String> kuduPartitionByColumnNames = null;
             :     if (isKuduTable) {
             :       kuduPartitionByColumnNames = ((KuduTable) table_).getPartitionByColumnNames();
             :     }
Should this be a Set? There could be duplicates. Also, I'm not sure whether we should use the
partition cols or the primary key cols. Ultimately Kudu probably wants the PK. I think the
query will be invalid if selectExprTargetColumns doesn't contain the entire PK.


Gerrit-MessageType: comment
Gerrit-Change-Id: Ic10b3295159354888efcde3df76b0edb24161515
Gerrit-PatchSet: 1
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <>
Gerrit-Reviewer: Matthew Jacobs <>
Gerrit-HasComments: Yes
