drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/7] drill git commit: DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby)
Date Mon, 02 Nov 2015 02:56:36 GMT
DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby)

This closes #225


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/22e53163
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/22e53163
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/22e53163

Branch: refs/heads/master
Commit: 22e5316317cf5b8caed7a869e40a012f18652254
Parents: 7f55051
Author: Jacques Nadeau <jacques@apache.org>
Authored: Wed Oct 28 17:20:51 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Nov 1 18:53:57 2015 -0800

----------------------------------------------------------------------
 .../exec/store/jdbc/JdbcStoragePlugin.java      | 82 +++++++++++++++++---
 .../drill/exec/store/jdbc/TestJdbcPlugin.java   | 14 ++--
 .../exec/store/ischema/RecordGenerator.java     | 27 ++++---
 3 files changed, 96 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index f27f6f1..24d1f9d 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -234,6 +234,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
 
   private class CapitalizingJdbcSchema extends AbstractSchema {
 
+    final Map<String, CapitalizingJdbcSchema> schemaMap = Maps.newHashMap();
     private final JdbcSchema inner;
 
     public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name, DataSource dataSource,
@@ -258,13 +259,21 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
     }
 
     @Override
-    public Schema getSubSchema(String name) {
-      return inner.getSubSchema(name);
+    public CapitalizingJdbcSchema getSubSchema(String name) {
+      return schemaMap.get(name);
+    }
+
+    void setHolder(SchemaPlus plusOfThis) {
+      for (String s : getSubSchemaNames()) {
+        CapitalizingJdbcSchema inner = getSubSchema(s);
+        SchemaPlus holder = plusOfThis.add(s, inner);
+        inner.setHolder(holder);
+      }
     }
 
     @Override
     public Set<String> getSubSchemaNames() {
-      return inner.getSubSchemaNames();
+      return schemaMap.keySet();
     }
 
     @Override
@@ -295,25 +304,74 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
       try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) {
         while (set.next()) {
           final String catalogName = set.getString(1);
-          CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect,
-              convention, catalogName, null);
+          CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
+              getSchemaPath(), catalogName, source, dialect, convention, catalogName, null);
           schemaMap.put(catalogName, schema);
         }
       } catch (SQLException e) {
         logger.warn("Failure while attempting to load JDBC schema.", e);
       }
 
-      // unable to read general catalog
+      // unable to read catalog list.
       if (schemaMap.isEmpty()) {
-        schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
-            convention,
-            null, null));
+
+        // try to add a list of schemas to the schema map.
+        boolean schemasAdded = addSchemas();
+
+        if (!schemasAdded) {
+          // there were no schemas, just create a default one (the jdbc system doesn't support catalogs/schemas).
+          schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList.<String> of(), name, source, dialect,
+              convention, null, null));
+        }
+      } else {
+        // We already have catalogs. Add schemas in this context of their catalogs.
+        addSchemas();
       }
 
       defaultSchema = schemaMap.values().iterator().next();
 
+
+    }
+
+    void setHolder(SchemaPlus plusOfThis) {
+      for (String s : getSubSchemaNames()) {
+        CapitalizingJdbcSchema inner = getSubSchema(s);
+        SchemaPlus holder = plusOfThis.add(s, inner);
+        inner.setHolder(holder);
+      }
     }
 
+    private boolean addSchemas() {
+      boolean added = false;
+      try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getSchemas()) {
+        while (set.next()) {
+          final String schemaName = set.getString(1);
+          final String catalogName = set.getString(2);
+
+          CapitalizingJdbcSchema parentSchema = schemaMap.get(catalogName);
+          if (parentSchema == null) {
+            CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect,
+                convention, catalogName, schemaName);
+
+            // if a catalog schema doesn't exist, we'll add this at the top level.
+            schemaMap.put(schemaName, schema);
+          } else {
+            CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(parentSchema.getSchemaPath(), schemaName,
+                source, dialect,
+                convention, catalogName, schemaName);
+            parentSchema.schemaMap.put(schemaName, schema);
+
+          }
+          added = true;
+        }
+      } catch (SQLException e) {
+        logger.warn("Failure while attempting to load JDBC schema.", e);
+      }
+
+      return added;
+    }
+
+
     @Override
     public String getTypeName() {
       return JdbcStorageConfig.NAME;
@@ -325,7 +383,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
     }
 
     @Override
-    public Schema getSubSchema(String name) {
+    public CapitalizingJdbcSchema getSubSchema(String name) {
       return schemaMap.get(name);
     }
 
@@ -358,9 +416,11 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
   @Override
   public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
     JdbcCatalogSchema schema = new JdbcCatalogSchema(name);
-    parent.add(name, schema);
+    SchemaPlus holder = parent.add(name, schema);
+    schema.setHolder(holder);
   }
 
+
   @Override
   public JdbcStorageConfig getConfig() {
     return config;

http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
index 1f15068..2eb419c 100644
--- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
+++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java
@@ -88,7 +88,7 @@ public class TestJdbcPlugin extends PlanTestBase {
     // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support.
     testBuilder()
         .sqlQuery(
-            "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON")
+            "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.PERSON")
         .ordered()
         .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL",
             "NUM", "SM", "BI", "BOOL")
@@ -122,9 +122,9 @@ public class TestJdbcPlugin extends PlanTestBase {
   @Test
   public void pushdownJoinAndFilterPushDown() throws Exception {
     final String query = "select * from \n" +
-        "testdb.`default`.PERSON e\n" +
+        "testdb.PERSON e\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON s\n" +
+        "testdb.PERSON s\n" +
         "ON e.FirstName = s.FirstName\n" +
         "WHERE e.LastName > 'hello'";
 
@@ -134,7 +134,7 @@ public class TestJdbcPlugin extends PlanTestBase {
   @Test
   public void pushdownAggregation() throws Exception {
     final String query = "select count(*) from \n" +
-        "testdb.`default`.PERSON";
+        "testdb.PERSON";
 
     testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" });
   }
@@ -142,12 +142,12 @@ public class TestJdbcPlugin extends PlanTestBase {
   @Test
   public void pushdownDoubleJoinAndFilter() throws Exception {
     final String query = "select * from \n" +
-        "testdb.`default`.PERSON e\n" +
+        "testdb.PERSON e\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON s\n" +
+        "testdb.PERSON s\n" +
         "ON e.PersonId = s.PersonId\n" +
         "INNER JOIN \n" +
-        "testdb.`default`.PERSON ed\n" +
+        "testdb.PERSON ed\n" +
         "ON e.PersonId = ed.PersonId\n" +
         "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'";
     testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" });

http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
index c2a56ca..f464727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.*;
 import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 /**
@@ -180,8 +183,14 @@ public abstract class RecordGenerator {
 
     @Override
     public boolean visitTable(String schemaName, String tableName, Table table) {
-      records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
-                                    table.getJdbcTableType().toString()));
+      Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
+
+      // skip over unknown table types
+      if (table.getJdbcTableType() != null) {
+        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
+            table.getJdbcTableType().toString()));
+      }
+
       return false;
     }
   }


Mime
View raw message