hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [07/31] hive git commit: HIVE-14217: Druid integration (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Date Mon, 12 Sep 2016 20:24:39 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
new file mode 100644
index 0000000..43982aa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQuery.java
@@ -0,0 +1,1053 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+/**
+ * Relational expression representing a scan of a Druid data set.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidQuery extends TableScan {
+
+  /** Logger shared by this class. */
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidQuery.class);
+
+  /** Query specification; derived on the first call to {@code getQuerySpec()} and then reused. */
+  protected QuerySpec querySpec;
+
+  /** Druid table this query reads from. */
+  final DruidTable druidTable;
+  /** Intervals the query is restricted to; stored as an immutable copy by the constructor. */
+  final List<Interval> intervals;
+  /** Chain of relational expressions folded into this query; stored as an immutable copy. */
+  final ImmutableList<RelNode> rels;
+
+  /** Signatures accepted by {@code isValidSignature}: a scan (s), optionally followed by
+   * a filter (f), a project (p), an aggregate (a) and a sort (l), in that order. */
+  private static final Pattern VALID_SIG = Pattern.compile("sf?p?a?l?");
+
+  /**
+   * Builds a DruidQuery over a chain of relational expressions.
+   *
+   * <p>The interval list and the rel list are copied into immutable lists.
+   * When assertions are enabled, the new instance is checked with
+   * {@link #isValid(Litmus)} and an invalid query makes the litmus throw.
+   *
+   * @param cluster        Cluster
+   * @param traitSet       Traits
+   * @param table          Table
+   * @param druidTable     Druid table
+   * @param intervals      Intervals for the query
+   * @param rels           Internal relational expressions
+   */
+  private DruidQuery(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, DruidTable druidTable,
+      List<Interval> intervals, List<RelNode> rels) {
+    super(cluster, traitSet, table);
+    this.intervals = ImmutableList.copyOf(intervals);
+    this.rels = ImmutableList.copyOf(rels);
+    this.druidTable = druidTable;
+
+    // Validation reads the fields above, so it must stay last.
+    assert isValid(Litmus.THROW);
+  }
+
+  /** Builds the one-letter-per-operator signature of this query.
+   *
+   * <p>Each rel contributes one character: {@link TableScan} is 's',
+   * {@link Project} is 'p', {@link Filter} is 'f', {@link Aggregate} is 'a'
+   * and {@link Sort} is 'l'; any other rel contributes '!'. For example,
+   * "sfpal" is a scan followed by a filter, a project, an aggregate and a sort.
+   *
+   * @return the signature, one character per element of {@code rels}
+   * @see #isValidSignature(String)
+   */
+  String signature() {
+    final StringBuilder sig = new StringBuilder();
+    for (RelNode node : rels) {
+      final char code;
+      if (node instanceof TableScan) {
+        code = 's';
+      } else if (node instanceof Project) {
+        code = 'p';
+      } else if (node instanceof Filter) {
+        code = 'f';
+      } else if (node instanceof Aggregate) {
+        code = 'a';
+      } else if (node instanceof Sort) {
+        code = 'l';
+      } else {
+        // Unknown operator: guaranteed not to match the valid-signature pattern.
+        code = '!';
+      }
+      sig.append(code);
+    }
+    return sig.toString();
+  }
+
+  @Override public boolean isValid(Litmus litmus) {
+    if (!super.isValid(litmus)) {
+      return false;
+    }
+    final String signature = signature();
+    if (!isValidSignature(signature)) {
+      return litmus.fail("invalid signature");
+    }
+    if (rels.isEmpty()) {
+      return litmus.fail("must have at least one rel");
+    }
+    for (int i = 0; i < rels.size(); i++) {
+      final RelNode r = rels.get(i);
+      if (i == 0) {
+        if (!(r instanceof TableScan)) {
+          return litmus.fail("first rel must be TableScan");
+        }
+        if (r.getTable() != table) {
+          return litmus.fail("first rel must be based on table table");
+        }
+      } else {
+        final List<RelNode> inputs = r.getInputs();
+        if (inputs.size() != 1 || inputs.get(0) != rels.get(i - 1)) {
+          return litmus.fail("each rel must have a single input");
+        }
+        if (r instanceof Aggregate) {
+          final Aggregate aggregate = (Aggregate) r;
+          if (aggregate.getGroupSets().size() != 1
+              || aggregate.indicator) {
+            return litmus.fail("no grouping sets");
+          }
+          for (AggregateCall call : aggregate.getAggCallList()) {
+            if (call.filterArg >= 0) {
+              return litmus.fail("no filtered aggregate functions");
+            }
+          }
+        }
+        if (r instanceof Filter) {
+          final Filter filter = (Filter) r;
+          if (!isValidFilter(filter.getCondition())) {
+            return litmus.fail("invalid filter");
+          }
+        }
+        if (r instanceof Sort) {
+          final Sort sort = (Sort) r;
+          if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
+            return litmus.fail("offset not supported");
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns whether a filter expression is built only from constructs this
+   * class accepts in a Druid filter.
+   *
+   * <p>Input references and literals are always accepted. Logical operators,
+   * comparisons (including {@code NOT_EQUALS}, which the filter translator in
+   * this file handles), BETWEEN, IN and CAST are accepted when all of their
+   * operands are accepted. Everything else is rejected.
+   *
+   * @param e expression to check
+   * @return true if the expression and all of its operands are accepted
+   */
+  boolean isValidFilter(RexNode e) {
+    switch (e.getKind()) {
+    case INPUT_REF:
+    case LITERAL:
+      return true;
+    case AND:
+    case OR:
+    case NOT:
+    case EQUALS:
+    case NOT_EQUALS:
+    case LESS_THAN:
+    case LESS_THAN_OR_EQUAL:
+    case GREATER_THAN:
+    case GREATER_THAN_OR_EQUAL:
+    // NOTE(review): BETWEEN and IN are accepted here, but Translator.translateFilter
+    // in this file has no case for them; confirm they are rewritten before translation.
+    case BETWEEN:
+    case IN:
+    case CAST:
+      return areValidFilters(((RexCall) e).getOperands());
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Returns whether every expression in the list passes
+   * {@link #isValidFilter(RexNode)}; an empty list is valid.
+   */
+  private boolean areValidFilters(List<RexNode> es) {
+    boolean allValid = true;
+    for (RexNode operand : es) {
+      if (!isValidFilter(operand)) {
+        // Stop at the first rejected operand.
+        allValid = false;
+        break;
+      }
+    }
+    return allValid;
+  }
+
+  /** Returns whether a signature represents an sequence of relational operators
+   * that can be translated into a valid Druid query. */
+  static boolean isValidSignature(String signature) {
+    return VALID_SIG.matcher(signature).matches();
+  }
+
+  /**
+   * Creates a DruidQuery that uses the intervals declared by the Druid table.
+   *
+   * @param cluster    Cluster
+   * @param traitSet   Traits
+   * @param table      Table
+   * @param druidTable Druid table; its {@code intervals} become the query intervals
+   * @param rels       Internal relational expressions
+   * @return the new query
+   */
+  public static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, DruidTable druidTable, List<RelNode> rels) {
+    final List<Interval> tableIntervals = druidTable.intervals;
+    return new DruidQuery(cluster, traitSet, table, druidTable, tableIntervals, rels);
+  }
+
+  /** Creates a DruidQuery. */
+  private static DruidQuery create(RelOptCluster cluster, RelTraitSet traitSet,
+      RelOptTable table, DruidTable druidTable, List<Interval> intervals, List<RelNode> rels) {
+    return new DruidQuery(cluster, traitSet, table, druidTable, intervals, rels);
+  }
+
+  /**
+   * Returns a new DruidQuery equal to {@code query} with one more relational
+   * expression appended to its chain; {@code query} itself is not modified.
+   *
+   * @param query query to extend
+   * @param r     relational expression to append
+   * @return the extended query
+   */
+  public static DruidQuery extendQuery(DruidQuery query, RelNode r) {
+    final ImmutableList<RelNode> extendedRels =
+        ImmutableList.<RelNode>builder().addAll(query.rels).add(r).build();
+    return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
+            query.druidTable, query.intervals, extendedRels);
+  }
+
+  /**
+   * Returns a new DruidQuery with the same chain of relational expressions as
+   * {@code query} but restricted to the given intervals.
+   *
+   * @param query     query to copy
+   * @param intervals intervals for the new query
+   * @return the new query
+   */
+  public static DruidQuery extendQuery(DruidQuery query, List<Interval> intervals) {
+    final List<RelNode> sameRels = query.rels;
+    return DruidQuery.create(query.getCluster(), query.getTraitSet(), query.getTable(),
+            query.druidTable, intervals, sameRels);
+  }
+
+  /**
+   * Returns this same instance; the given trait set is not used.
+   *
+   * @param traitSet ignored
+   * @param inputs   expected to be empty (checked only when assertions are enabled)
+   */
+  @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.isEmpty();
+    return this;
+  }
+
+  /**
+   * Derives the row type from the field types of the last rel in the chain,
+   * paired with the field names reported by the query specification.
+   */
+  @Override public RelDataType deriveRowType() {
+    final List<RelDataTypeField> topFields = Util.last(rels).getRowType().getFieldList();
+    final List<RelDataType> fieldTypes = Pair.right(topFields);
+    final List<String> fieldNames = getQuerySpec().fieldNames;
+    return getCluster().getTypeFactory().createStructType(fieldTypes, fieldNames);
+  }
+
+  /** Returns the first rel of the chain, cast to {@link TableScan}. */
+  public TableScan getTableScan() {
+    final RelNode first = rels.get(0);
+    return (TableScan) first;
+  }
+
+  /** Returns the last rel of the chain, i.e. the outermost operator of this query. */
+  public RelNode getTopNode() {
+    final RelNode top = Util.last(rels);
+    return top;
+  }
+
+  /** Returns the {@code table} field of this scan. */
+  @Override public RelOptTable getTable() {
+    return table;
+  }
+
+  /**
+   * Describes this query to the writer, one group of items per rel in the chain:
+   * table and intervals for the scan, the condition for a filter, the
+   * expressions for a project, group set and calls for an aggregate, and sort
+   * keys, directions and optional fetch for a sort.
+   *
+   * @param pw writer that receives the items
+   * @return {@code pw}
+   * @throws AssertionError if the chain contains an unsupported rel type
+   */
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    for (RelNode rel : rels) {
+      if (rel instanceof TableScan) {
+        final List<String> qualifiedName = ((TableScan) rel).getTable().getQualifiedName();
+        pw.item("table", qualifiedName);
+        pw.item("intervals", intervals);
+      } else if (rel instanceof Filter) {
+        final RexNode condition = ((Filter) rel).getCondition();
+        pw.item("filter", condition);
+      } else if (rel instanceof Project) {
+        final List<RexNode> expressions = ((Project) rel).getProjects();
+        pw.item("projects", expressions);
+      } else if (rel instanceof Aggregate) {
+        final Aggregate aggregate = (Aggregate) rel;
+        pw.item("groups", aggregate.getGroupSet())
+            .item("aggs", aggregate.getAggCallList());
+      } else if (rel instanceof Sort) {
+        final Sort sort = (Sort) rel;
+        final List<RelFieldCollation> fieldCollations = sort.collation.getFieldCollations();
+        // All sort keys are written first, then all directions.
+        for (int idx = 0; idx < fieldCollations.size(); idx++) {
+          pw.item("sort" + idx, fieldCollations.get(idx).getFieldIndex());
+        }
+        for (int idx = 0; idx < fieldCollations.size(); idx++) {
+          pw.item("dir" + idx, fieldCollations.get(idx).shortString());
+        }
+        pw.itemIf("fetch", sort.fetch, sort.fetch != null);
+      } else {
+        throw new AssertionError("rel type not supported in Druid query "
+            + rel);
+      }
+    }
+    return pw;
+  }
+
+  /**
+   * Returns the self cost of the last rel in the chain scaled by 0.1.
+   */
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+      RelMetadataQuery mq) {
+    final RelOptCost topCost = Util.last(rels).computeSelfCost(planner, mq);
+    // Heuristic: we assume pushing query to Druid reduces cost by 90%
+    return topCost.multiplyBy(0.1);
+  }
+
+  /**
+   * Projects a subset of this query's fields, plus typed nulls for any extra
+   * fields requested.
+   *
+   * @param fieldsUsed  ordinals of the fields to keep
+   * @param extraFields additional fields, projected as nulls of their type
+   * @param relBuilder  builder used to create the project
+   * @return this query if every field is used and no extra field is requested;
+   *     otherwise a {@link HiveProject} on top of this query, marked synthetic
+   */
+  @Override public RelNode project(ImmutableBitSet fieldsUsed,
+      Set<RelDataTypeField> extraFields,
+      RelBuilder relBuilder) {
+    final List<RelDataTypeField> fields = getRowType().getFieldList();
+    final boolean usesEveryField =
+        fieldsUsed.equals(ImmutableBitSet.range(getRowType().getFieldCount()));
+    if (usesEveryField && extraFields.isEmpty()) {
+      // Nothing to trim and nothing to add.
+      return this;
+    }
+
+    final RexBuilder rexBuilder = getCluster().getRexBuilder();
+    final List<RexNode> expressions = new ArrayList<>();
+    final List<String> names = new ArrayList<>();
+
+    // Keep the requested fields under their current names.
+    for (int ordinal : fieldsUsed) {
+      expressions.add(rexBuilder.makeInputRef(this, ordinal));
+      names.add(fields.get(ordinal).getName());
+    }
+
+    // Extra fields do not exist in this table: project a null of the right type.
+    for (RelDataTypeField extra : extraFields) {
+      final RexNode typedNull =
+          rexBuilder.ensureType(extra.getType(), rexBuilder.constantNull(), true);
+      expressions.add(typedNull);
+      names.add(extra.getName());
+    }
+
+    final RelNode projected = relBuilder.push(this).project(expressions, names).build();
+    final HiveProject hiveProject = (HiveProject) projected;
+    hiveProject.setSynthetic();
+    return hiveProject;
+  }
+
+  /**
+   * Returns the query specification, deriving it with
+   * {@link #deriveQuerySpec()} on first use and reusing it afterwards.
+   */
+  public QuerySpec getQuerySpec() {
+    if (querySpec != null) {
+      return querySpec;
+    }
+    querySpec = deriveQuerySpec();
+    assert querySpec != null : this;
+    return querySpec;
+  }
+
+  /**
+   * Walks the chain after the scan, picking up an optional filter, project,
+   * aggregate and sort in that order, and turns them into a query
+   * specification.
+   *
+   * @return the specification built from the collected pieces
+   * @throws AssertionError if some rel in the chain was not consumed
+   */
+  protected QuerySpec deriveQuerySpec() {
+    final RelDataType rowType = table.getRowType();
+    final int relCount = rels.size();
+    // Position 0 is the scan; start right after it.
+    int pos = 1;
+
+    RexNode filter = null;
+    if (pos < relCount && rels.get(pos) instanceof Filter) {
+      filter = ((Filter) rels.get(pos)).getCondition();
+      pos++;
+    }
+
+    List<RexNode> projects = null;
+    if (pos < relCount && rels.get(pos) instanceof Project) {
+      projects = ((Project) rels.get(pos)).getProjects();
+      pos++;
+    }
+
+    ImmutableBitSet groupSet = null;
+    List<AggregateCall> aggCalls = null;
+    List<String> aggNames = null;
+    if (pos < relCount && rels.get(pos) instanceof Aggregate) {
+      final Aggregate aggregate = (Aggregate) rels.get(pos);
+      pos++;
+      groupSet = aggregate.getGroupSet();
+      aggCalls = aggregate.getAggCallList();
+      // Output fields are the group keys followed by the aggregate calls.
+      aggNames = Util.skip(aggregate.getRowType().getFieldNames(),
+          groupSet.cardinality());
+    }
+
+    List<Integer> collationIndexes = null;
+    List<Direction> collationDirections = null;
+    Integer fetch = null;
+    if (pos < relCount && rels.get(pos) instanceof Sort) {
+      final Sort sort = (Sort) rels.get(pos);
+      pos++;
+      collationIndexes = new ArrayList<>();
+      collationDirections = new ArrayList<>();
+      for (RelFieldCollation fieldCollation : sort.collation.getFieldCollations()) {
+        collationIndexes.add(fieldCollation.getFieldIndex());
+        collationDirections.add(fieldCollation.getDirection());
+      }
+      if (sort.fetch != null) {
+        fetch = RexLiteral.intValue(sort.fetch);
+      }
+    }
+
+    if (pos != relCount) {
+      throw new AssertionError("could not implement all rels");
+    }
+
+    return getQuery(rowType, filter, projects, groupSet, aggCalls, aggNames,
+            collationIndexes, collationDirections, fetch);
+  }
+
+  /** Returns the query name of the query type in this query's specification. */
+  public String getQueryType() {
+    final QuerySpec spec = getQuerySpec();
+    return spec.queryType.getQueryName();
+  }
+
+  /** Returns the JSON query string held by this query's specification. */
+  public String getQueryString() {
+    final QuerySpec spec = getQuerySpec();
+    return spec.queryString;
+  }
+
+  /**
+   * Builds the query specification (query type, JSON text and output field
+   * names) from the pieces collected from the rel chain.
+   *
+   * <p>Without an aggregate the query is a SELECT. With an aggregate it is a
+   * TIMESERIES when there are no dimensions and either no collation or a sort
+   * on the time key; a TOP_N when there is exactly one dimension, a single
+   * descending sort on an aggregate, and a fetch; and a GROUP_BY otherwise.
+   *
+   * @param rowType             row type of the underlying table
+   * @param filter              filter condition, or null
+   * @param projects            projected expressions, or null
+   * @param groupSet            group keys, or null if there is no aggregate
+   * @param aggCalls            aggregate calls, or null if there is no aggregate
+   * @param aggNames            output names of the aggregate calls, or null
+   * @param collationIndexes    sort key ordinals, or null if there is no sort
+   * @param collationDirections sort directions, parallel to the ordinals, or null
+   * @param fetch               row limit, or null
+   * @return the query specification
+   * @throws IllegalStateException if the JSON text cannot be generated
+   */
+  private QuerySpec getQuery(RelDataType rowType, RexNode filter, List<RexNode> projects,
+      ImmutableBitSet groupSet, List<AggregateCall> aggCalls, List<String> aggNames,
+      List<Integer> collationIndexes, List<Direction> collationDirections, Integer fetch) {
+    DruidQueryType queryType = DruidQueryType.SELECT;
+    final Translator translator = new Translator(druidTable, rowType);
+    List<String> fieldNames = rowType.getFieldNames();
+
+    // Handle filter
+    Json jsonFilter = null;
+    if (filter != null) {
+      jsonFilter = translator.translateFilter(filter);
+    }
+
+    // Then we handle project
+    if (projects != null) {
+      // The translator starts out with every table column; with a project,
+      // only the referenced columns should be requested.
+      translator.metrics.clear();
+      translator.dimensions.clear();
+      final ImmutableList.Builder<String> builder = ImmutableList.builder();
+      for (RexNode project : projects) {
+        builder.add(translator.translate(project, true));
+      }
+      fieldNames = builder.build();
+    }
+
+    // Finally we handle aggregate and sort. Handling of these
+    // operators is more complex, since we need to extract
+    // the conditions to know whether the query will be
+    // executed as a Timeseries, TopN, or GroupBy in Druid
+    final List<String> dimensions = new ArrayList<>();
+    final List<JsonAggregation> aggregations = new ArrayList<>();
+    String granularity = "ALL";
+    Direction timeSeriesDirection = null;
+    JsonLimit limit = null;
+    if (groupSet != null) {
+      assert aggCalls != null;
+      assert aggNames != null;
+      assert aggCalls.size() == aggNames.size();
+
+      // Ordinal of the group key that stands for time, or -1 if there is none.
+      int timePositionIdx = -1;
+      final ImmutableList.Builder<String> builder = ImmutableList.builder();
+      if (projects != null) {
+        for (int groupKey : groupSet) {
+          final String s = fieldNames.get(groupKey);
+          final RexNode project = projects.get(groupKey);
+          if (project instanceof RexInputRef) {
+            // Reference, it could be to the timestamp column or any other dimension
+            final RexInputRef ref = (RexInputRef) project;
+            final String origin = druidTable.rowType.getFieldList().get(ref.getIndex()).getName();
+            if (origin.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+              granularity = "NONE";
+              builder.add(s);
+              assert timePositionIdx == -1;
+              timePositionIdx = groupKey;
+            } else {
+              dimensions.add(s);
+              builder.add(s);
+            }
+          } else if (project instanceof RexCall) {
+            // Call, check if we should infer granularity
+            RexCall call = (RexCall) project;
+            if (HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator())) {
+              granularity = call.getOperator().getName();
+              builder.add(s);
+              assert timePositionIdx == -1;
+              timePositionIdx = groupKey;
+            } else {
+              dimensions.add(s);
+              builder.add(s);
+            }
+          } else {
+            throw new AssertionError("incompatible project expression: " + project);
+          }
+        }
+      } else {
+        for (int groupKey : groupSet) {
+          final String s = fieldNames.get(groupKey);
+          if (s.equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+            granularity = "NONE";
+            builder.add(s);
+            assert timePositionIdx == -1;
+            timePositionIdx = groupKey;
+          } else {
+            dimensions.add(s);
+            builder.add(s);
+          }
+        }
+      }
+
+      for (Pair<AggregateCall, String> agg : Pair.zip(aggCalls, aggNames)) {
+        final JsonAggregation jsonAggregation =
+            getJsonAggregation(fieldNames, agg.right, agg.left);
+        aggregations.add(jsonAggregation);
+        builder.add(jsonAggregation.name);
+      }
+
+      // From here on, field names are those of the aggregate output:
+      // group keys first, then aggregate calls.
+      fieldNames = builder.build();
+
+      ImmutableList<JsonCollation> collations = null;
+      boolean sortsMetric = false;
+      if (collationIndexes != null) {
+        assert collationDirections != null;
+        ImmutableList.Builder<JsonCollation> colBuilder = new ImmutableList.Builder<JsonCollation>();
+        for (Pair<Integer,Direction> p : Pair.zip(collationIndexes, collationDirections)) {
+          colBuilder.add(new JsonCollation(fieldNames.get(p.left),
+                  p.right == Direction.DESCENDING ? "descending" : "ascending"));
+          if (p.left >= groupSet.cardinality() && p.right == Direction.DESCENDING) {
+            // Currently only support for DESC in TopN
+            sortsMetric = true;
+          } else if (p.left == timePositionIdx) {
+            assert timeSeriesDirection == null;
+            timeSeriesDirection = p.right;
+          }
+        }
+        collations = colBuilder.build();
+      }
+
+      limit = new JsonLimit("default", fetch, collations);
+
+      if (dimensions.isEmpty() && (collations == null || timeSeriesDirection != null)) {
+        queryType = DruidQueryType.TIMESERIES;
+        assert fetch == null;
+      } else if (dimensions.size() == 1 && sortsMetric && collations.size() == 1 && fetch != null) {
+        queryType = DruidQueryType.TOP_N;
+      } else {
+        queryType = DruidQueryType.GROUP_BY;
+      }
+    } else {
+      assert aggCalls == null;
+      assert aggNames == null;
+      assert collationIndexes == null || collationIndexes.isEmpty();
+      assert collationDirections == null || collationDirections.isEmpty();
+    }
+
+    final StringWriter sw = new StringWriter();
+    final JsonFactory factory = new JsonFactory();
+    // try-with-resources closes (and thereby flushes) the generator on every path.
+    try (JsonGenerator generator = factory.createGenerator(sw)) {
+
+      switch (queryType) {
+      case TIMESERIES:
+        generator.writeStartObject();
+
+        generator.writeStringField("queryType", "timeseries");
+        generator.writeStringField("dataSource", druidTable.dataSource);
+        generator.writeStringField("descending", timeSeriesDirection != null &&
+            timeSeriesDirection == Direction.DESCENDING ? "true" : "false");
+        generator.writeStringField("granularity", granularity);
+        writeFieldIf(generator, "filter", jsonFilter);
+        writeField(generator, "aggregations", aggregations);
+        writeFieldIf(generator, "postAggregations", null);
+        writeField(generator, "intervals", intervals);
+
+        generator.writeEndObject();
+        break;
+
+      case TOP_N:
+        generator.writeStartObject();
+
+        generator.writeStringField("queryType", "topN");
+        generator.writeStringField("dataSource", druidTable.dataSource);
+        generator.writeStringField("granularity", granularity);
+        generator.writeStringField("dimension", dimensions.get(0));
+        generator.writeStringField("metric", fieldNames.get(collationIndexes.get(0)));
+        writeFieldIf(generator, "filter", jsonFilter);
+        writeField(generator, "aggregations", aggregations);
+        writeFieldIf(generator, "postAggregations", null);
+        writeField(generator, "intervals", intervals);
+        generator.writeNumberField("threshold", fetch);
+
+        generator.writeEndObject();
+        break;
+
+      case GROUP_BY:
+        generator.writeStartObject();
+
+        if (aggregations.isEmpty()) {
+          // Druid requires at least one aggregation, otherwise gives:
+          //   Must have at least one AggregatorFactory
+          aggregations.add(
+              new JsonAggregation("longSum", "dummy_agg", "dummy_agg"));
+        }
+
+        generator.writeStringField("queryType", "groupBy");
+        generator.writeStringField("dataSource", druidTable.dataSource);
+        generator.writeStringField("granularity", granularity);
+        writeField(generator, "dimensions", dimensions);
+        writeFieldIf(generator, "limitSpec", limit);
+        writeFieldIf(generator, "filter", jsonFilter);
+        writeField(generator, "aggregations", aggregations);
+        writeFieldIf(generator, "postAggregations", null);
+        writeField(generator, "intervals", intervals);
+        writeFieldIf(generator, "having", null);
+
+        generator.writeEndObject();
+        break;
+
+      case SELECT:
+        generator.writeStartObject();
+
+        generator.writeStringField("queryType", "select");
+        generator.writeStringField("dataSource", druidTable.dataSource);
+        generator.writeStringField("descending", "false");
+        writeField(generator, "intervals", intervals);
+        writeFieldIf(generator, "filter", jsonFilter);
+        writeField(generator, "dimensions", translator.dimensions);
+        writeField(generator, "metrics", translator.metrics);
+        generator.writeStringField("granularity", granularity);
+
+        generator.writeFieldName("pagingSpec");
+        generator.writeStartObject();
+        generator.writeNumberField("threshold", fetch != null ? fetch : 1);
+        generator.writeEndObject();
+
+        generator.writeFieldName("context");
+        generator.writeStartObject();
+        generator.writeBooleanField(Constants.DRUID_QUERY_FETCH, fetch != null);
+        generator.writeEndObject();
+
+        generator.writeEndObject();
+        break;
+
+      default:
+        throw new AssertionError("unknown query type " + queryType);
+      }
+    } catch (IOException e) {
+      // A partially written string must not be handed out as a valid query.
+      throw new IllegalStateException(
+          "Failed to generate Druid " + queryType + " query JSON", e);
+    }
+
+    return new QuerySpec(queryType, sw.toString(), fieldNames);
+  }
+
+  private JsonAggregation getJsonAggregation(List<String> fieldNames,
+      String name, AggregateCall aggCall) {
+    final List<String> list = new ArrayList<>();
+    for (Integer arg : aggCall.getArgList()) {
+      list.add(fieldNames.get(arg));
+    }
+    final String only = Iterables.getFirst(list, null);
+    final boolean b = aggCall.getType().getSqlTypeName() == SqlTypeName.DOUBLE;
+    switch (aggCall.getAggregation().getKind()) {
+    case COUNT:
+      if (aggCall.isDistinct()) {
+        return new JsonCardinalityAggregation("cardinality", name, list);
+      }
+      return new JsonAggregation("count", name, only);
+    case SUM:
+    case SUM0:
+      return new JsonAggregation(b ? "doubleSum" : "longSum", name, only);
+    case MIN:
+      return new JsonAggregation(b ? "doubleMin" : "longMin", name, only);
+    case MAX:
+      return new JsonAggregation(b ? "doubleMax" : "longMax", name, only);
+    default:
+      throw new AssertionError("unknown aggregate " + aggCall);
+    }
+  }
+
+  private static void writeField(JsonGenerator generator, String fieldName,
+      Object o) throws IOException {
+    generator.writeFieldName(fieldName);
+    writeObject(generator, o);
+  }
+
+  /**
+   * Writes a field only when its value is not null; a null value writes nothing.
+   *
+   * @throws IOException if the generator fails
+   */
+  private static void writeFieldIf(JsonGenerator generator, String fieldName,
+      Object o) throws IOException {
+    if (o == null) {
+      return;
+    }
+    writeField(generator, fieldName, o);
+  }
+
+  /**
+   * Writes the elements of a list as a JSON array, in list order.
+   *
+   * @throws IOException if the generator fails
+   */
+  private static void writeArray(JsonGenerator generator, List<?> elements)
+      throws IOException {
+    generator.writeStartArray();
+    for (Object element : elements) {
+      writeObject(generator, element);
+    }
+    generator.writeEndArray();
+  }
+
+  private static void writeObject(JsonGenerator generator, Object o)
+      throws IOException {
+    if (o instanceof String) {
+      String s = (String) o;
+      generator.writeString(s);
+    } else if (o instanceof Interval) {
+      Interval i = (Interval) o;
+      generator.writeString(i.toString());
+    } else if (o instanceof Integer) {
+      Integer i = (Integer) o;
+      generator.writeNumber(i);
+    } else if (o instanceof List) {
+      writeArray(generator, (List<?>) o);
+    } else if (o instanceof Json) {
+      ((Json) o).write(generator);
+    } else {
+      throw new AssertionError("not a json object: " + o);
+    }
+  }
+
+  /** Immutable description of a Druid query: its type, its JSON text and the
+   * names of the fields it returns. */
+  public static class QuerySpec {
+    final DruidQueryType queryType;
+    final String queryString;
+    final List<String> fieldNames;
+
+    /**
+     * Creates a specification.
+     *
+     * @param queryType   query type; must not be null
+     * @param queryString JSON text of the query; must not be null
+     * @param fieldNames  output field names; copied into an immutable list
+     */
+    QuerySpec(DruidQueryType queryType, String queryString,
+        List<String> fieldNames) {
+      this.queryType = Preconditions.checkNotNull(queryType);
+      this.queryString = Preconditions.checkNotNull(queryString);
+      this.fieldNames = ImmutableList.copyOf(fieldNames);
+    }
+
+    @Override public int hashCode() {
+      return Objects.hash(queryType, queryString, fieldNames);
+    }
+
+    @Override public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof QuerySpec)) {
+        return false;
+      }
+      final QuerySpec that = (QuerySpec) obj;
+      return queryType == that.queryType
+          && queryString.equals(that.queryString)
+          && fieldNames.equals(that.fieldNames);
+    }
+
+    @Override public String toString() {
+      final StringBuilder text = new StringBuilder();
+      text.append("{queryType: ").append(queryType);
+      text.append(", queryString: ").append(queryString);
+      text.append(", fieldNames: ").append(fieldNames);
+      text.append("}");
+      return text.toString();
+    }
+
+    /**
+     * Returns the query text, with a paging identifier spliced in when one is given.
+     *
+     * @param pagingIdentifier paging identifier, or null to get the text unchanged
+     * @param offset           offset associated with the identifier
+     * @return the query text; with an identifier, a "pagingIdentifiers" entry
+     *     is inserted in front of each {@code "threshold":} key
+     */
+    String getQueryString(String pagingIdentifier, int offset) {
+      if (pagingIdentifier == null) {
+        return queryString;
+      }
+      final String thresholdKey = "\"threshold\":";
+      final String pagingPrefix =
+          "\"pagingIdentifiers\":{\"" + pagingIdentifier + "\":" + offset + "},";
+      return queryString.replace(thresholdKey, pagingPrefix + thresholdKey);
+    }
+  }
+
+  /** Translates scalar expressions to Druid field references. */
+  private static class Translator {
+    final List<String> dimensions = new ArrayList<>();
+    final List<String> metrics = new ArrayList<>();
+    final DruidTable druidTable;
+    final RelDataType rowType;
+
+    Translator(DruidTable druidTable, RelDataType rowType) {
+      this.druidTable = druidTable;
+      this.rowType = rowType;
+      for (RelDataTypeField f : rowType.getFieldList()) {
+        final String fieldName = f.getName();
+        if (druidTable.metricFieldNames.contains(fieldName)) {
+          metrics.add(fieldName);
+        } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+          dimensions.add(fieldName);
+        }
+      }
+    }
+
+    String translate(RexNode e, boolean set) {
+      switch (e.getKind()) {
+      case INPUT_REF:
+        final RexInputRef ref = (RexInputRef) e;
+        final String fieldName =
+            rowType.getFieldList().get(ref.getIndex()).getName();
+        if (set) {
+          if (druidTable.metricFieldNames.contains(fieldName)) {
+            metrics.add(fieldName);
+          } else if (!DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+            dimensions.add(fieldName);
+          }
+        }
+        return fieldName;
+
+      case CAST:
+       return tr(e, 0, set);
+
+      case LITERAL:
+        return ((RexLiteral) e).getValue2().toString();
+
+      case OTHER_FUNCTION:
+        final RexCall call = (RexCall) e;
+        assert HiveDateGranularity.ALL_FUNCTIONS.contains(call.getOperator());
+        return tr(call, 0, set);
+
+      default:
+        throw new AssertionError("invalid expression " + e);
+      }
+    }
+
+    @SuppressWarnings("incomplete-switch")
+    private JsonFilter translateFilter(RexNode e) {
+      RexCall call;
+      switch (e.getKind()) {
+      case EQUALS:
+      case NOT_EQUALS:
+      case GREATER_THAN:
+      case GREATER_THAN_OR_EQUAL:
+      case LESS_THAN:
+      case LESS_THAN_OR_EQUAL:
+        call = (RexCall) e;
+        int posRef;
+        int posConstant;
+        if (RexUtil.isConstant(call.getOperands().get(1))) {
+          posRef = 0;
+          posConstant = 1;
+        } else if (RexUtil.isConstant(call.getOperands().get(0))) {
+          posRef = 1;
+          posConstant = 0;
+        } else {
+          throw new AssertionError("it is not a valid comparison: " + e);
+        }
+        switch (e.getKind()) {
+        case EQUALS:
+          return new JsonSelector("selector", tr(e, posRef), tr(e, posConstant));
+        case NOT_EQUALS:
+          return new JsonCompositeFilter("not",
+              ImmutableList.of(new JsonSelector("selector", tr(e, posRef), tr(e, posConstant))));
+        case GREATER_THAN:
+          return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), true, null, false,
+              false);
+        case GREATER_THAN_OR_EQUAL:
+          return new JsonBound("bound", tr(e, posRef), tr(e, posConstant), false, null, false,
+              false);
+        case LESS_THAN:
+          return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), true,
+              false);
+        case LESS_THAN_OR_EQUAL:
+          return new JsonBound("bound", tr(e, posRef), null, false, tr(e, posConstant), false,
+              false);
+        }
+      case AND:
+      case OR:
+      case NOT:
+        call = (RexCall) e;
+        return new JsonCompositeFilter(e.getKind().toString().toLowerCase(),
+            translateFilters(call.getOperands()));
+      default:
+        throw new AssertionError("cannot translate filter: " + e);
+      }
+    }
+
+    private String tr(RexNode call, int index) {
+      return tr(call, index, false);
+    }
+
+    private String tr(RexNode call, int index, boolean set) {
+      return translate(((RexCall) call).getOperands().get(index), set);
+    }
+
+    /**
+     * Translates each operand into a JSON filter and returns the results as an
+     * immutable list, in operand order.
+     */
+    private List<JsonFilter> translateFilters(List<RexNode> operands) {
+      final ImmutableList.Builder<JsonFilter> filters = ImmutableList.builder();
+      for (int i = 0; i < operands.size(); i++) {
+        filters.add(translateFilter(operands.get(i)));
+      }
+      return filters.build();
+    }
+  }
+
+  /**
+   * Element that can serialize itself through a
+   * {@link com.fasterxml.jackson.core.JsonGenerator}.
+   */
+  private interface Json {
+    /**
+     * Writes this element to the given generator.
+     *
+     * @param generator destination of the JSON output
+     * @throws IOException if the generator fails to write
+     */
+    void write(JsonGenerator generator) throws IOException;
+  }
+
+  /**
+   * Aggregation element of a Druid "groupBy" or "topN" query, written as an
+   * object with "type", "name" and "fieldName" entries.
+   */
+  private static class JsonAggregation implements Json {
+    final String type;
+    final String name;
+    final String fieldName;
+
+    private JsonAggregation(String aggType, String aggName, String aggFieldName) {
+      type = aggType;
+      name = aggName;
+      fieldName = aggFieldName;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      generator.writeStringField("name", name);
+      // "fieldName" goes through writeFieldIf rather than being written directly.
+      writeFieldIf(generator, "fieldName", fieldName);
+      generator.writeEndObject();
+    }
+  }
+
+  /** Collation element of a Druid "groupBy" query. */
+  private static class JsonLimit implements Json {
+    final String type;
+    final Integer limit;
+    final ImmutableList<JsonCollation> collations;
+
+    private JsonLimit(String type, Integer limit, ImmutableList<JsonCollation> collations) {
+      this.type = type;
+      this.limit = limit;
+      this.collations = collations;
+    }
+
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      writeFieldIf(generator, "limit", limit);
+      writeFieldIf(generator, "columns", collations);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Collation element of a Druid "groupBy" query: the dimension to sort on
+   * and the sort direction.
+   */
+  private static class JsonCollation implements Json {
+    final String dimension;
+    final String direction;
+
+    private JsonCollation(String sortDimension, String sortDirection) {
+      dimension = sortDimension;
+      direction = sortDirection;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("dimension", dimension);
+      writeFieldIf(generator, "direction", direction);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Aggregation element that calls the "cardinality" function. It takes a
+   * list of field names instead of the single field name of its parent, so
+   * the parent's {@code fieldName} is always {@code null}.
+   */
+  private static class JsonCardinalityAggregation extends JsonAggregation {
+    final List<String> fieldNames;
+
+    private JsonCardinalityAggregation(String aggType, String aggName,
+        List<String> aggFieldNames) {
+      super(aggType, aggName, null);
+      fieldNames = aggFieldNames;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      generator.writeStringField("name", name);
+      writeFieldIf(generator, "fieldNames", fieldNames);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Base class for the filter elements of a Druid "groupBy" or "topN" query.
+   * Holds the filter type that subclasses write as the "type" entry.
+   */
+  private abstract static class JsonFilter implements Json {
+    final String type;
+
+    private JsonFilter(String filterType) {
+      type = filterType;
+    }
+  }
+
+  /** Equality filter: matches a dimension against a single value. */
+  private static class JsonSelector extends JsonFilter {
+    private final String dimension;
+    private final String value;
+
+    private JsonSelector(String filterType, String filterDimension, String filterValue) {
+      super(filterType);
+      dimension = filterDimension;
+      value = filterValue;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      generator.writeStringField("dimension", dimension);
+      generator.writeStringField("value", value);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Bound filter: restricts a dimension to values above a lower bound and/or
+   * below an upper bound. A {@code null} bound is not written.
+   */
+  private static class JsonBound extends JsonFilter {
+    private final String dimension;
+    private final String lower;
+    private final boolean lowerStrict;
+    private final String upper;
+    private final boolean upperStrict;
+    private final boolean alphaNumeric;
+
+    private JsonBound(String filterType, String filterDimension, String lowerBound,
+        boolean lowerIsStrict, String upperBound, boolean upperIsStrict,
+        boolean isAlphaNumeric) {
+      super(filterType);
+      dimension = filterDimension;
+      lower = lowerBound;
+      lowerStrict = lowerIsStrict;
+      upper = upperBound;
+      upperStrict = upperIsStrict;
+      alphaNumeric = isAlphaNumeric;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      final boolean hasLower = lower != null;
+      final boolean hasUpper = upper != null;
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      generator.writeStringField("dimension", dimension);
+      // Each strictness flag is emitted only together with its bound.
+      if (hasLower) {
+        generator.writeStringField("lower", lower);
+        generator.writeBooleanField("lowerStrict", lowerStrict);
+      }
+      if (hasUpper) {
+        generator.writeStringField("upper", upper);
+        generator.writeBooleanField("upperStrict", upperStrict);
+      }
+      generator.writeBooleanField("alphaNumeric", alphaNumeric);
+      generator.writeEndObject();
+    }
+  }
+
+  /**
+   * Filter that combines other filters using a boolean operator.
+   *
+   * <p>A "not" filter wraps exactly one filter, written as the single "field"
+   * entry; every other type writes all of its operands as the "fields" array.
+   */
+  private static class JsonCompositeFilter extends JsonFilter {
+    private final List<? extends JsonFilter> fields;
+
+    private JsonCompositeFilter(String type,
+        List<? extends JsonFilter> fields) {
+      super(type);
+      this.fields = fields;
+    }
+
+    @Override
+    public void write(JsonGenerator generator) throws IOException {
+      generator.writeStartObject();
+      generator.writeStringField("type", type);
+      // The type is built in lower case ("not", or the lower-cased operator
+      // kind), so a comparison against upper-case "NOT" never matched and the
+      // negation was serialized with "fields". Compare case-insensitively so
+      // either spelling is handled.
+      if ("not".equalsIgnoreCase(type)) {
+        writeField(generator, "field", fields.get(0));
+      } else {
+        writeField(generator, "fields", fields);
+      }
+      generator.writeEndObject();
+    }
+  }
+
+}
+
+// End DruidQuery.java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
new file mode 100644
index 0000000..228b307
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidQueryType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+/**
+ * Kinds of Druid query, each paired with its query name string.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public enum DruidQueryType {
+  SELECT("select"),
+  TOP_N("topN"),
+  GROUP_BY("groupBy"),
+  TIMESERIES("timeseries");
+
+  /** Query name string of this type, as returned by {@link #getQueryName()}. */
+  private final String queryName;
+
+  DruidQueryType(String name) {
+    queryName = name;
+  }
+
+  /** Returns the query name string of this type. */
+  public String getQueryName() {
+    return queryName;
+  }
+}
+
+// End DruidQueryType.java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
new file mode 100644
index 0000000..f68ffa5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidRules.java
@@ -0,0 +1,591 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveDateGranularity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortProjectTransposeRule;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Rules and relational operators for {@link DruidQuery}.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidRules {
+
+  /** Logger for this class. */
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidRules.class);
+
+  // Avoid instantiation
+  private DruidRules() {
+  }
+
+  /** Matches a Filter directly on top of a DruidQuery. */
+  public static final DruidFilterRule FILTER = new DruidFilterRule();
+  /** Matches a Project directly on top of a DruidQuery. */
+  public static final DruidProjectRule PROJECT = new DruidProjectRule();
+  /** Matches an Aggregate directly on top of a DruidQuery. */
+  public static final DruidAggregateRule AGGREGATE = new DruidAggregateRule();
+  /** Matches an Aggregate on top of a Project on top of a DruidQuery. */
+  public static final DruidProjectAggregateRule PROJECT_AGGREGATE = new DruidProjectAggregateRule();
+  /** Matches a Sort directly on top of a DruidQuery. */
+  public static final DruidSortRule SORT = new DruidSortRule();
+  /** Matches a Sort on top of a Project on top of a DruidQuery. */
+  public static final DruidProjectSortRule PROJECT_SORT = new DruidProjectSortRule();
+  /** Matches a Project on top of a Sort on top of a DruidQuery. */
+  public static final DruidSortProjectRule SORT_PROJECT = new DruidSortProjectRule();
+
+  /**
+   * Predicate that is true for aggregate calls that cannot be pushed to Druid,
+   * i.e. anything other than COUNT, SUM, SUM0, MIN or MAX.
+   */
+  private static final Predicate<AggregateCall> BAD_AGG = new Predicate<AggregateCall>() {
+    @Override
+    public boolean apply(AggregateCall aggregateCall) {
+      switch (aggregateCall.getAggregation().getKind()) {
+        case COUNT:
+        case SUM:
+        case SUM0:
+        case MIN:
+        case MAX:
+          // Supported aggregation: leave the switch and report "not bad".
+          break;
+        default:
+          return true;
+      }
+      return false;
+    }
+  };
+
+  /**
+   * Rule to push a {@link org.apache.calcite.rel.core.Filter} into a {@link DruidQuery}.
+   */
+  private static class DruidFilterRule extends RelOptRule {
+    private DruidFilterRule() {
+      super(operand(Filter.class,
+              operand(DruidQuery.class, none())));
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+      final Filter filter = call.rel(0);
+      final DruidQuery query = call.rel(1);
+      if (!DruidQuery.isValidSignature(query.signature() + 'f')
+              || !query.isValidFilter(filter.getCondition())) {
+        return;
+      }
+      // Timestamp
+      int timestampFieldIdx = -1;
+      for (int i = 0; i < query.getRowType().getFieldCount(); i++) {
+        if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(
+                query.getRowType().getFieldList().get(i).getName())) {
+          timestampFieldIdx = i;
+          break;
+        }
+      }
+      final Pair<List<RexNode>, List<RexNode>> pair = splitFilters(
+              filter.getCluster().getRexBuilder(), query, filter.getCondition(), timestampFieldIdx);
+      if (pair == null) {
+        // We can't push anything useful to Druid.
+        return;
+      }
+      List<Interval> intervals = null;
+      if (!pair.left.isEmpty()) {
+        intervals = DruidIntervalUtils.createInterval(
+                query.getRowType().getFieldList().get(timestampFieldIdx).getType(),
+                pair.left);
+        if (intervals == null) {
+          // We can't push anything useful to Druid.
+          return;
+        }
+      }
+      DruidQuery newDruidQuery = query;
+      if (!pair.right.isEmpty()) {
+        if (!validConditions(pair.right)) {
+          return;
+        }
+        final RelNode newFilter = filter.copy(filter.getTraitSet(), Util.last(query.rels),
+                RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), pair.right, false));
+        newDruidQuery = DruidQuery.extendQuery(query, newFilter);
+      }
+      if (intervals != null) {
+        newDruidQuery = DruidQuery.extendQuery(newDruidQuery, intervals);
+      }
+      call.transformTo(newDruidQuery);
+    }
+
+    /* Splits the filter condition in two groups: those that filter on the timestamp column
+     * and those that filter on other fields */
+    private static Pair<List<RexNode>, List<RexNode>> splitFilters(final RexBuilder rexBuilder,
+            final DruidQuery input, RexNode cond, final int timestampFieldIdx) {
+      final List<RexNode> timeRangeNodes = new ArrayList<>();
+      final List<RexNode> otherNodes = new ArrayList<>();
+      List<RexNode> conjs = RelOptUtil.conjunctions(cond);
+      if (conjs.isEmpty()) {
+        // We do not transform
+        return null;
+      }
+      // Number of columns with the dimensions and timestamp
+      int max = input.getRowType().getFieldCount() - input.druidTable.metricFieldNames.size();
+      for (RexNode conj : conjs) {
+        final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
+        conj.accept(visitor);
+        if (visitor.inputPosReferenced.contains(timestampFieldIdx)) {
+          if (visitor.inputPosReferenced.size() != 1) {
+            // Complex predicate, transformation currently not supported
+            return null;
+          }
+          timeRangeNodes.add(conj);
+        } else if (!visitor.inputPosReferenced.tailSet(max).isEmpty()) {
+          // Filter on metrics, not supported in Druid
+          return null;
+        } else {
+          otherNodes.add(conj);
+        }
+      }
+      return Pair.of(timeRangeNodes, otherNodes);
+    }
+
+    /* Checks that all conditions are on ref + literal*/
+    private static boolean validConditions(List<RexNode> nodes) {
+      for (RexNode node: nodes) {
+        try {
+          node.accept(
+              new RexVisitorImpl<Void>(true) {
+                @SuppressWarnings("incomplete-switch")
+                @Override public Void visitCall(RexCall call) {
+                  switch (call.getKind()) {
+                    case CAST:
+                      // Only if on top of ref or literal
+                      if (call.getOperands().get(0) instanceof RexInputRef ||
+                              call.getOperands().get(0) instanceof RexLiteral) {
+                        break;
+                      }
+                      // Not supported
+                      throw Util.FoundOne.NULL;
+                    case EQUALS:
+                    case LESS_THAN:
+                    case LESS_THAN_OR_EQUAL:
+                    case GREATER_THAN:
+                    case GREATER_THAN_OR_EQUAL:
+                      // Check cast
+                      RexNode left = call.getOperands().get(0);
+                      if (left.getKind() == SqlKind.CAST) {
+                        left = ((RexCall)left).getOperands().get(0);
+                      }
+                      RexNode right = call.getOperands().get(1);
+                      if (right.getKind() == SqlKind.CAST) {
+                        right = ((RexCall)right).getOperands().get(0);
+                      }
+                      if (left instanceof RexInputRef && right instanceof RexLiteral) {
+                        break;
+                      }
+                      if (right instanceof RexInputRef && left instanceof RexLiteral) {
+                        break;
+                      }
+                      // Not supported if it is not ref + literal
+                      throw Util.FoundOne.NULL;
+                    case BETWEEN:
+                    case IN:
+                      // Not supported here yet
+                      throw Util.FoundOne.NULL;
+                  }
+                  return super.visitCall(call);
+                }
+              });
+        } catch (Util.FoundOne e) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Rule to push a {@link org.apache.calcite.rel.core.Project} into a {@link DruidQuery}.
+   *
+   * <p>If every projected expression is a plain input reference, the whole
+   * Project is pushed. Otherwise the Project is split in two: a Project of the
+   * referenced input columns that is pushed into the query, and a Project with
+   * the original expressions, rewritten over those columns, that stays on top.
+   */
+  private static class DruidProjectRule extends RelOptRule {
+    private DruidProjectRule() {
+      super(operand(Project.class,
+              operand(DruidQuery.class, none())));
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+      final Project project = call.rel(0);
+      final DruidQuery query = call.rel(1);
+      if (!DruidQuery.isValidSignature(query.signature() + 'p')) {
+        return;
+      }
+
+      if (canProjectAll(project.getProjects())) {
+        // All expressions can be pushed to Druid in their entirety.
+        final RelNode newProject = project.copy(project.getTraitSet(),
+                ImmutableList.of(Util.last(query.rels)));
+        RelNode newNode = DruidQuery.extendQuery(query, newProject);
+        call.transformTo(newNode);
+        return;
+      }
+      final Pair<List<RexNode>, List<RexNode>> pair = splitProjects(
+              project.getCluster().getRexBuilder(), query, project.getProjects());
+      if (pair == null) {
+        // We can't push anything useful to Druid.
+        return;
+      }
+      // above: original expressions rewritten over the pushed columns;
+      // below: references to the input columns those expressions use
+      final List<RexNode> above = pair.left;
+      final List<RexNode> below = pair.right;
+      // Build the row type of the pushed Project
+      final RelDataTypeFactory.FieldInfoBuilder builder = project.getCluster().getTypeFactory()
+              .builder();
+      final RelNode input = Util.last(query.rels);
+      for (RexNode e : below) {
+        final String name;
+        if (e instanceof RexInputRef) {
+          // Keep the name the column has in the input
+          name = input.getRowType().getFieldNames().get(((RexInputRef) e).getIndex());
+        } else {
+          name = null;
+        }
+        builder.add(name, e.getType());
+      }
+      final RelNode newProject = project.copy(project.getTraitSet(), input, below, builder.build());
+      final DruidQuery newQuery = DruidQuery.extendQuery(query, newProject);
+      // The Project left on top keeps the row type of the original Project
+      final RelNode newProject2 = project.copy(project.getTraitSet(), newQuery, above,
+              project.getRowType());
+      call.transformTo(newProject2);
+    }
+
+    /* Returns whether every expression is a plain input reference */
+    private static boolean canProjectAll(List<RexNode> nodes) {
+      for (RexNode e : nodes) {
+        if (!(e instanceof RexInputRef)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /* Splits the expressions into (above, below): below holds one reference per input
+     * column used by the expressions, above holds the expressions with their input
+     * references renumbered to positions in below. Returns null if the expressions
+     * reference every input column, since nothing would be pruned. */
+    private static Pair<List<RexNode>, List<RexNode>> splitProjects(final RexBuilder rexBuilder,
+            final RelNode input, List<RexNode> nodes) {
+      // Collect the input positions referenced by any of the expressions
+      final RelOptUtil.InputReferencedVisitor visitor = new RelOptUtil.InputReferencedVisitor();
+      for (RexNode node : nodes) {
+        node.accept(visitor);
+      }
+      if (visitor.inputPosReferenced.size() == input.getRowType().getFieldCount()) {
+        // All inputs are referenced
+        return null;
+      }
+      final List<RexNode> belowNodes = new ArrayList<>();
+      final List<RelDataType> belowTypes = new ArrayList<>();
+      // The position of an input index within this list is its index in belowNodes
+      final List<Integer> positions = Lists.newArrayList(visitor.inputPosReferenced);
+      for (int i : positions) {
+        final RexNode node = rexBuilder.makeInputRef(input, i);
+        belowNodes.add(node);
+        belowTypes.add(node.getType());
+      }
+      final List<RexNode> aboveNodes = new ArrayList<>();
+      for (RexNode node : nodes) {
+        aboveNodes.add(node.accept(new RexShuttle() {
+          @Override
+          public RexNode visitInputRef(RexInputRef ref) {
+            // Remap the reference from the original input to the pushed Project
+            final int index = positions.indexOf(ref.getIndex());
+            return rexBuilder.makeInputRef(belowTypes.get(index), index);
+          }
+        }));
+      }
+      return Pair.of(aboveNodes, belowNodes);
+    }
+  }
+
+  /**
+   * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} into a {@link DruidQuery}.
+   *
+   * <p>The Aggregate is pushed only if it has a single grouping set, no
+   * indicator columns, only aggregate calls accepted by {@code BAD_AGG}, and
+   * no aggregate call whose arguments lead to the timestamp column.
+   */
+  private static class DruidAggregateRule extends RelOptRule {
+    private DruidAggregateRule() {
+      super(operand(Aggregate.class,
+              operand(DruidQuery.class, none())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Aggregate aggregate = call.rel(0);
+      final DruidQuery query = call.rel(1);
+      if (!DruidQuery.isValidSignature(query.signature() + 'a')) {
+        return;
+      }
+      final boolean pushable = !aggregate.indicator
+              && aggregate.getGroupSets().size() == 1
+              && !Iterables.any(aggregate.getAggCallList(), BAD_AGG)
+              && validAggregate(aggregate, query);
+      if (!pushable) {
+        return;
+      }
+      final RelNode queryTop = Util.last(query.rels);
+      final RelNode pushedAggregate =
+              aggregate.copy(aggregate.getTraitSet(), ImmutableList.of(queryTop));
+      call.transformTo(DruidQuery.extendQuery(query, pushedAggregate));
+    }
+
+    /* Returns false if any argument of any aggregate call leads to the timestamp column */
+    private static boolean validAggregate(Aggregate aggregate, DruidQuery query) {
+      final ImmutableBitSet.Builder argPositions = ImmutableBitSet.builder();
+      final List<AggregateCall> aggCalls = aggregate.getAggCallList();
+      for (int i = 0; i < aggCalls.size(); i++) {
+        argPositions.addAll(aggCalls.get(i).getArgList());
+      }
+      final boolean refsTimestamp =
+              checkTimestampRefOnQuery(argPositions.build(), query.getTopNode());
+      return !refsTimestamp;
+    }
+  }
+
+  /**
+   * Rule to push an {@link org.apache.calcite.rel.core.Aggregate} and the
+   * {@link org.apache.calcite.rel.core.Project} below it into a {@link DruidQuery}.
+   *
+   * <p>The Project must expose the timestamp column exactly once, either
+   * directly or wrapped in a {@link HiveDateGranularity} function, and the
+   * Aggregate must group by that expression without aggregating it.
+   */
+  private static class DruidProjectAggregateRule extends RelOptRule {
+    private DruidProjectAggregateRule() {
+      super(operand(Aggregate.class,
+              operand(Project.class,
+                      operand(DruidQuery.class, none()))));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Aggregate aggregate = call.rel(0);
+      final Project project = call.rel(1);
+      final DruidQuery query = call.rel(2);
+      if (!DruidQuery.isValidSignature(query.signature() + 'p' + 'a')) {
+        return;
+      }
+      final int timestampIdx = validProject(project, query);
+      if (timestampIdx == -1) {
+        return;
+      }
+      final boolean pushable = !aggregate.indicator
+              && aggregate.getGroupSets().size() == 1
+              && !Iterables.any(aggregate.getAggCallList(), BAD_AGG)
+              && validAggregate(aggregate, timestampIdx);
+      if (!pushable) {
+        return;
+      }
+
+      // Push the Project first, then the Aggregate on top of the extended query
+      final RelNode pushedProject = project.copy(project.getTraitSet(),
+              ImmutableList.of(Util.last(query.rels)));
+      final DruidQuery queryWithProject = DruidQuery.extendQuery(query, pushedProject);
+      final RelNode pushedAggregate = aggregate.copy(aggregate.getTraitSet(),
+              ImmutableList.of(Util.last(queryWithProject.rels)));
+      call.transformTo(DruidQuery.extendQuery(queryWithProject, pushedAggregate));
+    }
+
+    /* A Project is valid if it contains only input references plus at most one
+     * granularity function call applied to the timestamp column, and the timestamp
+     * column is used only once overall. Returns the position within the Project of
+     * the expression using the timestamp, or -1 if the Project is not valid or does
+     * not use the timestamp at all. */
+    private static int validProject(Project project, DruidQuery query) {
+      final List<RexNode> exprs = project.getProjects();
+      int timestampPos = -1;
+      for (int pos = 0; pos < exprs.size(); pos++) {
+        final RexNode expr = exprs.get(pos);
+        if (expr instanceof RexCall) {
+          // Only granularity functions over the timestamp column are accepted
+          final RexCall granularityCall = (RexCall) expr;
+          if (!HiveDateGranularity.ALL_FUNCTIONS.contains(granularityCall.getOperator())) {
+            return -1;
+          }
+          if (timestampPos != -1) {
+            // The timestamp column has already been used once
+            return -1;
+          }
+          final RexNode argument = granularityCall.getOperands().get(0);
+          if (!(argument instanceof RexInputRef)) {
+            return -1;
+          }
+          final int argIndex = ((RexInputRef) argument).getIndex();
+          if (!checkTimestampRefOnQuery(ImmutableBitSet.of(argIndex), query.getTopNode())) {
+            return -1;
+          }
+          timestampPos = pos;
+        } else if (expr instanceof RexInputRef) {
+          final int refIndex = ((RexInputRef) expr).getIndex();
+          if (checkTimestampRefOnQuery(ImmutableBitSet.of(refIndex), query.getTopNode())) {
+            if (timestampPos != -1) {
+              // The timestamp column has already been used once
+              return -1;
+            }
+            timestampPos = pos;
+          }
+        } else {
+          // Neither a call nor a reference
+          return -1;
+        }
+      }
+      return timestampPos;
+    }
+
+    /* The Aggregate must group by position idx and must not aggregate over it */
+    private static boolean validAggregate(Aggregate aggregate, int idx) {
+      if (!aggregate.getGroupSet().get(idx)) {
+        return false;
+      }
+      final List<AggregateCall> aggCalls = aggregate.getAggCallList();
+      for (int i = 0; i < aggCalls.size(); i++) {
+        if (aggCalls.get(i).getArgList().contains(idx)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Rule to push a {@link org.apache.calcite.rel.core.Sort} through a
+   * {@link org.apache.calcite.rel.core.Project} that sits on top of a
+   * {@link DruidQuery}. Useful to transform to complex Druid queries.
+   */
+  private static class DruidProjectSortRule extends HiveSortProjectTransposeRule {
+    private DruidProjectSortRule() {
+      super(operand(Sort.class,
+              operand(Project.class,
+                      operand(DruidQuery.class, none()))));
+    }
+
+    /** Always matches: the operand pattern is the only condition of this rule. */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      return true;
+    }
+  }
+
+  /**
+   * Rule to push a {@link org.apache.calcite.rel.core.Project} back through a
+   * {@link org.apache.calcite.rel.core.Sort} that sits on top of a
+   * {@link DruidQuery}. Useful if, after pushing the Sort down, it could not
+   * be pushed inside the DruidQuery.
+   */
+  private static class DruidSortProjectRule extends HiveProjectSortTransposeRule {
+    private DruidSortProjectRule() {
+      super(operand(Project.class,
+              operand(Sort.class,
+                      operand(DruidQuery.class, none()))));
+    }
+  }
+
+  /**
+   * Rule to push a {@link org.apache.calcite.rel.core.Sort} into a {@link DruidQuery}.
+   */
+  private static class DruidSortRule extends RelOptRule {
+    private DruidSortRule() {
+      super(operand(Sort.class,
+              operand(DruidQuery.class, none())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Sort sort = call.rel(0);
+      final DruidQuery query = call.rel(1);
+      if (!DruidQuery.isValidSignature(query.signature() + 'l')) {
+        return;
+      }
+      // Either it is:
+      // - a sort without limit on the time column on top of
+      //     Agg operator (transformable to timeseries query), or
+      // - it is a sort w/o limit on columns that do not include
+      //     the time column on top of Agg operator, or
+      // - a simple limit on top of other operator than Agg
+      if (!validSortLimit(sort, query)) {
+        return;
+      }
+      final RelNode queryTop = Util.last(query.rels);
+      final RelNode pushedSort = sort.copy(sort.getTraitSet(), ImmutableList.of(queryTop));
+      call.transformTo(DruidQuery.extendQuery(query, pushedSort));
+    }
+
+    /* Returns whether the sort/limit can be pushed on top of the given query */
+    private static boolean validSortLimit(Sort sort, DruidQuery query) {
+      if (sort.offset != null && RexLiteral.intValue(sort.offset) != 0) {
+        // offset not supported by Druid
+        return false;
+      }
+      final RelNode queryTop = query.getTopNode();
+      if (!(queryTop instanceof Aggregate)) {
+        // If it is going to be a Druid select operator, we push the limit iff
+        // 1) it does not contain a sort specification (required by Druid) and
+        // 2) limit is smaller than select threshold, as otherwise it might be
+        //   better to obtain some parallelization and let global limit
+        //   optimizer kick in
+        HiveDruidConf conf = sort.getCluster().getPlanner()
+                .getContext().unwrap(HiveDruidConf.class);
+        return HiveCalciteUtil.pureLimitRelNode(sort) &&
+                RexLiteral.intValue(sort.fetch) <= conf.getSelectThreshold();
+      }
+      final Aggregate topAgg = (Aggregate) queryTop;
+      final ImmutableBitSet.Builder groupColumns = ImmutableBitSet.builder();
+      boolean sortsOnMetric = false;
+      for (RelFieldCollation collation : sort.collation.getFieldCollations()) {
+        final int fieldIdx = collation.getFieldIndex();
+        if (fieldIdx < topAgg.getGroupCount()) {
+          // Sort key is a grouping column: map it back to the Aggregate input
+          groupColumns.set(topAgg.getGroupSet().nth(fieldIdx));
+        } else {
+          // Sort key is one of the aggregate results
+          sortsOnMetric = true;
+        }
+      }
+      final boolean refsTimestamp =
+              checkTimestampRefOnQuery(groupColumns.build(), topAgg.getInput());
+      // Sorting on both the timestamp and an aggregate result is rejected
+      return !(refsTimestamp && sortsOnMetric);
+    }
+  }
+
+  /* Returns whether any position in the set leads to the timestamp column of top.
+   * If top is a Project, positions are first mapped through its expressions
+   * (input references, or the first operand of a granularity function call)
+   * onto the Project input, and the check runs against that input instead. */
+  private static boolean checkTimestampRefOnQuery(ImmutableBitSet set, RelNode top) {
+    ImmutableBitSet refs = set;
+    RelNode source = top;
+    if (top instanceof Project) {
+      final Project project = (Project) top;
+      final List<RexNode> exprs = project.getProjects();
+      final ImmutableBitSet.Builder inputRefs = ImmutableBitSet.builder();
+      for (int pos : set) {
+        final RexNode expr = exprs.get(pos);
+        if (expr instanceof RexInputRef) {
+          inputRefs.set(((RexInputRef) expr).getIndex());
+        } else if (expr instanceof RexCall) {
+          final RexCall granularityCall = (RexCall) expr;
+          assert HiveDateGranularity.ALL_FUNCTIONS.contains(granularityCall.getOperator());
+          inputRefs.set(((RexInputRef) granularityCall.getOperands().get(0)).getIndex());
+        }
+      }
+      source = project.getInput();
+      refs = inputRefs.build();
+    }
+
+    // Look for the timestamp column among the referenced fields
+    for (int pos : refs) {
+      final String fieldName = source.getRowType().getFieldNames().get(pos);
+      if (DruidTable.DEFAULT_TIMESTAMP_COLUMN.equals(fieldName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}
+
+// End DruidRules.java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
new file mode 100644
index 0000000..3b3f68a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidSchema.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.Map;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Schema mapped onto a Druid instance. It only records the URL it was created
+ * with; its table map is always empty.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidSchema extends AbstractSchema {
+  final String url;
+
+  /**
+   * Creates a Druid schema.
+   *
+   * @param url URL of query REST service; a null value is rejected with a
+   *            {@link NullPointerException}
+   */
+  public DruidSchema(String url) {
+    Preconditions.checkNotNull(url);
+    this.url = url;
+  }
+
+  /** Returns an empty immutable map: no table is registered by this schema. */
+  @Override protected Map<String, Table> getTableMap() {
+    return ImmutableMap.<String, Table>builder().build();
+  }
+}
+
+// End DruidSchema.java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
new file mode 100644
index 0000000..7288291
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/DruidTable.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.interpreter.BindableConvention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Table mapped onto a Druid table.
+ *
+ * <p>The row type is supplied either directly as a {@link RelDataType} or as a
+ * {@link RelProtoDataType} that is applied on demand, depending on the
+ * constructor used; exactly one of the two fields is non-null.
+ *
+ * TODO: to be removed when Calcite is upgraded to 1.9
+ */
+public class DruidTable extends AbstractTable implements TranslatableTable {
+
+  public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
+  public static final Interval DEFAULT_INTERVAL = new Interval(
+          new DateTime("1900-01-01"),
+          new DateTime("3000-01-01")
+  );
+
+  final DruidSchema schema;
+  final String dataSource;
+  // Exactly one of rowType / protoRowType is set, see the constructors
+  final RelDataType rowType;
+  final RelProtoDataType protoRowType;
+  final ImmutableSet<String> metricFieldNames;
+  final ImmutableList<Interval> intervals;
+  final String timestampFieldName;
+
+  /**
+   * Creates a Druid table whose row type is generated from a proto row type.
+   *
+   * @param schema Druid schema that contains this table
+   * @param dataSource Druid data source name
+   * @param protoRowType Field names and types
+   * @param metricFieldNames Names of fields that are metrics
+   * @param intervals Default intervals if query does not constrain the time
+   * @param timestampFieldName Name of the column that contains the time
+   */
+  public DruidTable(DruidSchema schema, String dataSource,
+      RelProtoDataType protoRowType, Set<String> metricFieldNames,
+      List<Interval> intervals, String timestampFieldName) {
+    this.schema = Preconditions.checkNotNull(schema);
+    this.dataSource = Preconditions.checkNotNull(dataSource);
+    this.rowType = null;
+    // Fail fast here rather than with a late NPE in getRowType()
+    this.protoRowType = Preconditions.checkNotNull(protoRowType);
+    this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
+    this.intervals = ImmutableList.copyOf(intervals);
+    this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
+  }
+
+  /**
+   * Creates a Druid table with an already built row type.
+   *
+   * @param schema Druid schema that contains this table
+   * @param dataSource Druid data source name
+   * @param rowType Field names and types
+   * @param metricFieldNames Names of fields that are metrics
+   * @param intervals Default intervals if query does not constrain the time
+   * @param timestampFieldName Name of the column that contains the time
+   */
+  public DruidTable(DruidSchema schema, String dataSource,
+      RelDataType rowType, Set<String> metricFieldNames,
+      List<Interval> intervals, String timestampFieldName) {
+    this.schema = Preconditions.checkNotNull(schema);
+    this.dataSource = Preconditions.checkNotNull(dataSource);
+    this.rowType = Preconditions.checkNotNull(rowType);
+    this.protoRowType = null;
+    this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
+    this.intervals = ImmutableList.copyOf(intervals);
+    this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
+  }
+
+  /**
+   * Returns the row type of this table, applying the proto row type to the
+   * given factory when no row type was supplied at construction time.
+   *
+   * @param typeFactory factory used to generate the type from the proto row type
+   * @return the row type of the table
+   * @throws IllegalArgumentException if the row type does not contain the
+   *         timestamp field or one of the metric fields
+   */
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    final RelDataType thisRowType;
+    if (rowType != null) {
+      thisRowType = rowType;
+    } else {
+      // Generate
+      thisRowType = protoRowType.apply(typeFactory);
+    }
+    final List<String> fieldNames = thisRowType.getFieldNames();
+    Preconditions.checkArgument(fieldNames.contains(timestampFieldName));
+    Preconditions.checkArgument(fieldNames.containsAll(metricFieldNames));
+    return thisRowType;
+  }
+
+  /**
+   * Converts this table into a relational expression: a {@link DruidQuery}
+   * using the bindable convention whose only operator is a
+   * {@link LogicalTableScan} over {@code relOptTable}.
+   *
+   * @param context context providing the cluster to create the operators in
+   * @param relOptTable planner table that is scanned
+   * @return the Druid query wrapping the table scan
+   */
+  @Override
+  public RelNode toRel(RelOptTable.ToRelContext context,
+      RelOptTable relOptTable) {
+    final RelOptCluster cluster = context.getCluster();
+    final TableScan scan = LogicalTableScan.create(cluster, relOptTable);
+    return DruidQuery.create(cluster,
+        cluster.traitSetOf(BindableConvention.INSTANCE), relOptTable, this,
+        ImmutableList.<RelNode>of(scan));
+  }
+
+}
+
+// End DruidTable.java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
new file mode 100644
index 0000000..0686dff
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/druid/HiveDruidConf.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.druid;
+
+/**
+ * Immutable holder for the Druid-related settings that are made available to
+ * the planner rules through the planner context.
+ */
+public class HiveDruidConf {
+
+  /** Threshold that the rules compare against the fetch of a limit. */
+  private final int selectThreshold;
+
+  /**
+   * Creates the configuration.
+   *
+   * @param selectThreshold threshold value returned by
+   *        {@link #getSelectThreshold()}
+   */
+  public HiveDruidConf(int selectThreshold) {
+    this.selectThreshold = selectThreshold;
+  }
+
+  /**
+   * @return the select threshold given at construction time
+   */
+  public int getSelectThreshold() {
+    return selectThreshold;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
index bc48707..75b7ad2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
@@ -41,7 +41,7 @@ public class HiveSqlCountAggFunction extends SqlAggFunction implements CanAggreg
       SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
     super(
         "count",
-        SqlKind.OTHER_FUNCTION,
+        SqlKind.COUNT,
         returnTypeInference,
         operandTypeInference,
         operandTypeChecker,

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
index 77dca1f..834fc3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlMinMaxAggFunction.java
@@ -32,7 +32,7 @@ public class HiveSqlMinMaxAggFunction extends SqlAggFunction {
       SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker, boolean isMin) {
     super(
         isMin ? "min" : "max",
-        SqlKind.OTHER_FUNCTION,
+        isMin ? SqlKind.MIN : SqlKind.MAX,
         returnTypeInference,
         operandTypeInference,
         operandTypeChecker,

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
index dc286a2..1d551a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
@@ -58,7 +58,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction implements CanAggregat
     SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
     super(
         "sum",
-        SqlKind.OTHER_FUNCTION,
+        SqlKind.SUM,
         returnTypeInference,
         operandTypeInference,
         operandTypeChecker,

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
new file mode 100644
index 0000000..b3f8d9b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveDateGranularity.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import java.util.Set;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * SQL functions named after date granularities, from {@code YEAR} down to
+ * {@code SECOND}. Instances are only created through the constants below.
+ */
+public class HiveDateGranularity extends SqlFunction {
+
+  public static final SqlFunction YEAR = new HiveDateGranularity("YEAR");
+  public static final SqlFunction QUARTER = new HiveDateGranularity("QUARTER");
+  public static final SqlFunction MONTH = new HiveDateGranularity("MONTH");
+  public static final SqlFunction WEEK = new HiveDateGranularity("WEEK");
+  public static final SqlFunction DAY = new HiveDateGranularity("DAY");
+  public static final SqlFunction HOUR = new HiveDateGranularity("HOUR");
+  public static final SqlFunction MINUTE = new HiveDateGranularity("MINUTE");
+  public static final SqlFunction SECOND = new HiveDateGranularity("SECOND");
+
+  /**
+   * All the granularity functions declared above. The set is read-only so that
+   * this shared constant cannot be altered by callers.
+   */
+  public static final Set<SqlFunction> ALL_FUNCTIONS =
+          java.util.Collections.<SqlFunction>unmodifiableSet(
+                  Sets.newHashSet(YEAR, QUARTER, MONTH, WEEK, DAY, HOUR, MINUTE, SECOND));
+
+  /**
+   * Creates a granularity function in the {@code TIMEDATE} category that
+   * accepts any operand and returns a nullable {@code TIME}.
+   *
+   * @param name name of the function
+   */
+  private HiveDateGranularity(String name) {
+    super(
+        name,
+        SqlKind.OTHER_FUNCTION,
+        ReturnTypes.TIME_NULLABLE,
+        null, // no operand type inference
+        OperandTypes.ANY,
+        SqlFunctionCategory.TIMEDATE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
index aac6126..fd19d99 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveProjectSortTransposeRule.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
@@ -48,6 +49,10 @@ public class HiveProjectSortTransposeRule extends RelOptRule {
             operand(HiveSortLimit.class, any())));
   }
 
+  /**
+   * Creates the rule with a caller-supplied root operand. Being protected, it
+   * lets subclasses provide their own match pattern.
+   *
+   * @param operand root operand that the rule matches
+   */
+  protected HiveProjectSortTransposeRule(RelOptRuleOperand operand) {
+    super(operand);
+  }
+
   //~ Methods ----------------------------------------------------------------
 
   // implement RelOptRule

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
index feec3c2..fe29850 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveSortProjectTransposeRule.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
@@ -49,6 +50,10 @@ public class HiveSortProjectTransposeRule extends RelOptRule {
             operand(HiveProject.class, any())));
   }
 
+  /**
+   * Creates the rule with a caller-supplied root operand. Being protected, it
+   * lets subclasses provide their own match pattern.
+   *
+   * @param operand root operand that the rule matches
+   */
+  protected HiveSortProjectTransposeRule(RelOptRuleOperand operand) {
+    super(operand);
+  }
+
   //~ Methods ----------------------------------------------------------------
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 78c76ab..9a5becb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -22,19 +22,21 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 
-import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidQuery;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 
 class ASTBuilder {
 
@@ -62,14 +64,32 @@ class ASTBuilder {
         ASTBuilder.construct(HiveParser.TOK_TABNAME, "TOK_TABNAME")
             .add(HiveParser.Identifier, hTbl.getHiveTableMD().getDbName())
             .add(HiveParser.Identifier, hTbl.getHiveTableMD().getTableName()));
-    // we need to carry the insideView information from calcite into the ast.
-    if (((HiveTableScan) scan).isInsideView()) {
-      b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(
-          ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST").add(
-              ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
-                  .add(HiveParser.StringLiteral, "\"insideView\"")
-                  .add(HiveParser.StringLiteral, "\"TRUE\""))));
+
+    HiveTableScan hts;
+    if (scan instanceof DruidQuery) {
+      hts = (HiveTableScan) ((DruidQuery)scan).getTableScan();
+    } else {
+      hts = (HiveTableScan) scan;
+    }
+    ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
+    if (scan instanceof DruidQuery) {
+      // Pass possible query to Druid
+      DruidQuery dq = (DruidQuery) scan;
+      propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+              .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_JSON + "\"")
+              .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(
+                      dq.getQueryString()) + "\""));
+      propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+              .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_TYPE + "\"")
+              .add(HiveParser.StringLiteral, "\"" + dq.getQueryType() + "\""));
+    }
+    if (hts.isInsideView()) {
+      // We need to carry the insideView information from calcite into the ast.
+      propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+              .add(HiveParser.StringLiteral, "\"insideView\"")
+              .add(HiveParser.StringLiteral, "\"TRUE\""));
     }
+    b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(propList));
 
     // NOTE: Calcite considers tbls to be equal if their names are the same. Hence
     // we need to provide Calcite the fully qualified table name (dbname.tblname)
@@ -77,7 +97,7 @@ class ASTBuilder {
     // However in HIVE DB name can not appear in select list; in case of join
     // where table names differ only in DB name, Hive would require user
     // introducing explicit aliases for tbl.
-    b.add(HiveParser.Identifier, ((HiveTableScan)scan).getTableAlias());
+    b.add(HiveParser.Identifier, hts.getTableAlias());
     return b.node();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/58d1befa/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 40215a2..9f5e733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -56,6 +56,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.druid.DruidQuery;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
@@ -625,7 +626,13 @@ public class ASTConverter {
     private static final long serialVersionUID = 1L;
 
     Schema(TableScan scan) {
-      String tabName = ((HiveTableScan) scan).getTableAlias();
+      HiveTableScan hts;
+      if (scan instanceof DruidQuery) {
+        hts = (HiveTableScan) ((DruidQuery)scan).getTableScan();
+      } else {
+        hts = (HiveTableScan) scan;
+      }
+      String tabName = hts.getTableAlias();
       for (RelDataTypeField field : scan.getRowType().getFieldList()) {
         add(new ColumnInfo(tabName, field.getName()));
       }


Mime
View raw message