drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adene...@apache.org
Subject drill git commit: DRILL-3845: UnorderedReceiver shouldn't terminate until it receives a final batch
Date Fri, 15 Jan 2016 22:30:17 GMT
Repository: drill
Updated Branches:
  refs/heads/master 9dad9da6d -> 8a28131e5


DRILL-3845: UnorderedReceiver shouldn't terminate until it receives a final batch

MergingRecordBatch doesn't wait for last batch when it's an early termination

this closes #319


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8a28131e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8a28131e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8a28131e

Branch: refs/heads/master
Commit: 8a28131e5bc3c51d1044f964968a319faf78dd8f
Parents: 9dad9da
Author: adeneche <adeneche@gmail.com>
Authored: Wed Jan 6 14:50:38 2016 -0800
Committer: adeneche <adeneche@gmail.com>
Committed: Fri Jan 15 10:24:24 2016 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/mergereceiver/MergingRecordBatch.java  | 7 ++++---
 .../physical/impl/partitionsender/PartitionerTemplate.java    | 6 +++---
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8a28131e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8db24af..0049059 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -508,9 +508,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       informSenders();
     } else {
       close();
-      for (final RawFragmentBatchProvider provider : fragProviders) {
-        provider.kill(context);
-      }
+    }
+
+    for (final RawFragmentBatchProvider provider : fragProviders) {
+      provider.kill(context);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/8a28131e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index aeac01d..8fe0ab0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -230,7 +230,6 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final OperatorStats stats;
 
     private boolean isLast = false;
-    private volatile boolean terminated = false;
     private boolean dropAll = false;
     private int recordCount;
     private int totalRecords;
@@ -256,7 +255,8 @@ public abstract class PartitionerTemplate implements Partitioner {
 
     @Override
     public void terminate() {
-      terminated = true;
+      // receiver already terminated, don't send anything to it from now on
+      dropAll = true;
     }
 
     @RuntimeOverridden
@@ -286,7 +286,7 @@ public abstract class PartitionerTemplate implements Partitioner {
       //      sender has acknowledged the terminate request. After sending the last batch,
all further batches are
       //      dropped.
       //   3. Partitioner thread is interrupted due to cancellation of fragment.
-      final boolean isLastBatch = isLast || terminated || Thread.currentThread().isInterrupted();
+      final boolean isLastBatch = isLast || Thread.currentThread().isInterrupted();
 
       // if the batch is not the last batch and the current recordCount is zero, then no
need to send any RecordBatches
       if (!isLastBatch && recordCount == 0) {


Mime
View raw message