impala-reviews mailing list archives

From "Michael Ho (Code Review)" <ger...@cloudera.org>
Subject [Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Date Fri, 06 Oct 2017 22:03:51 GMT
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 )

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................


Patch Set 2:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc
File be/src/runtime/krpc-data-stream-recvr.cc:

http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141
PS2, Line 141:   while (true) {
             :     // wait until something shows up or we know we're done
             :     while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty()
             :         && num_remaining_senders_ > 0) {
             :       VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id()
             :                << " node=" << recvr_->dest_node_id();
             :       // Don't count time spent waiting on the sender as active time.
             :       CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_);
             :       CANCEL_SAFE_SCOPED_TIMER(
             :           received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_,
             :           &is_cancelled_);
             :       data_arrival_cv_.wait(l);
             :     }
             : 
             :     if (is_cancelled_) return Status::CANCELLED;
             : 
             :     if (blocked_senders_.empty() && batch_queue_.empty()) {
             :       DCHECK_EQ(num_remaining_senders_, 0);
             :       return Status::OK();
             :     }
             : 
             :     received_first_batch_ = true;
             : 
             :     // Either we'll consume a row batch from batch_queue_, or it's empty. In either case,
             :     // take a blocked sender and retry delivering their batch. There is a window between
             :     // which a deferred batch is dequeued from blocked_senders_ queue and when it's
             :     // inserted into batch_queue_. However, a receiver won't respond to the sender until
             :     // the deferred row batch has been inserted. The sender will wait for all in-flight
             :     // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0.
             :     if (!blocked_senders_.empty()) {
             :       recvr_->mgr_->EnqueueRowBatch(
             :           {recvr_->fragment_instance_id(), move(blocked_senders_.front())});
             :       blocked_senders_.pop();
             :     }
             : 
             :     if (!batch_queue_.empty()) {
             :       RowBatch* result = batch_queue_.front().second;
             :       recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
             :       VLOG_ROW << "fetched #rows=" << result->num_rows();
             :       current_batch_.reset(result);
             :       *next_batch = current_batch_.get();
             :       batch_queue_.pop_front();
             :       return Status::OK();
             :     }
> This loop may lead to live lock in the rare case in which blocked_senders_ 
Actually, I misread the code in the heat of debugging. If both queues are empty, we return
early at line 160 above if num_remaining_senders_ == 0, so we shouldn't spin forever. Otherwise,
the thread sleeps and waits at line 153. This loop does tend to have the unfortunate behavior
of popping all entries off blocked_senders_ before dropping the lock and sleeping at
line 153.
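
To make the termination argument concrete, here is a minimal sketch that classifies what one
iteration of the loop does for a given queue state. It is not the patch's code: QueueState, Step
and ClassifyIteration are hypothetical names, and the queues are reduced to their sizes. Under
those assumptions, every state either sleeps, returns, or makes progress, so no state spins.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical snapshot of the receiver's state while the lock is held.
// Field names mirror the members in the quoted code; the struct itself is
// an illustration only.
struct QueueState {
  bool is_cancelled = false;
  std::size_t batch_queue_size = 0;
  std::size_t blocked_senders_size = 0;
  int num_remaining_senders = 0;  // assumed to be non-negative
};

// What a single iteration of the outer while (true) loop ends up doing.
enum class Step { kWait, kCancelled, kEndOfStream, kMakeProgress };

inline Step ClassifyIteration(const QueueState& s) {
  // Inner wait loop: sleep only if there is nothing to consume, nothing
  // deferred to retry, and at least one sender may still deliver data.
  if (!s.is_cancelled && s.batch_queue_size == 0 &&
      s.blocked_senders_size == 0 && s.num_remaining_senders > 0) {
    return Step::kWait;
  }
  if (s.is_cancelled) return Step::kCancelled;
  // Both queues empty and we did not sleep: all senders must be done.
  if (s.blocked_senders_size == 0 && s.batch_queue_size == 0) {
    assert(s.num_remaining_senders == 0);
    return Step::kEndOfStream;
  }
  // Otherwise retry a deferred batch and/or hand back a queued batch.
  return Step::kMakeProgress;
}
```

For example, a state with both queues empty and one remaining sender classifies as Step::kWait,
while the same state with zero remaining senders classifies as Step::kEndOfStream.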

Although there is a window in which both queues are empty while a row batch is being deserialized
and moved from blocked_senders_ to batch_queue_, it should be impossible for num_remaining_senders_
to reach 0 in that window. The reason is that the receiver does not respond to the sender of that
row batch until the batch has been inserted into batch_queue_ (after it has been popped
from blocked_senders_). So batch_queue_ becomes non-empty before the remote sender gets a reply,
and the sender cannot send its EOS RPC until then.
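
The ordering argument can be checked with a small single-threaded model. This is only a sketch
under stated assumptions: Model, TrySendEos and RetryDeferredBatch are hypothetical names, batches
are plain ints, and the sender is assumed to attempt its EOS at the worst possible moment, inside
the window where both queues are empty.

```cpp
#include <cassert>
#include <deque>
#include <queue>

// Hypothetical model of one sender and one receiver queue.
struct Model {
  std::queue<int> blocked_senders;  // deferred batches awaiting a retry
  std::deque<int> batch_queue;      // batches ready for the consumer
  int num_remaining_senders = 1;
  int in_flight_rpcs = 0;           // sender side: RPCs not yet replied to

  // True if a reader taking the lock now would conclude end-of-stream.
  bool LooksLikeEos() const {
    return blocked_senders.empty() && batch_queue.empty() &&
           num_remaining_senders == 0;
  }
};

// Sender: EOS is only sent once every in-flight RPC has been answered.
inline bool TrySendEos(Model& m) {
  if (m.in_flight_rpcs > 0) return false;
  --m.num_remaining_senders;
  return true;
}

// Receiver: retry one deferred batch. Returns true if a premature
// end-of-stream was observable at any step along the way.
inline bool RetryDeferredBatch(Model& m) {
  bool premature_eos = false;
  int batch = m.blocked_senders.front();
  m.blocked_senders.pop();
  // Window: both queues may be empty here. The sender tries to send EOS,
  // but it is still waiting for the reply to this batch, so it cannot.
  TrySendEos(m);
  premature_eos |= m.LooksLikeEos();
  m.batch_queue.push_back(batch);
  premature_eos |= m.LooksLikeEos();
  --m.in_flight_rpcs;  // only now does the receiver reply to the sender
  return premature_eos;
}
```

With one deferred batch and one in-flight RPC, RetryDeferredBatch reports no premature
end-of-stream, and a later TrySendEos succeeds only after batch_queue already holds the batch.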



-- 
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 2
Gerrit-Owner: Michael Ho <kwho@cloudera.com>
Gerrit-Reviewer: Michael Ho <kwho@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sailesh@cloudera.com>
Gerrit-Comment-Date: Fri, 06 Oct 2017 22:03:51 +0000
Gerrit-HasComments: Yes
