impala-reviews mailing list archives

From "Michael Ho (Code Review)" <>
Subject [Impala-ASF-CR] IMPALA-4026: Implement double-buffering for BlockingQueue
Date Tue, 27 Sep 2016 22:58:00 GMT
Michael Ho has posted comments on this change.

Change subject: IMPALA-4026: Implement double-buffering for BlockingQueue

Patch Set 5:

File be/src/exec/

Line 79:   row_batches_put_timer_ = runtime_profile()->AddCounter("QueuePutTime", TUnit::TIME_NS);
> we usually do this in Open() or Prepare() (see other counters in e.g. HdfsS
File be/src/util/blocking-queue.h:

PS5, Line 75: NotitfyAll
> NotifyAll().

PS5, Line 90: DCHECK
> nit: DCHECK_NE
DCHECK removed.

PS5, Line 98:  This may
            :     // imply that some writers may be sleeping on a partially empty queue
> Maybe "If this race occurs, a writer can stay blocked on a partially empty 

PS5, Line 99: Given the major
            :     // use case is with HDFS scan node which has multiple producers and one consumer, it's
            :     // expected that some producers can make progress.
> Maybe more simply: "This should only occur when producers are faster than c

Line 102:     put_cv_.NotifyOne();
> is it worth explaining this race rather than fixing it?  Doesn't pthreads o
Not sure I understand the optimization you are referring to here.

The race here is that a thread can call put_cv_.NotifyOne() after another thread has checked
the queue's size but before that thread calls put_cv_.Wait(). AFAIK, the only way to avoid this race
is to also grab the "put_lock_" in BlockingGet() which kind of defeats the purpose of the

PS5, Line 137: GetSize
> this is an unfortunate name. I read it to be the size of the "Get" list.  M

Line 171:     // the queue's size could have changed once the lock is dropped.
> how do you know the deque::size() method doesn't need the synchronization (
Definitely expensive, as a writer can now block a reader even if "get_list_" is not empty.

On x86, an aligned 64-bit read should be atomic. That said, it's a good point that we cannot
assume the implementation of deque::size(). Added an AtomicInt64 for the get_list's size
to make sure all reads will be 32-bit consistent.
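
For illustration, the idea of tracking the list's size in a separate atomic counter can be sketched as
below. This is a minimal sketch, not the code in the patch: the class name CountedList and its
methods are hypothetical, and std::atomic<int64_t> stands in for Impala's AtomicInt64. The point is
only that the lock-free size check never calls deque::size() without holding the lock.

```cpp
#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the reader-side list; not Impala's actual class.
template <typename T>
class CountedList {
 public:
  // Lock-free size check. The value may be stale by the time the caller
  // acts on it, but the read itself is always atomic.
  int64_t ApproxSize() const { return size_.load(std::memory_order_acquire); }

  void PushBack(const T& v) {
    std::lock_guard<std::mutex> l(lock_);
    list_.push_back(v);
    // deque::size() is only ever called while holding lock_.
    size_.store(static_cast<int64_t>(list_.size()), std::memory_order_release);
  }

  // Returns false if the list is empty.
  bool PopFront(T* out) {
    std::lock_guard<std::mutex> l(lock_);
    if (list_.empty()) return false;
    *out = list_.front();
    list_.pop_front();
    size_.store(static_cast<int64_t>(list_.size()), std::memory_order_release);
    return true;
  }

 private:
  std::mutex lock_;
  std::deque<T> list_;
  std::atomic<int64_t> size_{0};  // mirrors list_.size()
};
```

A caller that sees ApproxSize() == 0 still has to recheck under the lock before acting on it, since
the size can change as soon as the read returns.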

Line 197:   boost::scoped_ptr<std::deque<T>> put_list_;
> why add this extra indirection?  couldn't we just do deque::swap() directly
Good point. Done. Also rearranged the class members a bit.
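
As a rough sketch of what swapping the deques directly (rather than swapping pointers to them) could
look like: the class below is hypothetical and non-blocking, so it leaves out the condition
variables and capacity limit that the real BlockingQueue has. The names get_lock_, put_lock_,
get_list_ and put_list_ follow the review; everything else is an assumption.

```cpp
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, simplified double-buffered queue; not the patch's code.
template <typename T>
class DoubleBufferedQueue {
 public:
  // Writers only ever touch put_list_ and put_lock_.
  void Put(T v) {
    std::lock_guard<std::mutex> l(put_lock_);
    put_list_.push_back(std::move(v));
  }

  // Non-blocking get: returns false if both lists are empty.
  bool TryGet(T* out) {
    std::lock_guard<std::mutex> gl(get_lock_);
    if (get_list_.empty()) {
      // Readers take put_lock_ only when their own list has run dry.
      std::lock_guard<std::mutex> pl(put_lock_);
      // std::deque::swap exchanges the contents in constant time, so no
      // pointer indirection is needed to make the exchange cheap.
      put_list_.swap(get_list_);
    }
    if (get_list_.empty()) return false;
    *out = std::move(get_list_.front());
    get_list_.pop_front();
    return true;
  }

 private:
  std::mutex get_lock_;
  std::deque<T> get_list_;
  std::mutex put_lock_;
  std::deque<T> put_list_;
};
```

The lock order is always get_lock_ then put_lock_, and writers never take get_lock_, so the two
locks cannot deadlock in this sketch.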
File be/src/util/condition-variable.h:

PS5, Line 29: doesn't have any logic to deal with thread interruption
> what's the implication of that?  are signals not handled properly?
The thread interruption feature has nothing to do with signal handling. It's a boost library
feature which we don't use (at least for BlockingQueue). It's basically a way for one thread
to interrupt another thread at well-defined points in the code.


Gerrit-MessageType: comment
Gerrit-Change-Id: Ib9f4cf351455efefb0f3bb791cf9bc82d1421d54
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Michael Ho <>
Gerrit-Reviewer: Alex Behm <>
Gerrit-Reviewer: Chen Huang <>
Gerrit-Reviewer: Dan Hecht <>
Gerrit-Reviewer: Michael Ho <>
Gerrit-Reviewer: Mostafa Mokhtar <>
Gerrit-Reviewer: Tim Armstrong <>
Gerrit-HasComments: Yes
