drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [2/5] drill git commit: DRILL-2855: Fix invalid result issues with StreamAggBatch
Date Tue, 28 Apr 2015 02:58:29 GMT
DRILL-2855: Fix invalid result issues with StreamAggBatch


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fa0f1c5f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fa0f1c5f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fa0f1c5f

Branch: refs/heads/master
Commit: fa0f1c5f0feace931ddb236a7eadc57f3b4135b2
Parents: 3f781be
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Sun Apr 19 14:16:09 2015 -0700
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Mon Apr 27 14:11:39 2015 -0700

----------------------------------------------------------------------
 .../impl/aggregate/StreamingAggTemplate.java     | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fa0f1c5f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 05560c4..1f139d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -153,21 +153,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               }
 
               // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls.
-              previousIndex = currentIndex;
-              incIndex();
+              previousIndex = -1;
               return setOkAndReturn();
             }
           }
           previousIndex = currentIndex;
         }
 
-        InternalBatch previous = null;
+        InternalBatch previous = new InternalBatch(incoming);
+
         try {
           while (true) {
-            if (previous != null) {
-              previous.clear();
-            }
-            previous = new InternalBatch(incoming);
+
             IterOutcome out = outgoing.next(0, incoming);
             if (EXTRA_DEBUG) {
               logger.debug("Received IterOutcome of {}", out);
@@ -232,16 +229,17 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   if (EXTRA_DEBUG) {
                     logger.debug("This is not the same as the previous, add record and continue outside.");
                   }
-                  previousIndex = currentIndex;
                   if (addedRecordCount > 0) {
                     if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                       if (EXTRA_DEBUG) {
                         logger.debug("Output container is full. flushing it.");
                       }
+                      previousIndex = -1;
                       return setOkAndReturn();
                     }
-                    continue outside;
                   }
+                  previousIndex = -1;
+                  continue outside;
                 }
               }
             case STOP:
@@ -250,8 +248,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               outcome = out;
               return AggOutcome.CLEANUP_AND_RETURN;
             }
-
-        }
+          }
         } finally {
           // make sure to clear previous
           if (previous != null) {


Mime
View raw message