calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jcama...@apache.org
Subject calcite git commit: [CALCITE-1828] Push the FILTER clause into Druid as a Filtered Aggregator (Zain Humayun)
Date Mon, 10 Jul 2017 18:19:41 GMT
Repository: calcite
Updated Branches:
  refs/heads/master 65d95e679 -> 551b5622c


[CALCITE-1828] Push the FILTER clause into Druid as a Filtered Aggregator (Zain Humayun)

Close apache/calcite#472


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/551b5622
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/551b5622
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/551b5622

Branch: refs/heads/master
Commit: 551b5622cfbc913c8ee547d6b95ba7898c7f340c
Parents: 65d95e6
Author: Zain Humayun <zhumayun@yahoo-inc.com>
Authored: Mon Jul 10 18:58:47 2017 +0100
Committer: Jesus Camacho Rodriguez <jcamacho@apache.org>
Committed: Mon Jul 10 19:00:19 2017 +0100

----------------------------------------------------------------------
 .../apache/calcite/rel/core/AggregateCall.java  |  13 +-
 .../calcite/adapter/druid/DruidQuery.java       |  93 +++--
 .../calcite/adapter/druid/DruidRules.java       | 213 ++++++++++-
 .../org/apache/calcite/test/DruidAdapterIT.java | 370 +++++++++++++++++++
 4 files changed, 656 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/551b5622/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java b/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
index 2c4ee63..501aba6 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/AggregateCall.java
@@ -206,13 +206,20 @@ public class AggregateCall {
       buf.append(arg);
     }
     buf.append(")");
-    if (filterArg >= 0) {
+    if (hasFilter()) {
       buf.append(" FILTER $");
       buf.append(filterArg);
     }
     return buf.toString();
   }
 
+  /**
+   * Returns true if and only if this AggregateCall has a filter argument
+   * */
+  public boolean hasFilter() {
+    return filterArg >= 0;
+  }
+
   @Override public boolean equals(Object o) {
     if (!(o instanceof AggregateCall)) {
       return false;
@@ -240,7 +247,7 @@ public class AggregateCall {
     return new Aggregate.AggCallBinding(
         aggregateRelBase.getCluster().getTypeFactory(), aggFunction,
         SqlTypeUtil.projectTypes(rowType, argList),
-        aggregateRelBase.getGroupCount(), filterArg >= 0);
+        aggregateRelBase.getGroupCount(), hasFilter());
   }
 
   /**
@@ -288,7 +295,7 @@ public class AggregateCall {
    * arguments. */
   public AggregateCall transform(Mappings.TargetMapping mapping) {
     return copy(Mappings.apply2((Mapping) mapping, argList),
-        filterArg < 0 ? -1 : Mappings.apply(mapping, filterArg));
+        hasFilter() ? Mappings.apply(mapping, filterArg) : -1);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/551b5622/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 1c39fc6..781ad28 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -178,11 +178,6 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
               || aggregate.indicator) {
             return litmus.fail("no grouping sets");
           }
-          for (AggregateCall call : aggregate.getAggCallList()) {
-            if (call.filterArg >= 0) {
-              return litmus.fail("no filtered aggregate functions");
-            }
-          }
         }
         if (r instanceof Filter) {
           final Filter filter = (Filter) r;
@@ -201,14 +196,24 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
     return true;
   }
 
-  boolean isValidFilter(RexNode e) {
-    return isValidFilter(e, false);
+  public boolean isValidFilter(RexNode e) {
+    return isValidFilter(e, false, null);
+  }
+
+  public boolean isValidFilter(RexNode e, RelNode input) {
+    return isValidFilter(e, false, input);
   }
 
-  boolean isValidFilter(RexNode e, boolean boundedComparator) {
+  public boolean isValidFilter(RexNode e, boolean boundedComparator, RelNode input) {
     switch (e.getKind()) {
     case INPUT_REF:
-      return true;
+      if (input == null) {
+        return true;
+      }
+      int nameIndex = ((RexInputRef) e).getIndex();
+      String name = input.getRowType().getFieldList().get(nameIndex).getName();
+      // Druid can't filter on metrics
+      return !druidTable.isMetric(name);
     case LITERAL:
       return ((RexLiteral) e).getValue() != null;
     case AND:
@@ -217,32 +222,34 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
     case EQUALS:
     case NOT_EQUALS:
     case IN:
-      return areValidFilters(((RexCall) e).getOperands(), false);
+      return areValidFilters(((RexCall) e).getOperands(), false, input);
     case LESS_THAN:
     case LESS_THAN_OR_EQUAL:
     case GREATER_THAN:
     case GREATER_THAN_OR_EQUAL:
     case BETWEEN:
-      return areValidFilters(((RexCall) e).getOperands(), true);
+      return areValidFilters(((RexCall) e).getOperands(), true, input);
     case CAST:
       return isValidCast((RexCall) e, boundedComparator);
     case EXTRACT:
       return TimeExtractionFunction.isValidTimeExtract((RexCall) e);
+    case IS_TRUE:
+      return isValidFilter(((RexCall) e).getOperands().get(0), boundedComparator, input);
     default:
       return false;
     }
   }
 
-  private boolean areValidFilters(List<RexNode> es, boolean boundedComparator) {
+  private boolean areValidFilters(List<RexNode> es, boolean boundedComparator, RelNode
input) {
     for (RexNode e : es) {
-      if (!isValidFilter(e, boundedComparator)) {
+      if (!isValidFilter(e, boundedComparator, input)) {
         return false;
       }
     }
     return true;
   }
 
-  private boolean isValidCast(RexCall e, boolean boundedComparator) {
+  private static boolean isValidCast(RexCall e, boolean boundedComparator) {
     assert e.isA(SqlKind.CAST);
     if (e.getOperands().get(0).isA(INPUT_REF)
         && e.getType().getFamily() == SqlTypeFamily.CHARACTER) {
@@ -624,7 +631,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
 
       for (Pair<AggregateCall, String> agg : Pair.zip(aggCalls, aggNames)) {
         final JsonAggregation jsonAggregation =
-            getJsonAggregation(fieldNames, agg.right, agg.left);
+            getJsonAggregation(fieldNames, agg.right, agg.left, projects, translator);
         aggregations.add(jsonAggregation);
         builder.add(jsonAggregation.name);
       }
@@ -782,7 +789,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
   }
 
   protected JsonAggregation getJsonAggregation(List<String> fieldNames,
-      String name, AggregateCall aggCall) {
+      String name, AggregateCall aggCall, List<RexNode> projects, Translator translator)
{
     final List<String> list = new ArrayList<>();
     for (Integer arg : aggCall.getArgList()) {
       list.add(fieldNames.get(arg));
@@ -807,12 +814,16 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
       // Cannot handle this aggregate function type
       throw new AssertionError("unknown aggregate type " + type);
     }
+
+    JsonAggregation aggregation;
+
     CalciteConnectionConfig config = getConnectionConfig();
     switch (aggCall.getAggregation().getKind()) {
     case COUNT:
       if (aggCall.isDistinct()) {
         if (config.approximateDistinctCount()) {
-          return new JsonCardinalityAggregation("cardinality", name, list);
+          aggregation = new JsonCardinalityAggregation("cardinality", name, list);
+          break;
         } else {
           // Gets thrown if one of the rules allows a count(distinct ...) through
           // when approximate results were not told be acceptable.
@@ -820,17 +831,30 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
               + " because an approximate count distinct is not acceptable.");
         }
       }
-      return new JsonAggregation("count", name, only);
+      aggregation = new JsonAggregation("count", name, only);
+      break;
     case SUM:
     case SUM0:
-      return new JsonAggregation(fractional ? "doubleSum" : "longSum", name, only);
+      aggregation = new JsonAggregation(fractional ? "doubleSum" : "longSum", name, only);
+      break;
     case MIN:
-      return new JsonAggregation(fractional ? "doubleMin" : "longMin", name, only);
+      aggregation = new JsonAggregation(fractional ? "doubleMin" : "longMin", name, only);
+      break;
     case MAX:
-      return new JsonAggregation(fractional ? "doubleMax" : "longMax", name, only);
+      aggregation = new JsonAggregation(fractional ? "doubleMax" : "longMax", name, only);
+      break;
     default:
       throw new AssertionError("unknown aggregate " + aggCall);
     }
+
+    // Check for filters
+    if (aggCall.hasFilter()) {
+      RexCall filterNode = (RexCall) projects.get(aggCall.filterArg);
+      JsonFilter filter = translator.translateFilter(filterNode.getOperands().get(0));
+      aggregation = new JsonFilteredAggregation(filter, aggregation);
+    }
+
+    return aggregation;
   }
 
   protected static void writeField(JsonGenerator generator, String fieldName,
@@ -977,6 +1001,9 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
         final RexCall call = (RexCall) e;
         assert DruidDateTimeUtils.extractGranularity(call) != null;
         index = RelOptUtil.InputFinder.bits(e).asList().get(0);
+        break;
+      case IS_TRUE:
+        return ""; // the fieldName for which this is the filter will be added separately
       }
       if (index == -1) {
         throw new AssertionError("invalid expression " + e);
@@ -1243,6 +1270,30 @@ public class DruidQuery extends AbstractRelNode implements BindableRel
{
     }
   }
 
+  /** Aggregation element that contains a filter */
+  private static class JsonFilteredAggregation extends JsonAggregation {
+    final JsonFilter filter;
+    final JsonAggregation aggregation;
+
+    private JsonFilteredAggregation(JsonFilter filter, JsonAggregation aggregation) {
+      // Filtered aggregations don't use the "name" and "fieldName" fields directly,
+      // but rather use the ones defined in their "aggregation" field.
+      super("filtered", aggregation.name, aggregation.fieldName);
+      this.filter = filter;
+      this.aggregation = aggregation;
+      // The aggregation cannot be a JsonFilteredAggregation
+      assert !(aggregation instanceof JsonFilteredAggregation);
+    }
+
+    @Override public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      writeField(generator, "filter", filter);
+      writeField(generator, "aggregator", aggregation);
+      generator.writeEndObject();
+    }
+  }
+
   /** Filter element of a Druid "groupBy" or "topN" query. */
   private abstract static class JsonFilter implements Json {
     /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/551b5622/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index de65a3a..d932f7b 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.rules.AggregateFilterTransposeRule;
 import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
 import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
@@ -49,6 +50,7 @@ import org.apache.calcite.rex.RexSimplify;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.runtime.PredicateImpl;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
@@ -67,7 +69,10 @@ import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Rules and relational operators for {@link DruidQuery}.
@@ -444,31 +449,218 @@ public class DruidRules {
     public void onMatch(RelOptRuleCall call) {
       final Aggregate aggregate = call.rel(0);
       final Project project = call.rel(1);
-      final DruidQuery query = call.rel(2);
+      DruidQuery query = call.rel(2);
       if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
         return;
       }
-      int timestampIdx;
-      if ((timestampIdx = validProject(project, query)) == -1) {
+
+      int timestampIdx = validProject(project, query);
+      List<Integer> filterRefs = getFilterRefs(aggregate.getAggCallList());
+
+      if (timestampIdx == -1 && filterRefs.size() == 0) {
         return;
       }
+
+      // Check that the filters that the Aggregate calls refer to are valid filters
+      // that can be pushed into Druid
+      for (Integer i : filterRefs) {
+        RexNode filterNode = project.getProjects().get(i);
+        if (!query.isValidFilter(filterNode, project.getInput()) || filterNode.isAlwaysFalse())
{
+          return;
+        }
+      }
+
       if (aggregate.indicator
               || aggregate.getGroupSets().size() != 1
               || BAD_AGG.apply(ImmutableTriple.of(aggregate, (RelNode) project, query))
-              || !validAggregate(aggregate, timestampIdx)) {
+              || !validAggregate(aggregate, timestampIdx, filterRefs.size())) {
         return;
       }
-
       if (checkAggregateOnMetric(aggregate.getGroupSet(), project, query)) {
         return;
       }
 
       final RelNode newProject = project.copy(project.getTraitSet(),
               ImmutableList.of(Util.last(query.rels)));
-      final DruidQuery projectDruidQuery = DruidQuery.extendQuery(query, newProject);
       final RelNode newAggregate = aggregate.copy(aggregate.getTraitSet(),
-              ImmutableList.of(Util.last(projectDruidQuery.rels)));
-      call.transformTo(DruidQuery.extendQuery(projectDruidQuery, newAggregate));
+              ImmutableList.of(newProject));
+
+      if (filterRefs.size() > 0) {
+        query = optimizeFilteredAggregations(query, (Project) newProject,
+                (Aggregate) newAggregate);
+      } else {
+        query = DruidQuery.extendQuery(DruidQuery.extendQuery(query, newProject), newAggregate);
+      }
+      call.transformTo(query);
+    }
+
+    /**
+     * Returns a set of the unique filter references from
+     * the given list of {@link org.apache.calcite.rel.core.AggregateCall}
+    private Set<Integer> getUniqueFilterRefs(List<AggregateCall> calls) {
+      Set<Integer> refs = new HashSet<>();
+      for (AggregateCall call : calls) {
+        if (call.hasFilter()) {
+          refs.add(call.filterArg);
+        }
+      }
+      return refs;
+    }
+
+    /**
+     * Attempts to optimize any aggregations with filters in the DruidQuery by
+     * 1. Trying to abstract common filters out into the "filter" field
+     * 2. Eliminating expressions that are always true or always false when possible
+     * 3. ANDing aggregate filters together with the outer filter to allow for pruning of
data
+     * Should be called before pushing both the aggregate and project into Druid.
+     * Assumes that at least one aggregate call has a filter attached to it
+     * */
+    private DruidQuery optimizeFilteredAggregations(DruidQuery query, Project project,
+                                                   Aggregate aggregate) {
+      Filter filter = null;
+      RexBuilder builder = query.getCluster().getRexBuilder();
+      final RexExecutor executor =
+              Util.first(query.getCluster().getPlanner().getExecutor(), RexUtil.EXECUTOR);
+      RexSimplify simplifier = new RexSimplify(builder, true, executor);
+
+      // if the druid query originally contained a filter
+      boolean containsFilter = false;
+      for (RelNode node : query.rels) {
+        if (node instanceof Filter) {
+          filter = (Filter) node;
+          containsFilter = true;
+          break;
+        }
+      }
+
+      // if every aggregate call has a filter arg reference
+      boolean allHaveFilters = allAggregatesHaveFilters(aggregate.getAggCallList());
+
+      Set<Integer> uniqueFilterRefs = getUniqueFilterRefs(aggregate.getAggCallList());
+
+      // One of the pre-conditions for this method
+      assert uniqueFilterRefs.size() > 0;
+
+      List<AggregateCall> newCalls = new ArrayList<>();
+
+      // OR all the filters so that they can be ANDed to the outer filter
+      List<RexNode> disjunctions = new ArrayList<>();
+      for (Integer i : uniqueFilterRefs) {
+        disjunctions.add(stripFilter(project.getProjects().get(i)));
+      }
+      RexNode filterNode = RexUtil.composeDisjunction(builder, disjunctions);
+
+      // Erase references to filters
+      for (AggregateCall call : aggregate.getAggCallList()) {
+        int newFilterArg = call.filterArg;
+        if (!call.hasFilter()
+                || (uniqueFilterRefs.size() == 1 && allHaveFilters) // filters get
extracted
+                || project.getProjects().get(newFilterArg).isAlwaysTrue()) {
+          newFilterArg = -1;
+        }
+        newCalls.add(call.copy(call.getArgList(), newFilterArg));
+      }
+      aggregate = aggregate.copy(aggregate.getTraitSet(), aggregate.getInput(),
+              aggregate.indicator, aggregate.getGroupSet(), aggregate.getGroupSets(),
+              newCalls);
+
+      if (containsFilter) {
+        // AND the current filterNode with the filter node inside filter
+        filterNode = builder.makeCall(SqlStdOperatorTable.AND, filterNode, filter.getCondition());
+      }
+
+      // Simplify the filter as much as possible
+      RexNode tempFilterNode = filterNode;
+      filterNode = simplifier.simplify(filterNode);
+
+      // It's possible that after simplification that the expression is now always false.
+      // Druid cannot handle such a filter.
+      // This will happen when the below expression (f_n+1 may not exist):
+      // f_n+1 AND (f_1 OR f_2 OR ... OR f_n) simplifies to be something always false.
+      // f_n+1 cannot be false, since it came from a pushed filter rel node
+      // and each f_i cannot be false, since DruidAggregateProjectRule would have caught
that.
+      // So, the only solution is to revert back to the unsimplified version and let Druid
+      // handle a filter that is ultimately unsatisfiable.
+      if (filterNode.isAlwaysFalse()) {
+        filterNode = tempFilterNode;
+      }
+
+      filter = LogicalFilter.create(query.rels.get(0), filterNode);
+
+      boolean addNewFilter = !filter.getCondition().isAlwaysTrue() && allHaveFilters;
+      // Assumes that Filter nodes are always right after
+      // TableScan nodes (which are always present)
+      int startIndex = containsFilter && addNewFilter ? 2 : 1;
+
+      List<RelNode> newNodes = constructNewNodes(query.rels, addNewFilter, startIndex,
+              filter, project, aggregate);
+
+      return DruidQuery.create(query.getCluster(),
+             aggregate.getTraitSet().replace(query.getConvention()),
+             query.getTable(), query.druidTable, newNodes);
+    }
+
+    // Returns true if and only if every AggregateCall in calls has a filter argument.
+    private static boolean allAggregatesHaveFilters(List<AggregateCall> calls) {
+      for (AggregateCall call : calls) {
+        if (!call.hasFilter()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Returns a new List of RelNodes in the order of the given order of the oldNodes,
+     * the given {@link Filter}, and any extra nodes.
+     */
+    private static List<RelNode> constructNewNodes(List<RelNode> oldNodes,
+        boolean addFilter, int startIndex, RelNode filter, RelNode ... trailingNodes) {
+      List<RelNode> newNodes = new ArrayList<>();
+
+      // The first item should always be the Table scan, so any filter would go after that
+      newNodes.add(oldNodes.get(0));
+
+      if (addFilter) {
+        newNodes.add(filter);
+        // This is required so that each RelNode is linked to the one before it
+        if (startIndex < oldNodes.size()) {
+          RelNode next = oldNodes.get(startIndex);
+          newNodes.add(next.copy(next.getTraitSet(), Collections.singletonList(filter)));
+          startIndex++;
+        }
+      }
+
+      // Add the rest of the nodes from oldNodes
+      for (int i = startIndex; i < oldNodes.size(); i++) {
+        newNodes.add(oldNodes.get(i));
+      }
+
+      // Add the trailing nodes (need to link them)
+      for (RelNode node : trailingNodes) {
+        newNodes.add(node.copy(node.getTraitSet(), Collections.singletonList(Util.last(newNodes))));
+      }
+
+      return newNodes;
+    }
+
+    // Removes the IS_TRUE in front of RexCalls, if they exist
+    private static RexNode stripFilter(RexNode node) {
+      if (node.getKind() == SqlKind.IS_TRUE) {
+        return ((RexCall) node).getOperands().get(0);
+      }
+      return node;
+    }
+
+    private static List<Integer> getFilterRefs(List<AggregateCall> calls) {
+      List<Integer> refs = new ArrayList<>();
+      for (AggregateCall call : calls) {
+        if (call.hasFilter()) {
+          refs.add(call.filterArg);
+        }
+      }
+      return refs;
     }
 
     /* To be a valid Project, we allow it to contain references, and a single call
@@ -529,7 +721,10 @@ public class DruidRules {
       return idxTimestamp;
     }
 
-    private static boolean validAggregate(Aggregate aggregate, int idx) {
+    private static boolean validAggregate(Aggregate aggregate, int idx, int numFilterRefs)
{
+      if (numFilterRefs > 0 && idx < 0) {
+        return true;
+      }
       if (!aggregate.getGroupSet().get(idx)) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/calcite/blob/551b5622/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index e04aba1..9c88e3a 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -2156,6 +2156,376 @@ public class DruidAdapterIT {
   }
 
   /**
+   * Tests whether an aggregate with a filter clause has its filter factored out
+   * when there is no outer filter
+   */
+  @Test public void testFilterClauseFactoredOut() {
+    // Logically equivalent to
+    // select sum("store_sales") from "foodmart" where "the_year" >= 1997
+    String sql = "select sum(\"store_sales\") "
+            + "filter (where \"the_year\" >= 1997) from \"foodmart\"";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'bound','dimension':'the_year','lower':'1997',"
+            + "'lowerStrict':false,'ordering':'numeric'},'aggregations':[{'type':'doubleSum','name'"
+            + ":'EXPR$0','fieldName':'store_sales'}],'intervals':['1900-01-09T00:00:00.000/2992-01"
+            + "-10T00:00:00.000'],'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Tests whether filter clauses with filters that are always true disappear or not
+   */
+  @Test public void testFilterClauseAlwaysTrueGone() {
+    // Logically equivalent to
+    // select sum("store_sales") from "foodmart"
+    String sql = "select sum(\"store_sales\") filter (where 1 = 1) from \"foodmart\"";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','aggregations':[{'type':'doubleSum','name':'EXPR$0','fieldName':"
+            + "'store_sales'}],'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Tests whether filter clauses with filters that are always true disappear in the presence
+   * of another aggregate without a filter clause
+   */
+  @Test public void testFilterClauseAlwaysTrueWithAggGone1() {
+    // Logically equivalent to
+    // select sum("store_sales"), sum("store_cost") from "foodmart"
+    String sql = "select sum(\"store_sales\") filter (where 1 = 1), "
+            + "sum(\"store_cost\") from \"foodmart\"";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','aggregations':[{'type':'doubleSum','name':'EXPR$0','fieldName':"
+            + "'store_sales'},{'type':'doubleSum','name':'EXPR$1','fieldName':'store_cost'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Tests whether filter clauses with filters that are always true disappear in the presence
+   * of another aggregate with a filter clause
+   */
+  @Test public void testFilterClauseAlwaysTrueWithAggGone2() {
+    // Logically equivalent to
+    // select sum("store_sales"),
+    // sum("store_cost") filter (where "store_state" = 'CA') from "foodmart"
+    String sql = "select sum(\"store_sales\") filter (where 1 = 1), "
+            + "sum(\"store_cost\") filter (where \"store_state\" = 'CA') "
+            + "from \"foodmart\"";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','aggregations':[{'type':'doubleSum','name':'EXPR$0','fieldName'"
+            + ":'store_sales'},{'type':'filtered','filter':{'type':'selector','dimension':"
+            + "'store_state','value':'CA'},'aggregator':{'type':'doubleSum','name':'EXPR$1',"
+            + "'fieldName':'store_cost'}}],'intervals':"
+            + "['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Tests whether an existing outer filter is untouched when an aggregate has a filter clause
+   * that is always true
+   */
+  @Test public void testOuterFilterRemainsWithAlwaysTrueClause() {
+    // Logically equivalent to
+    // select sum("store_sales"), sum("store_cost") from "foodmart" where "store_city" =
'Seattle'
+    String sql = "select sum(\"store_sales\") filter (where 1 = 1), sum(\"store_cost\") "
+            + "from \"foodmart\" where \"store_city\" = 'Seattle'";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'selector','dimension':'store_city',"
+            + "'value':'Seattle'},'aggregations':[{'type':'doubleSum','name':'EXPR$0',"
+            + "'fieldName':'store_sales'},{'type':'doubleSum','name':'EXPR$1',"
+            + "'fieldName':'store_cost'}],'intervals':"
+            + "['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Tests that an aggregate with a filter clause that is always false does not get pushed
in
+   */
+  @Test public void testFilterClauseAlwaysFalseNotPushed() {
+    String sql = "select sum(\"store_sales\") filter (where 1 > 1) from \"foodmart\"";
+    // Calcite takes care of the unsatisfiable filter
+    String expectedSubExplain =
+            "  BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n"
+            + "    BindableProject(store_sales=[$0], $f1=[false])\n";
+    sql(sql).explainContains(expectedSubExplain);
+  }
+
+  /**
+   * Tests that an aggregate with a filter clause that is always false does not get pushed
when
+   * there is already an outer filter
+   */
+  @Test public void testFilterClauseAlwaysFalseNotPushedWithFilter() {
+    String sql = "select sum(\"store_sales\") filter (where 1 > 1) "
+            + "from \"foodmart\" where \"store_city\" = 'Seattle'";
+    String expectedSubExplain =
+            "  BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n"
+            + "    BindableProject(store_sales=[$0], $f1=[false])\n"
+            + "      DruidQuery(table=[[foodmart, foodmart]], "
+                    + "intervals=[[1900-01-09T00:00:00.000/2992-01-10T00:00:00.000]], "
+                    // Make sure the original filter is still there
+                    + "filter=[=($62, 'Seattle')], projects=[[$90]])";
+
+    sql(sql).explainContains(expectedSubExplain);
+
+  }
+
+  /**
+   * Tests that an aggregate with a filter clause that is the same as the outer filter has
no
+   * references to that filter, and that the original outer filter remains
+   */
+  @Test public void testFilterClauseSameAsOuterFilterGone() {
+    // Logically equivalent to
+    // select sum("store_sales") from "foodmart" where "store_city" = 'Seattle'
+    String sql = "select sum(\"store_sales\") filter (where \"store_city\" = 'Seattle') "
+            + "from \"foodmart\" where \"store_city\" = 'Seattle'";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'selector','dimension':'store_city','value':"
+            + "'Seattle'},'aggregations':[{'type':'doubleSum','name':'EXPR$0','fieldName':"
+            + "'store_sales'}],'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql)
+        .queryContains(druidChecker(expectedQuery))
+        .returnsUnordered("EXPR$0=52644.07004201412");
+  }
+
+  /**
+   * Test to ensure that an aggregate with a filter clause in the presence of another aggregate
+   * without a filter clause does not have its filter factored out into the outer filter
+   */
+  @Test public void testFilterClauseNotFactoredOut1() {
+    String sql = "select sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+            + "sum(\"store_cost\") from \"foodmart\"";
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','aggregations':[{'type':'filtered','filter':{'type':'selector',"
+            + "'dimension':'store_state','value':'CA'},'aggregator':{'type':'doubleSum','name':"
+            + "'EXPR$0','fieldName':'store_sales'}},{'type':'doubleSum','name':'EXPR$1','fieldName'"
+            + ":'store_cost'}],'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Test to ensure that an aggregate with a filter clause, in the presence of another aggregate
+   * without a filter clause and an outer filter, does not have its
+   * filter factored out into the outer filter.
+   */
+  @Test public void testFilterClauseNotFactoredOut2() {
+    String sql = "select sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+            + "sum(\"store_cost\") from \"foodmart\" where \"the_year\" >= 1997";
+    // Only the outer WHERE ("the_year" >= 1997) appears in the top-level 'filter'; the CA
+    // predicate stays inside the 'filtered' aggregator for EXPR$0.
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'bound','dimension':'the_year','lower':'1997',"
+            + "'lowerStrict':false,'ordering':'numeric'},'aggregations':[{'type':'filtered',"
+            + "'filter':{'type':'selector','dimension':'store_state','value':'CA'},'aggregator':{"
+            + "'type':'doubleSum','name':'EXPR$0','fieldName':'store_sales'}},{'type':'doubleSum',"
+            + "'name':'EXPR$1','fieldName':'store_cost'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql).queryContains(druidChecker(expectedQuery));
+  }
+
+  /**
+   * Test to ensure that multiple aggregates with filter clauses have their filters extracted to
+   * the outer filter field for data pruning.
+   */
+  @Test public void testFilterClausesFactoredForPruning1() {
+    String sql = "select "
+            + "sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+            + "sum(\"store_sales\") filter (where \"store_state\" = 'WA') "
+            + "from \"foodmart\"";
+    // Top-level 'filter' is the OR of both aggregate filters (rows matching neither cannot
+    // contribute to any result), while each aggregator keeps its own 'filtered' wrapper.
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'or','fields':[{'type':'selector','dimension':"
+            + "'store_state','value':'CA'},{'type':'selector','dimension':'store_state',"
+            + "'value':'WA'}]},'aggregations':[{'type':'filtered','filter':{'type':'selector',"
+            + "'dimension':'store_state','value':'CA'},'aggregator':{'type':'doubleSum','name':"
+            + "'EXPR$0','fieldName':'store_sales'}},{'type':'filtered','filter':{'type':'selector',"
+            + "'dimension':'store_state','value':'WA'},'aggregator':{'type':'doubleSum','name':"
+            + "'EXPR$1','fieldName':'store_sales'}}],'intervals':"
+            + "['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql)
+        .queryContains(druidChecker(expectedQuery))
+        .returnsUnordered("EXPR$0=159167.840144217; EXPR$1=263793.2202244997");
+  }
+
+  /**
+   * Test to ensure that multiple aggregates with filter clauses have their filters extracted to
+   * the outer filter field for data pruning in the presence of an outer filter.
+   */
+  @Test public void testFilterClausesFactoredForPruning2() {
+    String sql = "select "
+            + "sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+            + "sum(\"store_sales\") filter (where \"store_state\" = 'WA') "
+            + "from \"foodmart\" where \"brand_name\" = 'Super'";
+    // Top-level 'filter' = (CA OR WA) AND brand_name = 'Super': the OR of the aggregate
+    // filters is ANDed with the original outer WHERE filter.
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'and','fields':[{'type':'or','fields':[{'type':"
+            + "'selector','dimension':'store_state','value':'CA'},{'type':'selector','dimension':"
+            + "'store_state','value':'WA'}]},{'type':'selector','dimension':'brand_name','value':"
+            + "'Super'}]},'aggregations':[{'type':'filtered','filter':{'type':'selector',"
+            + "'dimension':'store_state','value':'CA'},'aggregator':{'type':'doubleSum','name':"
+            + "'EXPR$0','fieldName':'store_sales'}},{'type':'filtered','filter':{'type':'selector',"
+            + "'dimension':'store_state','value':'WA'},'aggregator':{'type':'doubleSum','name':"
+            + "'EXPR$1','fieldName':'store_sales'}}],'intervals':"
+            + "['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql)
+        .queryContains(druidChecker(expectedQuery))
+        .returnsUnordered("EXPR$0=2600.0099930763245; EXPR$1=4486.439979553223");
+  }
+
+  /**
+   * Test to ensure that multiple aggregates sharing the same filter clause have that filter
+   * factored out in the presence of an outer filter, and no longer refer to those filters.
+   */
+  @Test public void testMultipleFiltersFactoredOutWithOuterFilter() {
+    // Logically equivalent to
+    // select sum("store_sales"), sum("store_cost")
+    // from "foodmart" where "brand_name" = 'Super' and "store_state" = 'CA'
+    String sql = "select "
+            + "sum(\"store_sales\") filter (where \"store_state\" = 'CA'), "
+            + "sum(\"store_cost\") filter (where \"store_state\" = 'CA') "
+            + "from \"foodmart\" "
+            + "where \"brand_name\" = 'Super'";
+    // Aggregates should lose any reference to a filter clause in the plan
+    String expectedAggregateExplain = "aggs=[[SUM($0), SUM($2)]]";
+    // The shared CA filter is ANDed into the top-level 'filter'; both aggregators become
+    // plain 'doubleSum' (no 'filtered' wrapper).
+    String expectedQuery = "{'queryType':'timeseries','dataSource':'foodmart','descending':false,"
+            + "'granularity':'all','filter':{'type':'and','fields':[{'type':'selector','dimension':"
+            + "'store_state','value':'CA'},{'type':'selector','dimension':'brand_name','value':"
+            + "'Super'}]},'aggregations':[{'type':'doubleSum','name':'EXPR$0','fieldName':"
+            + "'store_sales'},{'type':'doubleSum','name':'EXPR$1','fieldName':'store_cost'}],"
+            + "'intervals':['1900-01-09T00:00:00.000/2992-01-10T00:00:00.000'],"
+            + "'context':{'skipEmptyBuckets':true}}";
+
+    sql(sql)
+        .queryContains(druidChecker(expectedQuery))
+        .explainContains(expectedAggregateExplain)
+        .returnsUnordered("EXPR$0=2600.0099930763245; EXPR$1=1013.1619997620583");
+  }
+
+  /**
+   * Tests that when the filter resulting from factoring out filter clauses simplifies to
+   * always-false, the unsimplified filters are still pushed to Druid to handle.
+   */
+  @Test public void testOuterFilterFalseAfterFactorSimplification() {
+    // Normally we would factor out "the_year" > 1997 into the outer filter to prune the data
+    // before aggregation and simplify the expression, but in this case that would produce:
+    // "the_year" > 1997 AND "the_year" <= 1997 -> false (after simplification)
+    // Since Druid cannot handle a "false" filter, we revert back to the
+    // pre-simplified version, i.e. the filter should be "the_year" > 1997 and "the_year" <= 1997,
+    // and let Druid handle the unsatisfiable expression
+    String sql = "select sum(\"store_sales\") filter (where \"the_year\" > 1997) "
+            + "from \"foodmart\" where \"the_year\" <= 1997";
+
+    // Both (contradictory) bound filters appear ANDed in the pushed-down Druid filter.
+    String expectedFilter = "filter':{'type':'and','fields':[{'type':'bound','dimension':'the_year'"
+            + ",'lower':'1997','lowerStrict':true,'ordering':'numeric'},{'type':'bound',"
+            + "'dimension':'the_year','upper':'1997','upperStrict':false,'ordering':'numeric'}]}";
+
+    sql(sql)
+        .queryContains(druidChecker(expectedFilter))
+        .returnsUnordered("");
+  }
+
+  /**
+   * Test to ensure that aggregates with filter clauses that Druid cannot handle are not pushed in
+   * as filtered aggregates.
+   */
+  @Test public void testFilterClauseNotPushable() {
+    // Currently the adapter does not support the LIKE operator
+    String sql = "select sum(\"store_sales\") "
+            + "filter (where \"the_year\" like '199_') from \"foodmart\"";
+    // The FILTER stays in a Bindable (non-Druid) aggregate, so the LIKE is evaluated by Calcite.
+    String expectedSubExplain =
+            "  BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n"
+            + "    BindableProject(store_sales=[$1], $f1=[IS TRUE(LIKE($0, '199_'))])";
+
+    sql(sql).explainContains(expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that aggregations whose filter clauses reference metrics (here
+   * "store_cost") do not get pushed into Druid as filtered aggregators.
+   */
+  @Test public void testFilterClauseWithMetricRef() {
+    String sql = "select sum(\"store_sales\") filter (where \"store_cost\" > 10) from \"foodmart\"";
+    // The FILTER stays in a Bindable (non-Druid) aggregate: the metric comparison is
+    // evaluated by Calcite rather than pushed down.
+    String expectedSubExplain =
+            "  BindableAggregate(group=[{}], EXPR$0=[SUM($0) FILTER $1])\n"
+                    + "    BindableProject(store_sales=[$0], $f1=[IS TRUE(>($1, 10))])\n";
+
+    sql(sql).explainContains(expectedSubExplain);
+  }
+
+  /**
+   * Test to ensure that an aggregate with a nested (OR) filter clause has its filter factored
+   * out into the outer filter.
+   */
+  @Test public void testNestedFilterClauseFactored() {
+    // Logically equivalent to
+    // select sum("store_sales") from "foodmart" where "store_state" in ('CA', 'OR')
+    String sql =
+            "select sum(\"store_sales\") "
+            + "filter (where \"store_state\" = 'CA' or \"store_state\" = 'OR') from \"foodmart\"";
+
+    // The OR moves wholesale into the top-level 'filter' ...
+    String expectedFilterJson =
+            "filter':{'type':'or','fields':[{'type':'selector','dimension':"
+            + "'store_state','value':'CA'},{'type':'selector',"
+            + "'dimension':'store_state','value':'OR'}]}";
+
+    // ... leaving a plain 'doubleSum' aggregator with no 'filtered' wrapper.
+    String expectedAggregateJson =
+            "'aggregations':[{'type':'doubleSum',"
+            + "'name':'EXPR$0','fieldName':'store_sales'}]";
+
+    sql(sql)
+            .queryContains(druidChecker(expectedFilterJson))
+            .queryContains(druidChecker(expectedAggregateJson))
+            .returnsUnordered("EXPR$0=301444.910279572");
+  }
+
+  /**
+   * Test to ensure that aggregates with nested filters have their filters factored out
+   * into the outer filter for data pruning, while still holding a reference to the filter clause.
+   */
+  @Test public void testNestedFilterClauseInAggregates() {
+    String sql =
+            "select "
+            + "sum(\"store_sales\") filter "
+                    + "(where \"store_state\" = 'CA' and \"the_month\" = 'October'), "
+            + "sum(\"store_cost\") filter "
+                    + "(where \"store_state\" = 'CA' and \"the_day\" = 'Monday') "
+            + "from \"foodmart\"";
+
+    // Pruning filter: (store_state = CA AND the_month = October) OR (store_state = CA AND the_day = Monday)
+    String expectedFilterJson = "filter':{'type':'or','fields':[{'type':'and','fields':[{'type':"
+            + "'selector','dimension':'store_state','value':'CA'},{'type':'selector','dimension':"
+            + "'the_month','value':'October'}]},{'type':'and','fields':[{'type':'selector',"
+            + "'dimension':'store_state','value':'CA'},{'type':'selector','dimension':'the_day',"
+            + "'value':'Monday'}]}]}";
+
+    // Each aggregator still carries its own 'filtered' wrapper with the full AND predicate.
+    String expectedAggregatesJson = "'aggregations':[{'type':'filtered','filter':{'type':'and',"
+            + "'fields':[{'type':'selector','dimension':'store_state','value':'CA'},{'type':"
+            + "'selector','dimension':'the_month','value':'October'}]},'aggregator':{'type':"
+            + "'doubleSum','name':'EXPR$0','fieldName':'store_sales'}},{'type':'filtered',"
+            + "'filter':{'type':'and','fields':[{'type':'selector','dimension':'store_state',"
+            + "'value':'CA'},{'type':'selector','dimension':'the_day','value':'Monday'}]},"
+            + "'aggregator':{'type':'doubleSum','name':'EXPR$1','fieldName':'store_cost'}}]";
+
+    sql(sql)
+            .queryContains(druidChecker(expectedFilterJson))
+            .queryContains(druidChecker(expectedAggregatesJson))
+            .returnsUnordered("EXPR$0=13077.79001301527; EXPR$1=9830.779905691743");
+  }
+
+  /**
    * <a href="https://issues.apache.org/jira/browse/CALCITE-1805">[CALCITE-1805]
    * Druid adapter cannot handle count column without adding support for nested queries</a>.
    */


Mime
View raw message