Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 74C8C18C3F for ; Mon, 2 Nov 2015 02:56:36 +0000 (UTC) Received: (qmail 24767 invoked by uid 500); 2 Nov 2015 02:56:36 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 24687 invoked by uid 500); 2 Nov 2015 02:56:36 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 24664 invoked by uid 99); 2 Nov 2015 02:56:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Nov 2015 02:56:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DEAADDFD87; Mon, 2 Nov 2015 02:56:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jacques@apache.org To: commits@drill.apache.org Date: Mon, 02 Nov 2015 02:56:36 -0000 Message-Id: In-Reply-To: <035bd4b15eb4483a838acb3075d10d92@git.apache.org> References: <035bd4b15eb4483a838acb3075d10d92@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/7] drill git commit: DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby) DRILL-3992: Add/fix support for JDBC schemas (tested against oracle and derby) This closes #225 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/22e53163 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/22e53163 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/22e53163 Branch: refs/heads/master Commit: 22e5316317cf5b8caed7a869e40a012f18652254 Parents: 7f55051 Author: Jacques Nadeau Authored: Wed Oct 28 17:20:51 2015 -0700 Committer: Jacques Nadeau Committed: Sun Nov 1 18:53:57 2015 -0800 ---------------------------------------------------------------------- .../exec/store/jdbc/JdbcStoragePlugin.java | 82 +++++++++++++++++--- .../drill/exec/store/jdbc/TestJdbcPlugin.java | 14 ++-- .../exec/store/ischema/RecordGenerator.java | 27 ++++--- 3 files changed, 96 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java index f27f6f1..24d1f9d 100755 --- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java @@ -234,6 +234,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { private class CapitalizingJdbcSchema extends AbstractSchema { + final Map schemaMap = Maps.newHashMap(); private final JdbcSchema inner; public CapitalizingJdbcSchema(List parentSchemaPath, String name, DataSource dataSource, @@ -258,13 +259,21 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { } @Override - public Schema getSubSchema(String name) { - return inner.getSubSchema(name); + public CapitalizingJdbcSchema getSubSchema(String name) { + return schemaMap.get(name); + } + + void setHolder(SchemaPlus plusOfThis) { + for (String s : getSubSchemaNames()) { + CapitalizingJdbcSchema inner = getSubSchema(s); + SchemaPlus holder = plusOfThis.add(s, inner); + inner.setHolder(holder); + } } @Override public Set getSubSchemaNames() { - return inner.getSubSchemaNames(); + return schemaMap.keySet(); } @Override @@ -295,25 +304,74 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getCatalogs()) { while (set.next()) { final String catalogName = set.getString(1); - CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), catalogName, source, dialect, - convention, catalogName, null); + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema( + getSchemaPath(), catalogName, source, dialect, convention, catalogName, null); schemaMap.put(catalogName, schema); } } catch (SQLException e) { logger.warn("Failure while attempting to load JDBC schema.", e); } - // unable to read general catalog + // unable to read catalog list. if (schemaMap.isEmpty()) { - schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList. of(), name, source, dialect, - convention, - null, null)); + + // try to add a list of schemas to the schema map. + boolean schemasAdded = addSchemas(); + + if (!schemasAdded) { + // there were no schemas, just create a default one (the jdbc system doesn't support catalogs/schemas). + schemaMap.put("default", new CapitalizingJdbcSchema(ImmutableList. of(), name, source, dialect, + convention, null, null)); + } + } else { + // We already have catalogs. Add schemas in this context of their catalogs. + addSchemas(); } defaultSchema = schemaMap.values().iterator().next(); + + } + + void setHolder(SchemaPlus plusOfThis) { + for (String s : getSubSchemaNames()) { + CapitalizingJdbcSchema inner = getSubSchema(s); + SchemaPlus holder = plusOfThis.add(s, inner); + inner.setHolder(holder); + } } + private boolean addSchemas() { + boolean added = false; + try (Connection con = source.getConnection(); ResultSet set = con.getMetaData().getSchemas()) { + while (set.next()) { + final String schemaName = set.getString(1); + final String catalogName = set.getString(2); + + CapitalizingJdbcSchema parentSchema = schemaMap.get(catalogName); + if (parentSchema == null) { + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(getSchemaPath(), schemaName, source, dialect, + convention, catalogName, schemaName); + + // if a catalog schema doesn't exist, we'll add this at the top level. + schemaMap.put(schemaName, schema); + } else { + CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(parentSchema.getSchemaPath(), schemaName, + source, dialect, + convention, catalogName, schemaName); + parentSchema.schemaMap.put(schemaName, schema); + + } + added = true; + } + } catch (SQLException e) { + logger.warn("Failure while attempting to load JDBC schema.", e); + } + + return added; + } + + @Override public String getTypeName() { return JdbcStorageConfig.NAME; @@ -325,7 +383,7 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { } @Override - public Schema getSubSchema(String name) { + public CapitalizingJdbcSchema getSubSchema(String name) { return schemaMap.get(name); } @@ -358,9 +416,11 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin { @Override public void registerSchemas(SchemaConfig config, SchemaPlus parent) { JdbcCatalogSchema schema = new JdbcCatalogSchema(name); - parent.add(name, schema); + SchemaPlus holder = parent.add(name, schema); + schema.setHolder(holder); } + @Override public JdbcStorageConfig getConfig() { return config; http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java index 1f15068..2eb419c 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPlugin.java @@ -88,7 +88,7 @@ public class TestJdbcPlugin extends PlanTestBase { // we'll test data except for date, time and timestamps. Derby mangles these due to improper timezone support. testBuilder() .sqlQuery( - "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.`default`.PERSON") + "select PERSONID, LASTNAME, FIRSTNAME, ADDRESS, CITY, CODE, DBL, FLT, REL, NUM, SM, BI, BOOL from testdb.PERSON") .ordered() .baselineColumns("PERSONID", "LASTNAME", "FIRSTNAME", "ADDRESS", "CITY", "CODE", "DBL", "FLT", "REL", "NUM", "SM", "BI", "BOOL") @@ -122,9 +122,9 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownJoinAndFilterPushDown() throws Exception { final String query = "select * from \n" + - "testdb.`default`.PERSON e\n" + + "testdb.PERSON e\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON s\n" + + "testdb.PERSON s\n" + "ON e.FirstName = s.FirstName\n" + "WHERE e.LastName > 'hello'"; @@ -134,7 +134,7 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownAggregation() throws Exception { final String query = "select count(*) from \n" + - "testdb.`default`.PERSON"; + "testdb.PERSON"; testPlanMatchingPatterns(query, new String[] {}, new String[] { "Aggregate" }); } @@ -142,12 +142,12 @@ public class TestJdbcPlugin extends PlanTestBase { @Test public void pushdownDoubleJoinAndFilter() throws Exception { final String query = "select * from \n" + - "testdb.`default`.PERSON e\n" + + "testdb.PERSON e\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON s\n" + + "testdb.PERSON s\n" + "ON e.PersonId = s.PersonId\n" + "INNER JOIN \n" + - "testdb.`default`.PERSON ed\n" + + "testdb.PERSON ed\n" + "ON e.PersonId = ed.PersonId\n" + "WHERE s.FirstName > 'abc' and ed.FirstName > 'efg'"; testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); http://git-wip-us.apache.org/repos/asf/drill/blob/22e53163/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java index c2a56ca..f464727 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordGenerator.java @@ -17,26 +17,29 @@ */ package org.apache.drill.exec.store.ischema; +import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME; +import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME; +import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME; +import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA; + import java.util.List; import java.util.Map; -import com.google.common.collect.ImmutableMap; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.Schema.TableType; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; - -import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.*; import org.apache.drill.exec.planner.logical.DrillViewInfoProvider; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result; import org.apache.drill.exec.store.pojo.PojoRecordReader; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; - +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; /** @@ -180,8 +183,14 @@ public abstract class RecordGenerator { @Override public boolean visitTable(String schemaName, String tableName, Table table) { - records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, - table.getJdbcTableType().toString())); + Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName); + + // skip over unknown table types + if (table.getJdbcTableType() != null) { + records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, + table.getJdbcTableType().toString())); + } + return false; } }