impala-reviews mailing list archives

From "Thomas Tauber-Marshall (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-3742: Partitions and sort INSERTs for Kudu tables
Date Fri, 07 Apr 2017 19:53:26 GMT
Thomas Tauber-Marshall has posted comments on this change.

Change subject: IMPALA-3742: Partitions and sort INSERTs for Kudu tables

Patch Set 2:

File be/src/exec/kudu-util.h:

PS1, Line 62: int col, PrimitiveType type
> Comment on these params.
File be/src/exprs/

PS2, Line 48:   using namespace kudu::client
> we typically don't use inline 'using' statements

PS2, Line 51: CreateKuduClient
> use QueryState::GetKuduClient
I haven't rebased in a while and don't have this locally. Rebasing is probably going to
be painful, since I'll also have to rebase my patched version of Kudu for the toolchain, so
I'm going to punt on it for today and address it when I have more time.

Line 61:   RegisterFunctionContext(ctx, state);
> comment the return value is ignored because the idx is stored on the base c

PS2, Line 72:  if (table_schema.Column(col).is_nullable()) {
> this should never be true right now, PK cols are not nullable. per my comme

PS2, Line 80:         return PartitionError(ctx,
            :             ErrorMsg::Init(TErrorCode::KUDU_NULL_CONSTRAINT_VIOLATION,
            :                 table_desc_->table_name()).msg());
> This will change the failure mode of our inserts, which is that NCV do not 

Line 87:     if (!s.ok()) {
> I checked with Alexey on the Kudu team, this would only fail if we: a) wrot

PS2, Line 95:   if (!s.ok()) {
            :     return PartitionError(
            :         ctx, Substitute("Failed to determine Kudu partition for row: $0", s.ToString()));
            :   }
> same thing about errors here, I think if this returns an error it means we 

PS2, Line 110:   ctx->fn_context(fn_context_index_)->AddWarning(msg.c_str());
> I don't know if there are any errors we need to be reporting here.
File be/src/exprs/kudu-partition-expr.h:

PS2, Line 30: number
> index

PS2, Line 47: //
> ///
File be/src/runtime/

PS1, Line 1845: f (fragment.__isset.output_sink && fragment.output_sink.__isset.stream_sink
              :       && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK
              :       && fragment.output_sink.stream_sink.output_partition.type ==
> Add a brief comment.
File be/src/scheduling/

Line 364:       DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
> I don't see what the point of this is. This enumerates possible values.
It doesn't include RANGE_PARTITIONED, although I don't believe we currently use RANGE_PARTITIONED,
so I'm not sure what this DCHECK is accomplishing. Want me to remove it?
File common/thrift/Exprs.thrift:

PS1, Line 134: 4
> what happened to 2 and 3?

PS1, Line 137: 5
> 3? :)
File common/thrift/Partitions.thrift:

PS2, Line 50:   // Expr that takes a row and returns its partition number. If specified,
            :   // 'partition_exprs' will be empty. Used for TPartitionType::KUDU.
            :   3: optional Exprs.TExpr partition_expr
> how about for Kudu the expr just goes in partition_exprs[0] (and it's the o
As discussed with Marcel, we're moving partition_expr out of TDataPartition anyway.
File fe/src/main/java/org/apache/impala/analysis/

PS1, Line 113: // Set in analyze(). Maps exprs in partitionKeyExprs_ to their column's position in the
             :   // table, eg. partitionKeyExprs_[i] = table_.columns(partitionKeyIdx_[i])
             :   private List<Integer> partitionKeyIdxs_ = Lists.newArrayList();
> Just for my understanding, can you explain why this is needed for this chan
The partition keys are a subset of the primary keys, so they are not guaranteed to be the
first n columns. We only need to send the partition keys to Kudu to get the partition, so this
list tells the backend which column positions to write the row's partition values into. It
gets passed through TKuduPartitionExpr.referenced_columns.
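
To illustrate the position mapping described above, here is a minimal sketch. It is not the patch's code: the class name, the method positionsOf, and the column names are all hypothetical, and columns are modeled as plain strings rather than analyzed exprs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Impala.
final class PartitionKeyPositions {
  // Returns, for each partition key column, its position in the table's
  // column list. Fails if a key is not a column of the table.
  static List<Integer> positionsOf(
      List<String> tableColumns, List<String> partitionKeyColumns) {
    List<Integer> positions = new ArrayList<>();
    for (String key : partitionKeyColumns) {
      int idx = tableColumns.indexOf(key);
      if (idx < 0) throw new IllegalArgumentException("Unknown column: " + key);
      positions.add(idx);
    }
    return positions;
  }

  public static void main(String[] args) {
    // Assumed example table: primary key (id, region, ts), partitioned on (id, ts).
    List<String> tableColumns = Arrays.asList("id", "region", "ts", "name");
    List<String> partitionKeys = Arrays.asList("id", "ts");
    // The partition keys land at positions 0 and 2, not at 0 and 1.
    System.out.println(positionsOf(tableColumns, partitionKeys));
  }
}
```

With partition keys that skip a primary key column, the positions are not simply 0..n-1, which is why the indexes have to be carried along explicitly.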
File fe/src/main/java/org/apache/impala/analysis/

PS2, Line 49: artitionKeyExprs_ = Expr.cloneList(partitionKeyExprs);
> Is there a reason we keep a separate copy of partitionKeyExprs in addition 
At the very least, I think we have to preserve the types.

PS2, Line 69:         children_.get(i).uncheckedCastTo(partitionKeyExprs_.get(i).getType());
> as we discussed, I feel like we shouldn't have to do this
The problem is that the analysis state gets reset and everything is reanalyzed. When that
happens, we lose implicit casts on the children.

The specific case that motivated this is a VALUES clause with a NullLiteral, which will hit
a Precondition check while being converted to thrift if it hasn't been implicitly cast to
something other than NULL_TYPE. I'm not sure how else to handle that.
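
A toy sketch of the problem, under heavy assumptions: ToyExpr, Type, reapplyCasts, and checkSerializable are hypothetical stand-ins, not Impala's Expr API, and "cast" is modeled as simply replacing the node with one that carries the remembered type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; these are not Impala classes.
final class CastPreservationSketch {
  // Toy type system; NULL_TYPE models an untyped NULL literal.
  enum Type { NULL_TYPE, INT, STRING }

  // Stand-in for an expression node.
  static final class ToyExpr {
    final Type type;
    ToyExpr(Type type) { this.type = type; }
  }

  // Re-applies the remembered types: any child whose current type differs
  // from the saved one is replaced by a copy carrying the saved type.
  static List<ToyExpr> reapplyCasts(List<ToyExpr> children, List<Type> savedTypes) {
    if (children.size() != savedTypes.size()) {
      throw new IllegalArgumentException("children and savedTypes differ in length");
    }
    List<ToyExpr> result = new ArrayList<>();
    for (int i = 0; i < children.size(); i++) {
      ToyExpr child = children.get(i);
      Type wanted = savedTypes.get(i);
      result.add(child.type == wanted ? child : new ToyExpr(wanted));
    }
    return result;
  }

  // Models the serialization-time check: an untyped NULL is rejected.
  static void checkSerializable(List<ToyExpr> exprs) {
    for (ToyExpr e : exprs) {
      if (e.type == Type.NULL_TYPE) {
        throw new IllegalStateException("NULL_TYPE expr was never cast");
      }
    }
  }

  public static void main(String[] args) {
    // After re-analysis the NULL literal is back to NULL_TYPE.
    List<ToyExpr> children =
        Arrays.asList(new ToyExpr(Type.NULL_TYPE), new ToyExpr(Type.INT));
    List<Type> savedTypes = Arrays.asList(Type.STRING, Type.INT);
    List<ToyExpr> fixed = reapplyCasts(children, savedTypes);
    checkSerializable(fixed);  // passes once the saved types are restored
    System.out.println(fixed.get(0).type + ", " + fixed.get(1).type);
  }
}
```

The point of keeping the saved types around is that the check would throw on the freshly re-analyzed children, but not after the remembered types have been reapplied.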

PS2, Line 75: isConstantImpl
> Should this be constant?
I was primarily thinking about the round-robin values for rows that don't fit in any partition,
which makes this expr non-deterministic.
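
A minimal sketch of why a round-robin fallback rules out treating the expr as constant. The class and method names are hypothetical, and using -1 to signal "row fits no partition" is an assumption of this example, not something taken from the patch.

```java
// Hypothetical illustration only; not the KuduPartitionExpr implementation.
final class RoundRobinFallbackSketch {
  private final int numPartitions;
  private int next = 0;  // next partition to hand out for unmatched rows

  RoundRobinFallbackSketch(int numPartitions) {
    if (numPartitions <= 0) throw new IllegalArgumentException("need >= 1 partition");
    this.numPartitions = numPartitions;
  }

  // 'knownPartition' is the partition found for the row, or -1 (assumed
  // sentinel) when the row does not fit in any partition.
  int assign(int knownPartition) {
    if (knownPartition >= 0) return knownPartition;
    int p = next;
    next = (next + 1) % numPartitions;
    return p;
  }

  public static void main(String[] args) {
    RoundRobinFallbackSketch sketch = new RoundRobinFallbackSketch(3);
    // The same unmatched input yields different results on successive calls.
    System.out.println(sketch.assign(-1));
    System.out.println(sketch.assign(-1));
  }
}
```

Because the result depends on internal state rather than only on the input row, evaluating the expr once and reusing the value would give wrong answers.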
File fe/src/main/java/org/apache/impala/planner/

PS1, Line 48: // For kudu partition: expr that calls into the kudu client to get the partition
            :   private Expr partitionExpr_;
            :   // Should be called only by the static factory method for Kudu partitioned
            :   private DataPartition(TPartitionType type, Expr partitionExpr) {
            :     Preconditions.checkNotNull(partitionExpr);
            :     Preconditions.checkState(type == TPartitionType.KUDU);
            :     type_ = type;
            :     partitionExpr_ = partitionExpr;
            :     partitionExprs_ = Lists.newArrayList();
            :   }
> I am not sure I get this. Why not passing a single element list of the case
Good idea, but per discussions with Marcel we're moving partitionExpr out of the DataPartition.
File fe/src/main/java/org/apache/impala/planner/

PS2, Line 45:   // For hash partition: exprs used to compute hash value.
            :   private List<Expr> partitionExprs_;
            :   // For kudu partition: expr that calls into the kudu client to get the partition
            :   private Expr partitionExpr_;
> I think this will be cleaned up a bit if we just store the Kudu partition e
File fe/src/main/java/org/apache/impala/planner/

PS2, Line 251: DescriptorTable.TABLE_SINK_ID
> I don't think this is what you want. I think you want the table ID of the t
This value doesn't get incremented - it will always be 0. Also, InsertStmt calls descTbl.setTargetTable
on its target table, so I'm fairly sure this will be the right id.

I agree that it seems hacky, and I would rather put something here like insertStmt.getTargetTable().getId(),
except that Table doesn't have a getId() and tableIds are generated during DescriptorTable.toThrift,
so I'm not sure how to thread that through.

I also think that the related code in is hacky, so perhaps there's someone
who understands how the DescriptorTable is supposed to work who I could ask to weigh in?
File fe/src/main/java/org/apache/impala/planner/

PS2, Line 541:       Expr partitionExpr = new KuduPartitionExpr(DescriptorTable.TABLE_SINK_ID,
             :           insertStmt.getPartitionKeyExprs(), insertStmt.getPartitionKeyIdxs());
             :       partitionExpr.analyze(ctx_.getRootAnalyzer());
> I wonder if we can/should clone the KuduPartitionExpr instead of creating i
File testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test:

PS2, Line 16: KUDU
> it'd be nice for this to indicate it's an exchange on the Kudu partitioning

Gerrit-MessageType: comment
Gerrit-Change-Id: I84ce0032a1b10958fdf31faef225372c5c38fdc4
Gerrit-PatchSet: 2
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <>
Gerrit-Reviewer: Dimitris Tsirogiannis <>
Gerrit-Reviewer: Matthew Jacobs <>
Gerrit-Reviewer: Thomas Tauber-Marshall <>
Gerrit-HasComments: Yes
