drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [7/9] drill git commit: DRILL-2574: SendingAccountor can suffer from lost updates
Date Thu, 26 Mar 2015 17:51:13 GMT
DRILL-2574: SendingAccountor can suffer from lost updates

SendingAccountor
- atomically get and set the message count to wait for


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/910e278e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/910e278e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/910e278e

Branch: refs/heads/master
Commit: 910e278ed3ee84db17de9e98739422d4ef86f124
Parents: eeeb075
Author: Chris Westin <cwestin@yahoo.com>
Authored: Wed Mar 25 18:44:41 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Mar 26 09:58:36 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/physical/impl/SendingAccountor.java | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/910e278e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 8794188..21fc800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -27,10 +27,10 @@ import java.util.concurrent.atomic.AtomicInteger;
  * TODO: Need to update to use long for number of pending messages.
  */
 public class SendingAccountor {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
 
   private final AtomicInteger batchesSent = new AtomicInteger(0);
-  private Semaphore wait = new Semaphore(0);
+  private final Semaphore wait = new Semaphore(0);
 
   public void increment() {
     batchesSent.incrementAndGet();
@@ -42,8 +42,10 @@ public class SendingAccountor {
 
   public synchronized void waitForSendComplete() {
     try {
-      wait.acquire(batchesSent.get());
-      batchesSent.set(0);
+      int waitForBatches;
+      while((waitForBatches = batchesSent.getAndSet(0)) != 0) {
+        wait.acquire(waitForBatches);
+      }
     } catch (InterruptedException e) {
       logger.warn("Failure while waiting for send complete.", e);
       // TODO InterruptedException


Mime
View raw message