impala-reviews mailing list archives

From "Thomas Tauber-Marshall (Code Review)" <ger...@cloudera.org>
Subject [Impala-ASF-CR] PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables
Date Thu, 02 Mar 2017 23:04:17 GMT
Thomas Tauber-Marshall has posted comments on this change.

Change subject: PREVIEW: IMPALA-3742: partitions INSERTs into Kudu tables
......................................................................


Patch Set 3:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/6037/1/be/src/runtime/data-stream-sender.cc
File be/src/runtime/data-stream-sender.cc:

PS1, Line 457:       } else {
             :         RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(current_row));
             :       }
             :     }
             :   }
             :   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
             :   RETURN_IF_ERROR(state->CheckQueryState());
> we'll need to find a way to avoid doing this for every row batch
Done
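
For readers without the surrounding file, the `partition % num_channels` expression in the quoted hunk can be read as plain modulo routing. The following is a minimal sketch only, not the patch's code: `SketchChannel` and `RouteRows` are hypothetical names, rows are reduced to integers, and partition ids are assumed to be non-negative.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a sender channel: it only records the rows it is given.
struct SketchChannel {
  std::vector<int64_t> rows;
  void AddRow(int64_t row) { rows.push_back(row); }
};

// Routes rows[i] to channel (partition_ids[i] % num_channels), mirroring the
// modulo expression in the quoted code. Everything else here is illustrative.
inline void RouteRows(const std::vector<int64_t>& rows,
                      const std::vector<int32_t>& partition_ids,
                      std::vector<SketchChannel>* channels) {
  assert(rows.size() == partition_ids.size());
  assert(!channels->empty());
  const int32_t num_channels = static_cast<int32_t>(channels->size());
  for (std::size_t i = 0; i < rows.size(); ++i) {
    assert(partition_ids[i] >= 0);  // Assumption: partition ids are non-negative.
    (*channels)[partition_ids[i] % num_channels].AddRow(rows[i]);
  }
}
```

With three channels, rows in partitions 1 and 4 both land on channel 1, so more partitions than channels simply wrap around.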


PS1, Line 466: 
             : Status DataStreamSender::FlushFinal(RuntimeState* state) {
             :   DCHECK(!flushed_);
             :   DCHECK(!closed_);
             :   flushed_ = true;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     // If we hit an error here, we can return without closing the remaining channels as
             :     // the error is propagated back to the coordinator, which in turn cancels the query,
             :     // which will cause the remaining open channels to be closed.
             :     RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
             :   }
             :   return Status::OK();
             : }
             : 
             : void DataStreamSender::Close(RuntimeState* state) {
             :   if (closed_) return;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     channels_[i]->Teardown(state);
             :   }
             :   if (partitioner_ != NULL) {
             :     partitioner_->Close(state);
             :   }
             :   DataSink::Close(state);
             :   closed_ = true;
             : }
             : 
             : Status DataStreamSender::SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers) {
             :   VLOG_ROW << "serializing " << src->num_rows() << " rows";
             :   {
             :     SCOPED_TIMER(profile_->total_time_counter());
             :     SCOPED_TIMER(serialize_batch_timer_);
             :     RETURN_IF_ERROR(src->Serialize(dest));
             :     int bytes = RowBatch::GetBatchSize(*dest);
             :     int uncompressed_bytes = bytes - dest->tuple_data.size() + dest->uncompressed_size;
             :     // The size output_batch would be if we didn't compress tuple_data (will be equal to
             :     // actual batch size if tuple_data isn't compressed)
             : 
             :     COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
             :     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
             :   }
             :   return Status::OK();
             : }
             : 
             : int64_t DataStreamSender::GetNumDataBytesSent() const {
             :   // TODO: do we need synchronization here or are reads & writes to 8-byte ints
             :   // atomic?
             :   int64_t result = 0;
             :   for (int i = 0; i < channels_.size(); ++i) {
             :     result += channels_[i]->num_data_bytes_sent();
             :   }
             :   return result;
             : }
             : 
             : }
             : 
             : 
> let's see if we can share some code with kudu-table-sink, at least the swit
Done
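
As a worked illustration of the byte accounting in the quoted `SerializeBatch`, the sketch below reproduces the arithmetic `bytes - tuple_data.size() + uncompressed_size` on plain integers. `SketchBatch`, `SentBytes`, and `AccountBytes` are hypothetical names, and treating the batch size as "header bytes plus tuple_data bytes" is an assumption made for the example, not a statement about `RowBatch::GetBatchSize`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for the serialized batch fields used above.
struct SketchBatch {
  int64_t header_bytes;       // Everything other than tuple_data (assumption).
  int64_t tuple_data_bytes;   // Size of tuple_data as sent, possibly compressed.
  int64_t uncompressed_size;  // Size of tuple_data before compression.
};

struct SentBytes {
  int64_t sent;          // Bytes counted as sent across all receivers.
  int64_t uncompressed;  // What would have been sent without compression.
};

// Same arithmetic as the quoted code: swap the compressed tuple_data size for
// the uncompressed one, then scale both totals by the number of receivers.
inline SentBytes AccountBytes(const SketchBatch& b, int num_receivers) {
  assert(num_receivers >= 0);
  const int64_t bytes = b.header_bytes + b.tuple_data_bytes;
  const int64_t uncompressed = bytes - b.tuple_data_bytes + b.uncompressed_size;
  return {bytes * num_receivers, uncompressed * num_receivers};
}
```

When `tuple_data` is not compressed, `tuple_data_bytes` equals `uncompressed_size` and the two totals coincide, which matches the comment in the quoted code.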


http://gerrit.cloudera.org:8080/#/c/6037/1/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
File fe/src/main/java/org/apache/impala/analysis/InsertStmt.java:

PS1, Line 634:     boolean isKuduTable = table_ instanceof KuduTable;
             :     Set<String> kuduPartitionByColumnNames = null;
             :     if (isKuduTable) {
             :      
> should this be a Set? Could be duplicates. Also I'm not sure if we should u
I believe only the partition columns are needed here, based on testing and on the comment
in kudu/client/client.h, which says 'May return a bad Status if the provided row does not
have all partitions of the partition key set'.


-- 
To view, visit http://gerrit.cloudera.org:8080/6037
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic10b3295159354888efcde3df76b0edb24161515
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Gerrit-Reviewer: Matthew Jacobs <mj@cloudera.com>
Gerrit-Reviewer: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Gerrit-HasComments: Yes
