calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [07/13] calcite git commit: [CALCITE-1276] In Druid adapter, deduce tables and columns if not specified
Date Sun, 12 Jun 2016 08:07:54 GMT
[CALCITE-1276] In Druid adapter, deduce tables and columns if not specified

Deduce tables by calling /druid/coordinator/v1/metadata/datasources;
columns by running a "segmentMetadata" query.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/435e2030
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/435e2030
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/435e2030

Branch: refs/heads/master
Commit: 435e203028a872933c6d8bd6404ebc445552e599
Parents: 23c8e45
Author: Julian Hyde <jhyde@apache.org>
Authored: Mon Jun 6 23:03:08 2016 -0700
Committer: Julian Hyde <jhyde@apache.org>
Committed: Wed Jun 8 17:35:03 2016 -0700

----------------------------------------------------------------------
 .../org/apache/calcite/model/ModelHandler.java  |  19 ++-
 .../org/apache/calcite/runtime/HttpUtils.java   |   3 +-
 druid/pom.xml                                   |   4 +
 .../adapter/druid/DruidConnectionImpl.java      | 153 ++++++++++++++++++-
 .../calcite/adapter/druid/DruidQuery.java       |  29 +++-
 .../calcite/adapter/druid/DruidSchema.java      |  40 ++++-
 .../adapter/druid/DruidSchemaFactory.java       |  41 ++++-
 .../calcite/adapter/druid/DruidTable.java       |  65 +++++++-
 .../adapter/druid/DruidTableFactory.java        |  61 ++++----
 .../org/apache/calcite/test/DruidAdapterIT.java | 106 ++++++-------
 .../test/resources/druid-foodmart-model.json    |   3 +-
 druid/src/test/resources/druid-wiki-model.json  |   3 +-
 .../resources/druid-wiki-no-columns-model.json  |  45 ++++++
 .../resources/druid-wiki-no-tables-model.json   |  33 ++++
 site/_docs/druid_adapter.md                     |   3 +-
 15 files changed, 490 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/core/src/main/java/org/apache/calcite/model/ModelHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/model/ModelHandler.java b/core/src/main/java/org/apache/calcite/model/ModelHandler.java
index 8b451ee..8c7f5fa 100644
--- a/core/src/main/java/org/apache/calcite/model/ModelHandler.java
+++ b/core/src/main/java/org/apache/calcite/model/ModelHandler.java
@@ -212,7 +212,7 @@ public class ModelHandler {
               jsonSchema.factory);
       final Schema schema =
           schemaFactory.create(
-              parentSchema, jsonSchema.name, operandMap(jsonSchema.operand));
+              parentSchema, jsonSchema.name, operandMap(jsonSchema, jsonSchema.operand));
       final SchemaPlus schemaPlus = parentSchema.add(jsonSchema.name, schema);
       populateSchema(jsonSchema, schemaPlus);
     } catch (Exception e) {
@@ -221,7 +221,8 @@ public class ModelHandler {
   }
 
   /** Adds extra entries to an operand to a custom schema. */
-  protected Map<String, Object> operandMap(Map<String, Object> operand) {
+  protected Map<String, Object> operandMap(JsonSchema jsonSchema,
+      Map<String, Object> operand) {
     if (operand == null) {
       return ImmutableMap.of();
     }
@@ -238,6 +239,13 @@ public class ModelHandler {
             final File file = new File(modelUri);
             builder.put(extraOperand.camelName, file.getParentFile());
           }
+          break;
+        case TABLES:
+          if (jsonSchema instanceof JsonCustomSchema) {
+            builder.put(extraOperand.camelName,
+                ((JsonCustomSchema) jsonSchema).tables);
+          }
+          break;
         }
       }
     }
@@ -338,7 +346,7 @@ public class ModelHandler {
               jsonTable.factory);
       final Table table =
           tableFactory.create(schema, jsonTable.name,
-              operandMap(jsonTable.operand), null);
+              operandMap(null, jsonTable.operand), null);
       schema.add(jsonTable.name, table);
     } catch (Exception e) {
       throw new RuntimeException("Error instantiating " + jsonTable, e);
@@ -428,7 +436,10 @@ public class ModelHandler {
     MODEL_URI("modelUri"),
 
     /** Base directory from which to read files. */
-    BASE_DIRECTORY("baseDirectory");
+    BASE_DIRECTORY("baseDirectory"),
+
+    /** Tables defined in this schema. */
+    TABLES("tables");
 
     public final String camelName;
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java b/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java
index 011092c..4519281 100644
--- a/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java
+++ b/core/src/main/java/org/apache/calcite/runtime/HttpUtils.java
@@ -126,7 +126,8 @@ public class HttpUtils {
       Map<String, String> headers,
       int cTimeout,
       int rTimeout) throws IOException {
-    return executeMethod("POST", url, data, headers, cTimeout, rTimeout);
+    return executeMethod(data == null ? "GET" : "POST", url, data, headers,
+        cTimeout, rTimeout);
   }
 
   public static InputStream executeMethod(

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/pom.xml
----------------------------------------------------------------------
diff --git a/druid/pom.xml b/druid/pom.xml
index 22f0c1d..7470774 100644
--- a/druid/pom.xml
+++ b/druid/pom.xml
@@ -57,6 +57,10 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index dd462b6..dccda9f 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -23,14 +23,19 @@ import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Holder;
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -38,6 +43,7 @@ import java.io.InputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -49,21 +55,26 @@ import static org.apache.calcite.runtime.HttpUtils.post;
  * Implementation of {@link DruidConnection}.
  */
 class DruidConnectionImpl implements DruidConnection {
-  final String url;
+  private final String url;
+  private final String coordinatorUrl;
 
-  public DruidConnectionImpl(String url) {
-    this.url = url;
+  public DruidConnectionImpl(String url, String coordinatorUrl) {
+    this.url = Preconditions.checkNotNull(url);
+    this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl);
   }
 
   public void request(QueryType queryType, String data, Sink sink,
       List<String> fieldNames, Page page) throws IOException {
+    final String url = this.url + "/druid/v2/?pretty";
+    final Map<String, String> requestHeaders =
+        ImmutableMap.of("Content-Type", "application/json");
     if (CalcitePrepareImpl.DEBUG) {
       System.out.println(data);
     }
-    final Map<String, String> requestHeaders =
-        ImmutableMap.of("Content-Type", "application/json");
-    final InputStream in = post(url, data, requestHeaders, 10000, 1800000);
-    parse(queryType, in, sink, fieldNames, page);
+    try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
+         InputStream in = traceResponse(in0)) {
+      parse(queryType, in, sink, fieldNames, page);
+    }
   }
 
   /** Parses the output of a {@code topN} query, sending the results to a
@@ -290,6 +301,77 @@ class DruidConnectionImpl implements DruidConnection {
     };
   }
 
+  /** Reads segment metadata, and populates a list of columns and metrics. */
+  void metadata(String dataSourceName, List<String> intervals,
+      Map<String, SqlTypeName> fieldBuilder, Set<String> metricNameBuilder) {
+    final String url = this.url + "/druid/v2/?pretty";
+    final Map<String, String> requestHeaders =
+        ImmutableMap.of("Content-Type", "application/json");
+    final String data = DruidQuery.metadataQuery(dataSourceName, intervals);
+    if (CalcitePrepareImpl.DEBUG) {
+      System.out.println("Druid: " + data);
+    }
+    try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
+         InputStream in = traceResponse(in0)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      final CollectionType listType =
+          mapper.getTypeFactory().constructCollectionType(List.class,
+              JsonSegmentMetadata.class);
+      final List<JsonSegmentMetadata> list = mapper.readValue(in, listType);
+      in.close();
+      for (JsonSegmentMetadata o : list) {
+        for (Map.Entry<String, JsonColumn> entry : o.columns.entrySet()) {
+          fieldBuilder.put(entry.getKey(), entry.getValue().sqlType());
+        }
+        if (o.aggregators != null) {
+          for (Map.Entry<String, JsonAggregator> entry
+              : o.aggregators.entrySet()) {
+            fieldBuilder.put(entry.getKey(), entry.getValue().sqlType());
+            metricNameBuilder.add(entry.getKey());
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Reads data source names from Druid. */
+  Set<String> tableNames() {
+    final Map<String, String> requestHeaders =
+        ImmutableMap.of("Content-Type", "application/json");
+    final String data = null;
+    final String url = coordinatorUrl + "/druid/coordinator/v1/metadata/datasources";
+    if (CalcitePrepareImpl.DEBUG) {
+      System.out.println("Druid: table names" + data + "; " + url);
+    }
+    try (InputStream in0 = post(url, data, requestHeaders, 10000, 1800000);
+         InputStream in = traceResponse(in0)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      final CollectionType listType =
+          mapper.getTypeFactory().constructCollectionType(List.class,
+              String.class);
+      final List<String> list = mapper.readValue(in, listType);
+      return ImmutableSet.copyOf(list);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private InputStream traceResponse(InputStream in) {
+    if (CalcitePrepareImpl.DEBUG) {
+      try {
+        final byte[] bytes = AvaticaUtils.readFullyToBytes(in);
+        in.close();
+        System.out.println("Response: " + new String(bytes));
+        in = new ByteArrayInputStream(bytes);
+      } catch (IOException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    return in;
+  }
+
   /** A {@link Sink} that is also {@link Runnable}. */
   private interface RunnableQueueSink extends Sink, Runnable {
   }
@@ -343,6 +425,63 @@ class DruidConnectionImpl implements DruidConnection {
       return "{" + pagingIdentifier + ": " + offset + "}";
     }
   }
+
+
+  /** Result of a "segmentMetadata" call, populated by Jackson. */
+  @SuppressWarnings({ "WeakerAccess", "unused" })
+  private static class JsonSegmentMetadata {
+    public String id;
+    public List<String> intervals;
+    public Map<String, JsonColumn> columns;
+    public int size;
+    public int numRows;
+    public Map<String, JsonAggregator> aggregators;
+  }
+
+  /** Element of the "columns" collection in the result of a
+   * "segmentMetadata" call, populated by Jackson. */
+  @SuppressWarnings({ "WeakerAccess", "unused" })
+  private static class JsonColumn {
+    public String type;
+    public boolean hasMultipleValues;
+    public int size;
+    public Integer cardinality;
+    public String errorMessage;
+
+    SqlTypeName sqlType() {
+      return sqlType(type);
+    }
+
+    static SqlTypeName sqlType(String type) {
+      switch (type) {
+      case "LONG":
+        return SqlTypeName.BIGINT;
+      case "DOUBLE":
+        return SqlTypeName.DOUBLE;
+      case "FLOAT":
+        return SqlTypeName.REAL;
+      case "STRING":
+        return SqlTypeName.VARCHAR;
+      case "hyperUnique":
+        return SqlTypeName.VARBINARY;
+      default:
+        throw new AssertionError("unknown type " + type);
+      }
+    }
+  }
+
+  /** Element of the "aggregators" collection in the result of a
+   * "segmentMetadata" call, populated by Jackson. */
+  @SuppressWarnings({ "WeakerAccess", "unused" })
+  private static class JsonAggregator {
+    public String type;
+    public String name;
+    public String fieldName;
+
+    SqlTypeName sqlType() {
+      return JsonColumn.sqlType(type);
+    }
+  }
 }
 
 // End DruidConnectionImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index 0a9dbec..9a858f9 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -376,8 +376,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         writeFieldIf(generator, "filter", jsonFilter);
         writeField(generator, "aggregations", aggregations);
         writeFieldIf(generator, "postAggregations", null);
-        writeField(generator, "intervals",
-            ImmutableList.of(druidTable.interval));
+        writeField(generator, "intervals", druidTable.intervals);
         writeFieldIf(generator, "having", null);
 
         generator.writeEndObject();
@@ -389,8 +388,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         generator.writeStringField("queryType", "select");
         generator.writeStringField("dataSource", druidTable.dataSource);
         generator.writeStringField("descending", "false");
-        writeField(generator, "intervals",
-            ImmutableList.of(druidTable.interval));
+        writeField(generator, "intervals", druidTable.intervals);
         writeFieldIf(generator, "filter", jsonFilter);
         writeField(generator, "dimensions", translator.dimensions);
         writeField(generator, "metrics", translator.metrics);
@@ -445,7 +443,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     }
   }
 
-  private static void writeField(JsonGenerator generator, String fieldName,
+  static void writeField(JsonGenerator generator, String fieldName,
       Object o) throws IOException {
     generator.writeFieldName(fieldName);
     writeObject(generator, o);
@@ -521,6 +519,24 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     return Pair.of(aboveNodes, belowNodes);
   }
 
+  /** Generates a JSON string to query metadata about a data source. */
+  static String metadataQuery(String dataSourceName, List<String> intervals) {
+    final StringWriter sw = new StringWriter();
+    final JsonFactory factory = new JsonFactory();
+    try {
+      final JsonGenerator generator = factory.createGenerator(sw);
+      generator.writeStartObject();
+      generator.writeStringField("queryType", "segmentMetadata");
+      generator.writeStringField("dataSource", dataSourceName);
+      writeFieldIf(generator, "intervals", intervals);
+      generator.writeEndObject();
+      generator.close();
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+    return sw.toString();
+  }
+
   /** Druid query specification. */
   public static class QuerySpec {
     final QueryType queryType;
@@ -660,7 +676,8 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
     public void run() throws InterruptedException {
       try {
         final DruidConnectionImpl connection =
-            new DruidConnectionImpl(query.druidTable.schema.url);
+            new DruidConnectionImpl(query.druidTable.schema.url,
+                query.druidTable.schema.coordinatorUrl);
         final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page();
         int previousOffset;
         do {

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java
index bd3539d..28e7086 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchema.java
@@ -18,30 +18,62 @@ package org.apache.calcite.adapter.druid;
 
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nonnull;
 
 /**
  * Schema mapped onto a Druid instance.
  */
 public class DruidSchema extends AbstractSchema {
   final String url;
+  final String coordinatorUrl;
+  private final boolean discoverTables;
 
   /**
    * Creates a Druid schema.
    *
-   * @param url URL of query REST service
+   * @param url URL of query REST service, e.g. "http://localhost:8082"
+   * @param coordinatorUrl URL of coordinator REST service,
+   *                       e.g. "http://localhost:8081"
+   * @param discoverTables If true, ask Druid what tables exist;
+   *                       if false, only create tables explicitly in the model
    */
-  public DruidSchema(String url) {
+  public DruidSchema(String url, String coordinatorUrl,
+      boolean discoverTables) {
     this.url = Preconditions.checkNotNull(url);
+    this.coordinatorUrl = Preconditions.checkNotNull(coordinatorUrl);
+    this.discoverTables = discoverTables;
   }
 
   @Override protected Map<String, Table> getTableMap() {
-    final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
-    return builder.build();
+    if (!discoverTables) {
+      return ImmutableMap.of();
+    }
+    final DruidConnectionImpl connection =
+        new DruidConnectionImpl(url, coordinatorUrl);
+    return Maps.asMap(ImmutableSet.copyOf(connection.tableNames()),
+        CacheBuilder.<String, Table>newBuilder()
+            .build(new CacheLoader<String, Table>() {
+              public Table load(@Nonnull String tableName) throws Exception {
+                final Map<String, SqlTypeName> fieldMap = new LinkedHashMap<>();
+                final Set<String> metricNameSet = new LinkedHashSet<>();
+                connection.metadata(tableName, null, fieldMap, metricNameSet);
+                return DruidTable.create(DruidSchema.this, tableName, null,
+                    fieldMap, metricNameSet, null, connection);
+              }
+            }));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java
index f00b932..d35d0bf 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidSchemaFactory.java
@@ -20,17 +20,52 @@ import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaFactory;
 import org.apache.calcite.schema.SchemaPlus;
 
+import java.util.List;
 import java.util.Map;
 
 /**
  * Schema factory that creates Druid schemas.
+ *
+ * <table>
+ *   <caption>Druid schema operands</caption>
+ *   <tr>
+ *     <th>Operand</th>
+ *     <th>Description</th>
+ *     <th>Required</th>
+ *   </tr>
+ *   <tr>
+ *     <td>url</td>
+ *     <td>URL of Druid's query node.
+ *     The default is "http://localhost:8082".</td>
+ *     <td>No</td>
+ *   </tr>
+ *   <tr>
+ *     <td>coordinatorUrl</td>
+ *     <td>URL of Druid's coordinator node.
+ *     The default is <code>url</code>, replacing "8082" with "8081",
+ *     for example "http://localhost:8081".</td>
+ *     <td>No</td>
+ *   </tr>
+ * </table>
  */
 public class DruidSchemaFactory implements SchemaFactory {
+  /** Default Druid URL. */
+  public static final String DEFAULT_URL = "http://localhost:8082";
+
   public Schema create(SchemaPlus parentSchema, String name,
       Map<String, Object> operand) {
-    Map map = (Map) operand;
-    String url = (String) map.get("url");
-    return new DruidSchema(url);
+    final Map map = (Map) operand;
+    final String url = map.get("url") instanceof String
+        ? (String) map.get("url")
+        : DEFAULT_URL;
+    final String coordinatorUrl = map.get("coordinatorUrl") instanceof String
+        ? (String) map.get("coordinatorUrl")
+        : url.replace(":8082", ":8081");
+    // "tables" is a hidden attribute, copied in from the enclosing custom
+    // schema
+    final boolean containsTables = map.get("tables") instanceof List
+        && ((List) map.get("tables")).size() > 0;
+    return new DruidSchema(url, coordinatorUrl, !containsTables);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
index 78dcc0d..48258c1 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTable.java
@@ -25,25 +25,33 @@ import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Util;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
  * Table mapped onto a Druid table.
  */
 public class DruidTable extends AbstractTable implements TranslatableTable {
+  protected static final String DEFAULT_INTERVAL =
+      "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z";
   final DruidSchema schema;
   final String dataSource;
   final RelProtoDataType protoRowType;
   final ImmutableSet<String> metricFieldNames;
-  final String interval;
+  final List<String> intervals;
   final String timestampFieldName;
 
   /**
@@ -53,18 +61,50 @@ public class DruidTable extends AbstractTable implements TranslatableTable {
    * @param dataSource Druid data source name
    * @param protoRowType Field names and types
    * @param metricFieldNames Names of fields that are metrics
-   * @param interval Default interval if query does not constrain the time
+   * @param intervals Default interval if query does not constrain the time, or null
    * @param timestampFieldName Name of the column that contains the time
    */
   public DruidTable(DruidSchema schema, String dataSource,
-      RelProtoDataType protoRowType, Set<String> metricFieldNames, String interval,
+      RelProtoDataType protoRowType, Set<String> metricFieldNames, List<String> intervals,
       String timestampFieldName) {
     this.timestampFieldName = Preconditions.checkNotNull(timestampFieldName);
     this.schema = Preconditions.checkNotNull(schema);
     this.dataSource = Preconditions.checkNotNull(dataSource);
     this.protoRowType = protoRowType;
     this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
-    this.interval = Preconditions.checkNotNull(interval);
+    this.intervals = Preconditions.checkNotNull(
+        Util.first(intervals, ImmutableList.of(DEFAULT_INTERVAL)));
+  }
+
+  /** Creates a {@link DruidTable}
+   *
+   * @param druidSchema Druid schema
+   * @param dataSourceName Data source name in Druid, also table name
+   * @param intervals Intervals, or null to use default
+   * @param fieldMap Mutable map of fields (dimensions plus metrics);
+   *        may be partially populated already
+   * @param metricNameSet Mutable set of metric names;
+   *        may be partially populated already
+   * @param timestampColumnName Name of timestamp column, or null
+   * @param connection If not null, use this connection to find column
+   *                   definitions
+   * @return A table
+   */
+  static Table create(DruidSchema druidSchema, String dataSourceName,
+      List<String> intervals, Map<String, SqlTypeName> fieldMap,
+      Set<String> metricNameSet, String timestampColumnName,
+      DruidConnectionImpl connection) {
+    if (connection != null) {
+      connection.metadata(dataSourceName, intervals, fieldMap, metricNameSet);
+    }
+    final ImmutableMap<String, SqlTypeName> fields =
+        ImmutableMap.copyOf(fieldMap);
+    if (timestampColumnName == null) {
+      timestampColumnName = Iterables.get(fieldMap.keySet(), 0);
+    }
+    return new DruidTable(druidSchema, dataSourceName,
+        new MapRelProtoDataType(fields), ImmutableSet.copyOf(metricNameSet),
+        intervals, Util.first(timestampColumnName, "__time"));
   }
 
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
@@ -84,6 +124,23 @@ public class DruidTable extends AbstractTable implements TranslatableTable {
         ImmutableList.<RelNode>of(scan));
   }
 
+  /** Creates a {@link RelDataType} from a map of
+   * field names and types. */
+  private static class MapRelProtoDataType implements RelProtoDataType {
+    private final ImmutableMap<String, SqlTypeName> fields;
+
+    MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields) {
+      this.fields = fields;
+    }
+
+    public RelDataType apply(RelDataTypeFactory typeFactory) {
+      final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
+      for (Map.Entry<String, SqlTypeName> field : fields.entrySet()) {
+        builder.add(field.getKey(), field.getValue()).nullable(true);
+      }
+      return builder.build();
+    }
+  }
 }
 
 // End DruidTable.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
index 756e558..2453fcb 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidTableFactory.java
@@ -17,17 +17,15 @@
 package org.apache.calcite.adapter.druid;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TableFactory;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Util;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
 
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +37,11 @@ import java.util.Set;
  * <p>A table corresponds to what Druid calls a "data source".
  */
 public class DruidTableFactory implements TableFactory {
+  @SuppressWarnings("unused")
+  public static final DruidTableFactory INSTANCE = new DruidTableFactory();
+
+  private DruidTableFactory() {}
+
   public Table create(SchemaPlus schema, String name, Map operand,
       RelDataType rowType) {
     final DruidSchema druidSchema = schema.unwrap(DruidSchema.class);
@@ -46,17 +49,18 @@ public class DruidTableFactory implements TableFactory {
     final String dataSource = (String) operand.get("dataSource");
     final Set<String> metricNameBuilder = new LinkedHashSet<>();
     String timestampColumnName = (String) operand.get("timestampColumn");
-    final ImmutableMap.Builder<String, SqlTypeName> fieldBuilder =
-        ImmutableMap.builder();
-    if (operand.get("dimensions") != null) {
+    final Map<String, SqlTypeName> fieldBuilder = new LinkedHashMap<>();
+    final Object dimensionsRaw = operand.get("dimensions");
+    if (dimensionsRaw instanceof List) {
       //noinspection unchecked
-      final List<String> dimensions = (List<String>) operand.get("dimensions");
+      final List<String> dimensions = (List<String>) dimensionsRaw;
       for (String dimension : dimensions) {
         fieldBuilder.put(dimension, SqlTypeName.VARCHAR);
       }
     }
-    if (operand.get("metrics") != null) {
-      final List metrics = (List) operand.get("metrics");
+    final Object metricsRaw = operand.get("metrics");
+    if (metricsRaw instanceof List) {
+      final List metrics = (List) metricsRaw;
       for (Object metric : metrics) {
         final SqlTypeName sqlTypeName;
         final String metricName;
@@ -83,32 +87,23 @@ public class DruidTableFactory implements TableFactory {
         metricNameBuilder.add(metricName);
       }
     }
-    fieldBuilder.put(timestampColumnName, SqlTypeName.VARCHAR);
-    final ImmutableMap<String, SqlTypeName> fields = fieldBuilder.build();
-    String interval = Util.first((String) operand.get("interval"),
-        "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z");
-    return new DruidTable(druidSchema, Util.first(dataSource, name),
-        new MapRelProtoDataType(fields),
-        ImmutableSet.copyOf(metricNameBuilder), interval, timestampColumnName);
-  }
-
-  /** Creates a {@link org.apache.calcite.rel.type.RelDataType} from a map of
-   * field names and types. */
-  private static class MapRelProtoDataType implements RelProtoDataType {
-    private final ImmutableMap<String, SqlTypeName> fields;
-
-    public MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields) {
-      this.fields = fields;
+    if (timestampColumnName != null) {
+      fieldBuilder.put(timestampColumnName, SqlTypeName.VARCHAR);
     }
-
-    public RelDataType apply(RelDataTypeFactory typeFactory) {
-      final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
-      for (Map.Entry<String, SqlTypeName> field : fields.entrySet()) {
-        builder.add(field.getKey(), field.getValue()).nullable(true);
-      }
-      return builder.build();
+    final String dataSourceName = Util.first(dataSource, name);
+    DruidConnectionImpl c;
+    if (dimensionsRaw == null || metricsRaw == null) {
+      c = new DruidConnectionImpl(druidSchema.url, druidSchema.url.replace(":8082", ":8081"));
+    } else {
+      c = null;
     }
+    final Object interval = operand.get("interval");
+    final List<String> intervals = interval instanceof String
+        ? ImmutableList.of((String) interval) : null;
+    return DruidTable.create(druidSchema, dataSourceName, intervals,
+        fieldBuilder, metricNameBuilder, timestampColumnName, c);
   }
+
 }
 
 // End DruidTableFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 34717b6..d59ef24 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -22,18 +22,16 @@ import org.apache.calcite.util.Util;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.net.URL;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -60,18 +58,24 @@ import static org.junit.Assert.assertTrue;
  * </ul>
  */
 public class DruidAdapterIT {
-  /** Connection factory based on the "druid-foodmart" model. */
-  public static final ImmutableMap<String, String> FOODMART =
-      ImmutableMap.of("model",
-          DruidAdapterIT.class.getResource("/druid-foodmart-model.json")
-              .getPath());
+  /** URL of the "druid-foodmart" model. */
+  public static final URL FOODMART =
+      DruidAdapterIT.class.getResource("/druid-foodmart-model.json");
 
-  /** Connection factory based on the "druid-wiki" model
+  /** URL of the "druid-wiki" model
    * and the "wikiticker" data set. */
-  public static final ImmutableMap<String, String> WIKI =
-      ImmutableMap.of("model",
-          DruidAdapterIT.class.getResource("/druid-wiki-model.json")
-              .getPath());
+  public static final URL WIKI =
+      DruidAdapterIT.class.getResource("/druid-wiki-model.json");
+
+  /** URL of the "druid-wiki-no-columns" model
+   * and the "wikiticker" data set. */
+  public static final URL WIKI_AUTO =
+      DruidAdapterIT.class.getResource("/druid-wiki-no-columns-model.json");
+
+  /** URL of the "druid-wiki-no-tables" model
+   * and the "wikiticker" data set. */
+  public static final URL WIKI_AUTO2 =
+      DruidAdapterIT.class.getResource("/druid-wiki-no-tables-model.json");
 
   /** Whether to run Druid tests. Enabled by default, however test is only
    * included if "it" profile is activated ({@code -Pit}). To disable,
@@ -100,48 +104,17 @@ public class DruidAdapterIT {
     };
   }
 
-  /** Similar to {@link CalciteAssert#checkResultUnordered}, but filters strings
-   * before comparing them. */
-  static Function<ResultSet, Void> checkResultUnordered(
-      final String... lines) {
-    return new Function<ResultSet, Void>() {
-      public Void apply(ResultSet resultSet) {
-        try {
-          final List<String> expectedList = Lists.newArrayList(lines);
-          Collections.sort(expectedList);
-
-          final List<String> actualList = Lists.newArrayList();
-          CalciteAssert.toStringList(resultSet, actualList);
-          for (int i = 0; i < actualList.size(); i++) {
-            String s = actualList.get(i);
-            actualList.set(i,
-                s.replaceAll("\\.0;", ";").replaceAll("\\.0$", ""));
-          }
-          Collections.sort(actualList);
-
-          assertThat(actualList, equalTo(expectedList));
-          return null;
-        } catch (SQLException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  /** Creates a query against the {@link #FOODMART} data set. */
-  private CalciteAssert.AssertQuery sql(String sql) {
+  /** Creates a query against the data set whose model file is at the given URL. */
+  private CalciteAssert.AssertQuery sql(String sql, URL url) {
     return CalciteAssert.that()
         .enable(enabled())
-        .with(FOODMART)
+        .with(ImmutableMap.of("model", url.getPath()))
         .query(sql);
   }
 
-  /** Creates a query against the {@link #WIKI} data set. */
-  private CalciteAssert.AssertQuery wiki(String sql) {
-    return CalciteAssert.that()
-        .enable(enabled())
-        .with(WIKI)
-        .query(sql);
+  /** Creates a query against the {@link #FOODMART} data set. */
+  private CalciteAssert.AssertQuery sql(String sql) {
+    return sql(sql, FOODMART);
   }
 
   /** Tests a query against the {@link #WIKI} data set.
@@ -152,8 +125,36 @@ public class DruidAdapterIT {
     final String explain = "PLAN="
         + "EnumerableInterpreter\n"
         + "  DruidQuery(table=[[wiki, wiki]], filter=[=(CAST($12):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{4}], aggs=[[]])\n";
+    checkSelectDistinctWiki(WIKI, "wiki")
+        .explainContains(explain);
+  }
+
+  @Test public void testSelectDistinctWikiNoColumns() {
+    final String explain = "PLAN="
+        + "EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[wiki, wiki]], filter=[=(CAST($18):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{8}], aggs=[[]])\n";
+    checkSelectDistinctWiki(WIKI_AUTO, "wiki")
+        .explainContains(explain);
+  }
+
+  @Test public void testSelectDistinctWikiNoTables() {
+    // Compared to testSelectDistinctWiki, table name is different (because it
+    // is the raw dataSource name from Druid) and the field offsets are
+    // different. This is expected.
+    final String explain = "PLAN="
+        + "EnumerableInterpreter\n"
+        + "  DruidQuery(table=[[wiki, wikiticker]], filter=[=(CAST($17):VARCHAR(13) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\", 'Jeremy Corbyn')], groups=[{7}], aggs=[[]])\n";
+    checkSelectDistinctWiki(WIKI_AUTO2, "wikiticker")
+        .explainContains(explain);
+
+    // Because no tables are declared, foodmart is automatically present.
+    sql("select count(*) as c from \"foodmart\"", WIKI_AUTO2)
+        .returnsUnordered("C=86829");
+  }
+
+  private CalciteAssert.AssertQuery checkSelectDistinctWiki(URL url, String tableName) {
     final String sql = "select distinct \"countryName\"\n"
-        + "from \"wiki\"\n"
+        + "from \"" + tableName + "\"\n"
         + "where \"page\" = 'Jeremy Corbyn'";
     final String druidQuery = "{'queryType':'groupBy',"
         + "'dataSource':'wikiticker','granularity':'all',"
@@ -161,10 +162,9 @@ public class DruidAdapterIT {
         + "'filter':{'type':'selector','dimension':'page','value':'Jeremy Corbyn'},"
         + "'aggregations':[{'type':'longSum','name':'unit_sales','fieldName':'unit_sales'}],"
         + "'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z']}";
-    wiki(sql)
+    return sql(sql, url)
         .returnsUnordered("countryName=United Kingdom",
             "countryName=null")
-        .explainContains(explain)
         .queryContains(druidChecker(druidQuery));
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-foodmart-model.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-foodmart-model.json b/druid/src/test/resources/druid-foodmart-model.json
index 68102f0..8e0e14e 100644
--- a/druid/src/test/resources/druid-foodmart-model.json
+++ b/druid/src/test/resources/druid-foodmart-model.json
@@ -23,7 +23,8 @@
       "name": "foodmart",
       "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
       "operand": {
-        "url": "http://localhost:8082/druid/v2/?pretty"
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
       },
       "tables": [
         {

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-model.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-wiki-model.json b/druid/src/test/resources/druid-wiki-model.json
index eb13494..f345788 100644
--- a/druid/src/test/resources/druid-wiki-model.json
+++ b/druid/src/test/resources/druid-wiki-model.json
@@ -23,7 +23,8 @@
       "name": "wiki",
       "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
       "operand": {
-        "url": "http://localhost:8082/druid/v2/?pretty"
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
       },
       "tables": [
         {

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-no-columns-model.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-wiki-no-columns-model.json b/druid/src/test/resources/druid-wiki-no-columns-model.json
new file mode 100644
index 0000000..c9231c4
--- /dev/null
+++ b/druid/src/test/resources/druid-wiki-no-columns-model.json
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Druid model that defines one table with no columns, so the adapter needs to
+ * discover the columns.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "wiki",
+  "schemas": [
+    {
+      "type": "custom",
+      "name": "wiki",
+      "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
+      "operand": {
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
+      },
+      "tables": [
+        {
+          "name": "wiki",
+          "factory": "org.apache.calcite.adapter.druid.DruidTableFactory",
+          "operand": {
+            "dataSource": "wikiticker",
+            "interval": "1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z",
+            "timestampColumn": "time"
+          }
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/druid/src/test/resources/druid-wiki-no-tables-model.json
----------------------------------------------------------------------
diff --git a/druid/src/test/resources/druid-wiki-no-tables-model.json b/druid/src/test/resources/druid-wiki-no-tables-model.json
new file mode 100644
index 0000000..5b22ef9
--- /dev/null
+++ b/druid/src/test/resources/druid-wiki-no-tables-model.json
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Druid model where there are no table definitions.
+ */
+{
+  "version": "1.0",
+  "defaultSchema": "wiki",
+  "schemas": [
+    {
+      "type": "custom",
+      "name": "wiki",
+      "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
+      "operand": {
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/calcite/blob/435e2030/site/_docs/druid_adapter.md
----------------------------------------------------------------------
diff --git a/site/_docs/druid_adapter.md b/site/_docs/druid_adapter.md
index 3a330a6..9de2fb9 100644
--- a/site/_docs/druid_adapter.md
+++ b/site/_docs/druid_adapter.md
@@ -51,7 +51,8 @@ A basic example of a model file is given below:
       "name": "wiki",
       "factory": "org.apache.calcite.adapter.druid.DruidSchemaFactory",
       "operand": {
-        "url": "http://localhost:8082/druid/v2/?pretty"
+        "url": "http://localhost:8082",
+        "coordinatorUrl": "http://localhost:8081"
       },
       "tables": [
         {


Mime
View raw message