drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amansi...@apache.org
Subject drill git commit: DRILL-2664: In StreamingAgg fix issues with handling the case where output batch is full.
Date Thu, 02 Apr 2015 05:45:40 GMT
Repository: drill
Updated Branches:
  refs/heads/master 767711919 -> cc97cd471


DRILL-2664: In StreamingAgg fix issues with handling the case where output batch is full.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cc97cd47
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cc97cd47
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cc97cd47

Branch: refs/heads/master
Commit: cc97cd471c40030d136bb93a1873e01b6cd6e8ef
Parents: 7677119
Author: Aman Sinha <asinha@maprtech.com>
Authored: Wed Apr 1 19:00:27 2015 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Wed Apr 1 19:00:27 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/WriterRecordBatch.java       |  2 +-
 .../physical/impl/aggregate/StreamingAggTemplate.java     | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/cc97cd47/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 74a674e..45022f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -126,8 +126,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         }
       } while(upstream != IterOutcome.NONE);
     }catch(Exception ex){
-      kill(false);
       logger.error("Failure during query", ex);
+      kill(false);
       context.fail(ex);
       return IterOutcome.STOP;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/cc97cd47/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 36f9f29..bf27187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -79,11 +79,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     }
     try { // outside loop to ensure that first is set to false after the first run.
       outputCount = 0;
+      // allocate outgoing since either this is the first time or if a subsequent time we would
+      // have sent the previous outgoing batch to downstream operator
+      allocateOutgoing();
 
-      // if we're in the first state, allocate outgoing.
       if (first) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
-        allocateOutgoing();
       }
 
       if (incoming.getRecordCount() == 0) {
@@ -232,11 +233,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                   previousIndex = currentIndex;
                   if (addedRecordCount > 0) {
-                    if (!outputToBatchPrev(previous, previousIndex, outputCount)) {
+                    if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                       if (EXTRA_DEBUG) {
                         logger.debug("Output container is full. flushing it.");
-                        return setOkAndReturn();
                       }
+                      return setOkAndReturn();
                     }
                     continue outside;
                   }
@@ -320,7 +321,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     outputRecordKeysPrev(b1, inIndex, outIndex);
     outputRecordValues(outIndex);
     resetValues();
-    resetValues();
     outputCount++;
     addedRecordCount = 0;
 


Mime
View raw message