drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] drill git commit: 1908: new window function implementation
Date Fri, 06 Mar 2015 21:22:34 GMT
Repository: drill
Updated Branches:
  refs/heads/master 4b58ac8b7 -> fb293ba52


http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java
deleted file mode 100644
index 00c20b2..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.planner.physical;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import net.hydromatic.linq4j.Ord;
-import net.hydromatic.optiq.util.BitSets;
-import org.apache.drill.exec.planner.logical.DrillRel;
-import org.apache.drill.exec.planner.logical.DrillWindowRel;
-import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationImpl;
-import org.eigenbase.rel.RelFieldCollation;
-import org.eigenbase.rel.RelNode;
-import org.eigenbase.rel.WindowRelBase;
-import org.eigenbase.relopt.RelOptRule;
-import org.eigenbase.relopt.RelOptRuleCall;
-import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.reltype.RelDataType;
-import org.eigenbase.reltype.RelDataTypeField;
-import org.eigenbase.reltype.RelRecordType;
-import org.eigenbase.sql.SqlAggFunction;
-
-import java.util.List;
-
-public class StreamingWindowPrule extends RelOptRule {
-  public static final RelOptRule INSTANCE = new StreamingWindowPrule();
-
-  private StreamingWindowPrule() {
-    super(RelOptHelper.some(DrillWindowRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.WindowPrule");
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final DrillWindowRel window = call.rel(0);
-    RelNode input = call.rel(1);
-
-    // TODO: Order window based on existing partition by
-    //input.getTraitSet().subsumes()
-
-    for (final Ord<WindowRelBase.Window> w : Ord.zip(window.windows)) {
-      WindowRelBase.Window windowBase = w.getValue();
-      DrillDistributionTrait distOnAllKeys =
-          new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
-              ImmutableList.copyOf(getDistributionFields(windowBase)));
-
-      RelCollation collation = getCollation(windowBase);
-      RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys);
-      final RelNode convertedInput = convert(input, traits);
-
-      List<RelDataTypeField> newRowFields = Lists.newArrayList();
-      for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) {
-        newRowFields.add(field);
-      }
-
-      Iterable<RelDataTypeField> newWindowFields = Iterables.filter(window.getRowType().getFieldList(), new Predicate<RelDataTypeField>() {
-            @Override
-            public boolean apply(RelDataTypeField relDataTypeField) {
-              return relDataTypeField.getName().startsWith("w" + w.i + "$");
-            }
-      });
-
-      for(RelDataTypeField newField : newWindowFields) {
-        newRowFields.add(newField);
-      }
-
-      RelDataType rowType = new RelRecordType(newRowFields);
-
-      List<WindowRelBase.RexWinAggCall> newWinAggCalls = Lists.newArrayList();
-      for(Ord<WindowRelBase.RexWinAggCall> aggOrd : Ord.zip(windowBase.aggCalls)) {
-        WindowRelBase.RexWinAggCall aggCall = aggOrd.getValue();
-        newWinAggCalls.add(new WindowRelBase.RexWinAggCall(
-            (SqlAggFunction)aggCall.getOperator(), aggCall.getType(), aggCall.getOperands(), aggOrd.i)
-        );
-      }
-
-      windowBase = new WindowRelBase.Window(
-          windowBase.groupSet,
-          windowBase.isRows,
-          windowBase.lowerBound,
-          windowBase.upperBound,
-          windowBase.orderKeys,
-          newWinAggCalls
-      );
-
-      input = new StreamingWindowPrel(
-          window.getCluster(),
-          window.getTraitSet().merge(traits),
-          convertedInput,
-          window.getConstants(),
-          rowType,
-          windowBase);
-    }
-
-    call.transformTo(input);
-  }
-
-  private RelCollation getCollation(WindowRelBase.Window window) {
-    List<RelFieldCollation> fields = Lists.newArrayList();
-    for (int group : BitSets.toIter(window.groupSet)) {
-      fields.add(new RelFieldCollation(group));
-    }
-    return RelCollationImpl.of(fields);
-  }
-
-  private List<DrillDistributionTrait.DistributionField> getDistributionFields(WindowRelBase.Window window) {
-    List<DrillDistributionTrait.DistributionField> groupByFields = Lists.newArrayList();
-    for (int group : BitSets.toIter(window.groupSet)) {
-      DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
-      groupByFields.add(field);
-    }
-    return groupByFields;
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java
new file mode 100644
index 0000000..f23a073
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrel.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import com.google.common.collect.Lists;
+import net.hydromatic.optiq.util.BitSets;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.planner.common.DrillWindowRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class WindowPrel extends DrillWindowRelBase implements Prel {
+  public WindowPrel(RelOptCluster cluster,
+                    RelTraitSet traits,
+                    RelNode child,
+                    List<RexLiteral> constants,
+                    RelDataType rowType,
+                    Window window) {
+    super(cluster, traits, child, constants, rowType, Collections.singletonList(window));
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new WindowPrel(getCluster(), traitSet, sole(inputs), constants, getRowType(), windows.get(0));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    Prel child = (Prel) this.getChild();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    final List<String> childFields = getChild().getRowType().getFieldNames();
+
+    // We don't support distinct partitions
+    checkState(windows.size() == 1, "Only one window is expected in WindowPrel");
+
+    Window window = windows.get(0);
+    List<NamedExpression> withins = Lists.newArrayList();
+    List<NamedExpression> aggs = Lists.newArrayList();
+    List<Order.Ordering> orderings = Lists.newArrayList();
+
+    for (int group : BitSets.toIter(window.groupSet)) {
+      FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+      withins.add(new NamedExpression(fr, fr));
+    }
+
+    for (AggregateCall aggCall : window.getAggregateCalls(this)) {
+      FieldReference ref = new FieldReference(aggCall.getName());
+      LogicalExpression expr = toDrill(aggCall, childFields);
+      aggs.add(new NamedExpression(expr, ref));
+    }
+
+    for (RelFieldCollation fieldCollation : window.orderKeys.getFieldCollations()) {
+      orderings.add(new Order.Ordering(fieldCollation.getDirection(), new FieldReference(childFields.get(fieldCollation.getFieldIndex())), fieldCollation.nullDirection));
+    }
+
+    WindowPOP windowPOP = new WindowPOP(
+        childPOP,
+        withins.toArray(new NamedExpression[withins.size()]),
+        aggs.toArray(new NamedExpression[aggs.size()]),
+        orderings.toArray(new Order.Ordering[orderings.size()]),
+        Long.MIN_VALUE, //TODO: Get first/last to work
+        Long.MIN_VALUE);
+
+    creator.addMetadata(this, windowPOP);
+    return windowPOP;
+  }
+
+  protected LogicalExpression toDrill(AggregateCall call, List<String> fn) {
+    List<LogicalExpression> args = Lists.newArrayList();
+    for (Integer i : call.getArgList()) {
+      args.add(new FieldReference(fn.get(i)));
+    }
+
+    // for count(1).
+    if (args.isEmpty()) {
+      args.add(new ValueExpressions.LongExpression(1l));
+    }
+    LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN);
+    return expr;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getChild());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
new file mode 100644
index 0000000..796c654
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WindowPrule.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillWindowRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.WindowRelBase;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.sql.SqlAggFunction;
+
+import java.util.List;
+
+public class WindowPrule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new WindowPrule();
+
+  private WindowPrule() {
+    super(RelOptHelper.some(DrillWindowRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.WindowPrule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillWindowRel window = call.rel(0);
+    RelNode input = call.rel(1);
+
+    // TODO: Order window based on existing partition by
+    //input.getTraitSet().subsumes()
+
+    for (final Ord<WindowRelBase.Window> w : Ord.zip(window.windows)) {
+      WindowRelBase.Window windowBase = w.getValue();
+      DrillDistributionTrait distOnAllKeys =
+          new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+              ImmutableList.copyOf(getDistributionFields(windowBase)));
+
+      RelCollation collation = getCollation(windowBase);
+      RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys);
+      final RelNode convertedInput = convert(input, traits);
+
+      List<RelDataTypeField> newRowFields = Lists.newArrayList();
+      for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) {
+        newRowFields.add(field);
+      }
+
+      Iterable<RelDataTypeField> newWindowFields = Iterables.filter(window.getRowType().getFieldList(), new Predicate<RelDataTypeField>() {
+            @Override
+            public boolean apply(RelDataTypeField relDataTypeField) {
+              return relDataTypeField.getName().startsWith("w" + w.i + "$");
+            }
+      });
+
+      for(RelDataTypeField newField : newWindowFields) {
+        newRowFields.add(newField);
+      }
+
+      RelDataType rowType = new RelRecordType(newRowFields);
+
+      List<WindowRelBase.RexWinAggCall> newWinAggCalls = Lists.newArrayList();
+      for(Ord<WindowRelBase.RexWinAggCall> aggOrd : Ord.zip(windowBase.aggCalls)) {
+        WindowRelBase.RexWinAggCall aggCall = aggOrd.getValue();
+        newWinAggCalls.add(new WindowRelBase.RexWinAggCall(
+            (SqlAggFunction)aggCall.getOperator(), aggCall.getType(), aggCall.getOperands(), aggOrd.i)
+        );
+      }
+
+      windowBase = new WindowRelBase.Window(
+          windowBase.groupSet,
+          windowBase.isRows,
+          windowBase.lowerBound,
+          windowBase.upperBound,
+          windowBase.orderKeys,
+          newWinAggCalls
+      );
+
+      input = new WindowPrel(
+          window.getCluster(),
+          window.getTraitSet().merge(traits),
+          convertedInput,
+          window.getConstants(),
+          rowType,
+          windowBase);
+    }
+
+    call.transformTo(input);
+  }
+
+  private RelCollation getCollation(WindowRelBase.Window window) {
+    List<RelFieldCollation> fields = Lists.newArrayList();
+    for (int group : BitSets.toIter(window.groupSet)) {
+      fields.add(new RelFieldCollation(group));
+    }
+
+    for (RelFieldCollation field : window.orderKeys.getFieldCollations()) {
+      fields.add(field);
+    }
+
+    return RelCollationImpl.of(fields);
+  }
+
+  private List<DrillDistributionTrait.DistributionField> getDistributionFields(WindowRelBase.Window window) {
+    List<DrillDistributionTrait.DistributionField> groupByFields = Lists.newArrayList();
+    for (int group : BitSets.toIter(window.groupSet)) {
+      DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
+      groupByFields.add(field);
+    }
+    return groupByFields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 232778a..d99f40d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
+import org.apache.drill.exec.physical.impl.window.OverFinder;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
@@ -144,6 +145,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
   }
 
   protected SqlNode validateNode(SqlNode sqlNode) throws ValidationException, RelConversionException, ForemanSetupException {
+    final boolean enableWindow = context.getOptions().getOption(ExecConstants.ENABLE_WINDOW_FUNCTIONS).bool_val;
+    if (!enableWindow) {
+      final OverFinder overFinder = new OverFinder();
+      if (overFinder.findOver(sqlNode)) {
+        throw new ValidationException("Window Functions have been disabled");
+      }
+    }
+
     SqlNode sqlNodeValidated = planner.validate(sqlNode);
 
     // Check if the unsupported functionality is used

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3d3e96f..6d9fa5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -87,7 +87,8 @@ public class SystemOptionManager implements OptionManager {
       QueryClassLoader.JAVA_COMPILER_VALIDATOR,
       QueryClassLoader.JAVA_COMPILER_JANINO_MAXSIZE,
       QueryClassLoader.JAVA_COMPILER_DEBUG,
-      ExecConstants.ENABLE_VERBOSE_ERRORS
+      ExecConstants.ENABLE_VERBOSE_ERRORS,
+      ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR
   };
 
   public final PStoreConfig<OptionValue> config;

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index a9d2ef8..2b8bd64 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -1,4 +1,4 @@
-/**
+/*******************************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -14,200 +14,112 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+ ******************************************************************************/
 package org.apache.drill.exec.physical.impl.window;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.pop.PopUnitTestBase;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.junit.Ignore;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
 import org.junit.Test;
 
-import java.util.List;
+public class TestWindowFrame extends BaseTestQuery {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestWindowFrame extends PopUnitTestBase {
+  private void runTest(String data, String results, String window) throws Exception {
+    testNoResult("alter session set `%s`= true", ExecConstants.ENABLE_WINDOW_FUNCTIONS);
+    testBuilder()
+      .sqlQuery("select count(*) over pos_win `count`, sum(salary) over pos_win `sum` from cp.`window/%s.json` window pos_win as (%s)", data, window)
+      .ordered()
+      .csvBaselineFile("window/" + results + ".tsv")
+      .baselineColumns("count", "sum")
+      .build().run();
+  }
 
+  /**
+   * Single batch with a single partition (position_id column)
+   */
   @Test
-  @Ignore
-  public void testWindowFrameWithOneKeyCount() throws Throwable {
-    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-         Drillbit bit = new Drillbit(CONFIG, serviceSet);
-         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
-
-      // run query.
-      bit.run();
-      client.connect();
-      List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
-          Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCount.json"), Charsets.UTF_8)
-              .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/oneKeyCountData.json").toURI().toString())
-      );
-
-      long[] cntArr = {1, 2, 1, 2};
-      long[] sumArr = {100, 150, 25, 75};
-
-      // look at records
-      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
-      int recordCount = 0;
-
-      for (QueryResultBatch batch : results) {
-        if (!batch.hasData()) {
-          continue;
-        }
-
-        assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
-        batchLoader.load(batch.getHeader().getDef(), batch.getData());
-
-        for (int r = 0; r < batchLoader.getRecordCount(); r++) {
-          recordCount++;
-          VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
-            BigIntVector.class,
-            batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
-          );
-          assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
-          wrapper = batchLoader.getValueAccessorById(
-            NullableBigIntVector.class,
-            batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
-          );
-          assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
-        }
-        batchLoader.clear();
-        batch.release();
-      }
-
-      assertEquals(4, recordCount);
-    }
+  public void testB1P1() throws Exception {
+    runTest("b1.p1.data", "b1.p1", "partition by position_id order by position_id");
   }
 
+  /**
+   * Single batch with a single partition (position_id column) and multiple sub-partitions (sub column)
+   */
   @Test
-  @Ignore
-  public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable {
-    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-         Drillbit bit = new Drillbit(CONFIG, serviceSet);
-         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
-
-      // run query.
-      bit.run();
-      client.connect();
-      List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
-          Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCountMultiBatch.json"), Charsets.UTF_8)
-              .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/mediumData.json").toURI().toString()));
-
-      // look at records
-      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
-      int recordCount = 0;
-
-      for (QueryResultBatch batch : results) {
-        if (!batch.hasData()) {
-          continue;
-        }
-
-        assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
-        batchLoader.load(batch.getHeader().getDef(), batch.getData());
-        ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class,
-          batchLoader.getValueVectorId(
-            new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0]
-        ).getValueVector().getAccessor();
-        ValueVector.Accessor sum = batchLoader.getValueAccessorById(
-          BigIntVector.class,
-          batchLoader.getValueVectorId(
-            new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
-        ).getValueVector().getAccessor();
-        ValueVector.Accessor cnt = batchLoader.getValueAccessorById(
-          BigIntVector.class,
-          batchLoader.getValueVectorId(
-            new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
-        ).getValueVector().getAccessor();
-        int lastGroup = -1;
-        long groupCounter = 0;
-        long s = 0;
-        for (int r = 1; r <= batchLoader.getRecordCount(); r++) {
-          recordCount++;
-          int group = r / 4;
-          if (lastGroup != group) {
-            lastGroup = group;
-            groupCounter = 1;
-            s = 0;
-          } else {
-            groupCounter++;
-          }
-
-          s += group * 8 + r % 4;
+  public void testB1P1OrderBy() throws Exception {
+    runTest("b1.p1.data", "b1.p1.subs", "partition by position_id order by sub");
+  }
 
-          assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1));
-          assertEquals("Sum, Row " + r, s, sum.getObject(r - 1));
-          assertEquals("Output, Row " + r, s, output.getObject(r - 1));
-        }
-        batchLoader.clear();
-        batch.release();
-      }
+  /**
+   * Single batch with 2 partitions (position_id column)
+   */
+  @Test
+  public void testB1P2() throws Exception {
+    runTest("b1.p2.data", "b1.p2", "partition by position_id order by position_id");
+  }
 
-      assertEquals(1000, recordCount);
-    }
+  /**
+   * Single batch with 2 partitions (position_id column)
+   * with order by clause
+   */
+  @Test
+  public void testB1P2OrderBy() throws Exception {
+    runTest("b1.p2.data", "b1.p2.subs", "partition by position_id order by sub");
   }
 
+  /**
+   * 2 batches with 2 partitions (position_id column), each batch contains a different partition
+   */
   @Test
-  @Ignore
-  public void testWindowFrameWithTwoKeys() throws Throwable {
-    try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-         Drillbit bit = new Drillbit(CONFIG, serviceSet);
-         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+  public void testB2P2() throws Exception {
+    runTest("b2.p2.data", "b2.p2", "partition by position_id order by position_id");
+  }
 
-      // run query.
-      bit.run();
-      client.connect();
-      List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
-          Files.toString(FileUtils.getResourceAsFile("/window/twoKeys.json"), Charsets.UTF_8)
-              .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/twoKeysData.json").toURI().toString())
-      );
+  @Test
+  public void testB2P2OrderBy() throws Exception {
+    runTest("b2.p2.data", "b2.p2.subs", "partition by position_id order by sub");
+  }
 
-      long[] cntArr = {1, 2, 1, 2, 1, 2, 1, 2};
-      long[] sumArr = {5, 15, 15, 35, 25, 55, 35, 75};
+  /**
+   * 2 batches with 4 partitions, one partition has rows in both batches
+   */
+  @Test
+  public void testB2P4() throws Exception {
+    runTest("b2.p4.data", "b2.p4", "partition by position_id order by position_id");
+  }
 
-      // look at records
-      RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
-      int recordCount = 0;
+  /**
+   * 2 batches with 4 partitions, one partition has rows in both batches
+   * no sub partition has rows in both batches
+   */
+  @Test
+  public void testB2P4OrderBy() throws Exception {
+    runTest("b2.p4.data", "b2.p4.subs", "partition by position_id order by sub");
+  }
 
-      for (QueryResultBatch batch : results) {
-        if (!batch.hasData()) {
-          continue;
-        }
+  /**
+   * 3 batches with 2 partitions, one partition has rows in all 3 batches
+   */
+  @Test
+  public void testB3P2() throws Exception {
+    runTest("b3.p2.data", "b3.p2", "partition by position_id order by position_id");
+  }
 
-        assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
-        batchLoader.load(batch.getHeader().getDef(), batch.getData());
+  /**
+   * 3 batches with 2 partitions, one partition has rows in all 3 batches
+   * 2 subs have rows in 2 batches
+   */
+  @Test
+  public void testB3P2OrderBy() throws Exception {
+    runTest("b3.p2.data", "b3.p2.subs", "partition by position_id order by sub");
+  }
 
-        for (int r = 0; r < batchLoader.getRecordCount(); r++) {
-          recordCount++;
-          VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
-            BigIntVector.class,
-            batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
-          );
-          assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
-          wrapper = batchLoader.getValueAccessorById(
-            NullableBigIntVector.class,
-            batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
-          );
-          assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
-        }
-        batchLoader.clear();
-        batch.release();
-      }
-      assertEquals(8, recordCount);
-    }
+  /**
+   * 4 batches with 4 partitions. After processing 1st batch, when innerNext() is called again, framer can process
+   * current batch without the need to call next(incoming).
+   */
+  @Test
+  public void testb4P4() throws Exception {
+    runTest("b4.p4.data", "b4.p4", "partition by position_id order by position_id");
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fb293ba5/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java
deleted file mode 100644
index 6eff6db..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.sql;
-
-import org.apache.drill.BaseTestQuery;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class TestWindowFunctions extends BaseTestQuery {
-  @Test
-  @Ignore
-  public void testWindowSum() throws Exception {
-    test("select sum(position_id) over w from cp.`employee.json` window w as ( partition by position_id order by position_id)");
-  }
-}


Mime
View raw message