Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79A4B17C45 for ; Mon, 14 Sep 2015 05:28:48 +0000 (UTC) Received: (qmail 42678 invoked by uid 500); 14 Sep 2015 05:28:48 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 42588 invoked by uid 500); 14 Sep 2015 05:28:48 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 42505 invoked by uid 99); 14 Sep 2015 05:28:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Sep 2015 05:28:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24A65DFBD7; Mon, 14 Sep 2015 05:28:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jacques@apache.org To: commits@drill.apache.org Date: Mon, 14 Sep 2015 05:28:49 -0000 Message-Id: In-Reply-To: <97828664790943bca64e0c71625a2251@git.apache.org> References: <97828664790943bca64e0c71625a2251@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/15] drill git commit: DRILL-3180: Initial JDBC plugin implementation. DRILL-3180: Initial JDBC plugin implementation. 
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8478e9fb Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8478e9fb Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8478e9fb Branch: refs/heads/master Commit: 8478e9fb1d7e6881d8f092ae9ff3a338f2e023a6 Parents: b525692 Author: MPierre Authored: Sat Aug 1 18:07:18 2015 -0700 Committer: Jacques Nadeau Committed: Sun Sep 13 17:02:18 2015 -0700 ---------------------------------------------------------------------- contrib/storage-mpjdbc/pom.xml | 79 ++++ .../exec/store/mpjdbc/MPJdbcBatchCreator.java | 54 +++ .../drill/exec/store/mpjdbc/MPJdbcClient.java | 300 ++++++++++++ .../exec/store/mpjdbc/MPJdbcClientOptions.java | 52 ++ .../exec/store/mpjdbc/MPJdbcCnxnManager.java | 69 +++ .../exec/store/mpjdbc/MPJdbcFilterBuilder.java | 235 +++++++++ .../exec/store/mpjdbc/MPJdbcFilterRule.java | 60 +++ .../exec/store/mpjdbc/MPJdbcFormatConfig.java | 109 +++++ .../exec/store/mpjdbc/MPJdbcFormatPlugin.java | 170 +++++++ .../exec/store/mpjdbc/MPJdbcGroupScan.java | 181 +++++++ .../exec/store/mpjdbc/MPJdbcRecordReader.java | 471 +++++++++++++++++++ .../drill/exec/store/mpjdbc/MPJdbcScanSpec.java | 76 +++ .../exec/store/mpjdbc/MPJdbcSchemaConfig.java | 80 ++++ .../exec/store/mpjdbc/MPJdbcSchemaFilter.java | 23 + .../exec/store/mpjdbc/MPJdbcSchemaSubScan.java | 55 +++ .../drill/exec/store/mpjdbc/MPJdbcSubScan.java | 119 +++++ .../resources/bootstrap-storage-plugins.json | 12 + .../src/main/resources/checkstyle-config.xml | 41 ++ .../main/resources/checkstyle-suppressions.xml | 19 + .../src/main/resources/drill-module.conf | 30 ++ 20 files changed, 2235 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/pom.xml 
b/contrib/storage-mpjdbc/pom.xml new file mode 100755 index 0000000..5e9afca --- /dev/null +++ b/contrib/storage-mpjdbc/pom.xml @@ -0,0 +1,79 @@ + + + + 4.0.0 + + drill-contrib-parent + org.apache.drill.contrib + 0.9.0-SNAPSHOT + + + drill-mpjdbc-storage + + contrib/mpjdbc-storage-plugin + + + + org.apache.drill.exec + drill-java-exec + ${project.version} + + + + + + org.apache.drill.exec + drill-java-exec + tests + ${project.version} + test + + + org.apache.drill + drill-common + tests + ${project.version} + test + + + com.yammer.metrics + metrics-core + 2.1.1 + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + logback.log.dir + ${project.build.directory}/surefire-reports + + + + + + + + http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java new file mode 100644 index 0000000..af08b2e --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+//import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mpjdbc.MPJdbcSchemaSubScan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Creates the record batch that executes one {@link MPJdbcSubScan}.
+ */
+public class MPJdbcBatchCreator implements BatchCreator<MPJdbcSubScan> {
+  /**
+   * Wraps a single {@link MPJdbcRecordReader} for {@code config} in a
+   * {@link ScanBatch}.
+   *
+   * @param context  fragment context handed to the reader and the batch
+   * @param config   the sub scan to execute
+   * @param children child batches; a scan is a leaf, so this must be empty
+   * @return the scan batch reading from the configured JDBC source
+   * @throws ExecutionSetupException if the record reader cannot be created
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MPJdbcSubScan config,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    try {
+      // The reader takes the whole sub scan, so no separate column list is
+      // resolved here (the previous local was computed but never read).
+      readers.add(new MPJdbcRecordReader(context, config));
+    } catch (Exception e) {
+      throw new ExecutionSetupException(e);
+    }
+    return new ScanBatch(config, context, readers.iterator());
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java
---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java new file mode 100644 index 0000000..f3bf81d --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.linq4j.Extensions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.AbstractSchema;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+
+/**
+ * Holds one JDBC connection for a storage plugin and exposes the catalogs and
+ * tables reported by its {@link DatabaseMetaData} as Drill schemas.
+ */
+class MPJdbcClient {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(MPJdbcClient.class);
+
+  private MPJdbcClientOptions clientOptions;
+  private Connection conn;
+  private DatabaseMetaData metadata;
+  private String uri;
+  private OdbcSchema defaultSchema;
+  private MPJdbcFormatPlugin plugin;
+  private String plugName;
+
+  /**
+   * Loads the configured driver and opens the connection.
+   *
+   * <p>This constructor does not throw when the driver cannot be loaded or the
+   * connection cannot be opened: the failure is logged and
+   * {@link #getConnection()} returns {@code null}, which is what
+   * {@code MPJdbcCnxnManager.getClient} checks for.
+   *
+   * @param uri           JDBC URL; used on its own when no user/password pair
+   *                      is configured
+   * @param clientOptions driver class name and optional credentials
+   * @param plugin        owning storage plugin
+   */
+  public MPJdbcClient(String uri, MPJdbcClientOptions clientOptions,
+      MPJdbcFormatPlugin plugin) {
+    this.clientOptions = clientOptions;
+    this.plugin = plugin;
+    this.uri = uri;
+    try {
+      Class.forName(clientOptions.getDriver()).newInstance();
+
+      String user = clientOptions.getUser();
+      String passwd = clientOptions.getPassword();
+      // A null password used to raise a NullPointerException here.
+      boolean credentialsMissing = user == null || user.isEmpty()
+          || passwd == null || passwd.isEmpty();
+      if (credentialsMissing) {
+        logger.info("username, password assumed to be in the uri");
+        this.conn = DriverManager.getConnection(uri);
+      } else {
+        this.conn = DriverManager.getConnection(uri, user, passwd);
+      }
+      this.metadata = this.conn.getMetaData();
+      this.plugName = plugin.getName();
+    } catch (InstantiationException | IllegalAccessException
+        | ClassNotFoundException | SQLException e) {
+      // Previously an exception object was created here and dropped, so the
+      // failure left no trace. The URI is deliberately not logged because it
+      // may carry credentials.
+      logger.error("Unable to open JDBC connection using driver {}",
+          clientOptions.getDriver(), e);
+    }
+  }
+
+  /**
+   * @return the underlying connection, or {@code null} if it could not be
+   *         opened
+   */
+  public Connection getConnection() {
+    return this.conn;
+  }
+
+  /**
+   * Lists the catalogs reported by the database.
+   *
+   * @return a map whose keys are catalog names; every value is {@code 1}
+   * @throws DrillRuntimeException if the metadata query fails
+   */
+  public Map<String, Integer> getSchemas() {
+    Map<String, Integer> lst = new HashMap<String, Integer>();
+    try (ResultSet rs = this.metadata.getCatalogs()) {
+      while (rs.next()) {
+        String catalog = rs.getString(1);
+        if (!lst.containsKey(catalog)) {
+          lst.put(catalog, Integer.valueOf(1));
+        }
+      }
+    } catch (SQLException e) {
+      // Now thrown, as getTables() does; it used to be created and discarded.
+      throw new DrillRuntimeException(e);
+    }
+    return lst;
+  }
+
+  /**
+   * Lists the names of all tables (of any type) in a catalog.
+   *
+   * @param catalog catalog name passed to {@link DatabaseMetaData#getTables}
+   * @return the unqualified table names
+   * @throws DrillRuntimeException if the metadata query fails
+   */
+  public Set<String> getTables(String catalog) {
+    Set<String> lst = new HashSet<String>();
+    try (ResultSet rs = this.metadata.getTables(catalog, null, null, null)) {
+      while (rs.next()) {
+        lst.add(rs.getString("TABLE_NAME"));
+      }
+    } catch (SQLException e) {
+      throw new DrillRuntimeException(e);
+    }
+    return lst;
+  }
+
+  /**
+   * Lists catalog names; a failing metadata query is logged and yields the
+   * names collected so far.
+   *
+   * @return the catalog names
+   */
+  public List<String> getDatabases() {
+    List<String> lst = new ArrayList<String>();
+    try (ResultSet rs = this.metadata.getCatalogs()) {
+      while (rs.next()) {
+        // JDBC column indexes start at 1; index 0 always raised SQLException.
+        lst.add(rs.getString(1));
+      }
+    } catch (SQLException e) {
+      logger.warn("Unable to list catalogs", e);
+    }
+    return lst;
+  }
+
+  /** Closes the connection if one was opened; close failures are logged. */
+  public void close() {
+    if (this.conn == null) {
+      return;
+    }
+    try {
+      this.conn.close();
+    } catch (SQLException e) {
+      logger.warn("Error while closing JDBC connection", e);
+    }
+  }
+
+  /**
+   * Builds a schema for the connection's current catalog and remembers it as
+   * the default schema.
+   *
+   * @return the newly created default schema
+   */
+  public OdbcSchema getSchema() {
+    String currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+        plugin).getCurrentSchema();
+    defaultSchema = new OdbcSchema(currentSchema);
+    return defaultSchema;
+  }
+
+  /**
+   * @param name catalog name
+   * @return a new schema for {@code name}
+   */
+  public OdbcSchema getSchema(String name) {
+    return new OdbcSchema(name);
+  }
+
+  /**
+   * Drill schema backed by one catalog of the connected database.
+   */
+  public class OdbcSchema extends AbstractSchema {
+
+    private Map<String, Integer> sub_schemas;
+    private String currentSchema;
+    private Set<String> tables;
+
+    /**
+     * Creates a top-level schema and loads its table names.
+     *
+     * @param name catalog name; the empty name also loads the catalog list
+     */
+    public OdbcSchema(String name) {
+      super(ImmutableList.<String> of(), name);
+      // Null-safe: the current catalog reported by the driver may be null.
+      if ("".equals(name)) {
+        sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+            .getSchemas();
+      }
+      tables = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+          .getTables(name);
+    }
+
+    /**
+     * Creates a nested schema and loads the catalog list; table names are
+     * loaded on first use.
+     *
+     * @param parentSchemaPath path of the enclosing schema
+     * @param name             schema name
+     */
+    public OdbcSchema(List<String> parentSchemaPath, String name) {
+      super(parentSchemaPath, name);
+      currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+          plugin).getCurrentSchema();
+      if (currentSchema == null) {
+        currentSchema = "ROOT";
+      }
+      sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+          .getSchemas();
+    }
+
+    @Override
+    public String getTypeName() {
+      return "odbc";
+    }
+
+    /**
+     * @return a schema for catalog {@code name}, or {@code null} if the
+     *         database reports no such catalog
+     */
+    @Override
+    public AbstractSchema getSubSchema(String name) {
+      if (sub_schemas == null) {
+        sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions,
+            plugin).getSchemas();
+      }
+      Integer a = sub_schemas.get(name);
+      // Unknown names map to null; unboxing that used to throw.
+      if (a != null && a == 1) {
+        return new OdbcSchema(name);
+      }
+      return null;
+    }
+
+    /**
+     * Resolves a table by case-insensitive name. A dotted name is split into
+     * {@code catalog.table} and resolved against the matching sub schema.
+     *
+     * @return the table, or {@code null} if it cannot be found
+     */
+    @Override
+    public Table getTable(String name) {
+      if (name.contains(".")) {
+        String[] val = name.split("\\.");
+        if (val.length < 2) {
+          return null;
+        }
+        OdbcSchema sub = (OdbcSchema) this.getSubSchema(val[0]);
+        return sub == null ? null : sub.getTable(val[1]);
+      }
+      if (tables == null) {
+        // Not populated by the two-argument constructor.
+        tables = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+            .getTables(this.name);
+      }
+      for (String candidate : tables) {
+        if (candidate.equalsIgnoreCase(name)) {
+          MPJdbcScanSpec spec = new MPJdbcScanSpec(this.name, candidate, "");
+          return new DynamicDrillTable(plugin, "odbc", spec);
+        }
+      }
+      return null;
+    }
+
+    /** @return the table names of this catalog, re-read from the database */
+    @Override
+    public Set<String> getTableNames() {
+      return MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+          .getTables(name);
+    }
+
+    /** @return the catalog names, re-read from the database */
+    @Override
+    public Set<String> getSubSchemaNames() {
+      sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+          .getSchemas();
+      return sub_schemas.keySet();
+    }
+
+    @Override
+    public Collection<Function> getFunctions(String name) {
+      return super.getFunctions(name);
+    }
+
+    @Override
+    public AbstractSchema getDefaultSchema() {
+      return MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+          .getDefaultSchema();
+    }
+
+  }
+
+  /**
+   * @return the connection's current catalog, or {@code null} if it cannot be
+   *         determined
+   */
+  public String getCurrentSchema() {
+    try {
+      return this.conn.getCatalog();
+    } catch (SQLException e) {
+      logger.warn("Unable to read the current catalog", e);
+    }
+    return null;
+  }
+
+  /**
+   * @return the schema created by the last {@link #getSchema()} call, or
+   *         {@code null} if it has not been called
+   */
+  public AbstractSchema getDefaultSchema() {
+    return defaultSchema;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
new file mode 100644
index 0000000..84b6348
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+/**
+ * Connection settings for a JDBC client: the driver class name and the
+ * user/password pair.
+ */
+public class MPJdbcClientOptions {
+  private String driver;
+  private String user;
+  private String passwd;
+
+  /**
+   * @param driver fully qualified JDBC driver class name
+   * @param user   user name
+   * @param passwd password
+   */
+  public MPJdbcClientOptions(String driver, String user, String passwd) {
+    this.driver = driver;
+    this.user = user;
+    this.passwd = passwd;
+  }
+
+  /**
+   * Copies driver, user and password out of a storage plugin configuration.
+   *
+   * @param storageConfig configuration to read the settings from
+   */
+  public MPJdbcClientOptions(MPJdbcFormatConfig storageConfig) {
+    this(storageConfig.getDriver(), storageConfig.getUser(),
+        storageConfig.getPasswd());
+  }
+
+  /** @return the JDBC driver class name */
+  public String getDriver() {
+    return driver;
+  }
+
+  /** @return the user name */
+  public String getUser() {
+    return user;
+  }
+
+  /** @return the password */
+  public String getPassword() {
+    return passwd;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
new file mode 100644
index 0000000..7e43c32
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import org.apache.drill.exec.store.mpjdbc.MPJdbcClientOptions;
+
+/**
+ * Process-wide cache of {@link MPJdbcClient} instances keyed by JDBC URI.
+ * At most five clients are kept; a client is evicted, and its connection
+ * closed, ten minutes after it was last requested.
+ */
+public class MPJdbcCnxnManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(MPJdbcCnxnManager.class);
+  private static Cache<String, MPJdbcClient> uriClientMap;
+
+  static {
+    uriClientMap = CacheBuilder.newBuilder().maximumSize(5)
+        .expireAfterAccess(10, TimeUnit.MINUTES)
+        .removalListener(new uriCloser()).build();
+  }
+
+  /** Closes the connection of a client once the cache drops it. */
+  private static class uriCloser implements
+      RemovalListener<String, MPJdbcClient> {
+
+    @Override
+    public synchronized void onRemoval(
+        RemovalNotification<String, MPJdbcClient> removal) {
+      removal.getValue().close();
+      // The key is the JDBC URI, which may embed the user name and password,
+      // so it is kept out of the log.
+      logger.debug("Closed cached JDBC connection.");
+    }
+
+  }
+
+  /**
+   * Returns the cached client for {@code uri}, creating and caching one when
+   * none is present.
+   *
+   * @param uri           JDBC URI, also used as the cache key
+   * @param clientOptions driver and credentials for a newly created client
+   * @param plugin        owning storage plugin for a newly created client
+   * @return the client, or {@code null} if a new client could not open its
+   *         connection
+   */
+  public synchronized static MPJdbcClient getClient(String uri,
+      MPJdbcClientOptions clientOptions, MPJdbcFormatPlugin plugin) {
+    MPJdbcClient client = uriClientMap.getIfPresent(uri);
+    if (client != null) {
+      return client;
+    }
+    client = new MPJdbcClient(uri, clientOptions, plugin);
+    if (client.getConnection() == null) {
+      // Do not cache a client that has no usable connection.
+      return null;
+    }
+    uriClientMap.put(uri, client);
+    return client;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
new file mode 100644
index 0000000..488a55d
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.mpjdbc.MPJdbcScanSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Visits a filter condition with the aim of turning it into a
+ * {@link MPJdbcScanSpec}.
+ *
+ * <p>Translation of individual predicates is not implemented yet: every
+ * function call is reported as not converted, so
+ * {@link #isAllExpressionsConverted()} is {@code false} after visiting any
+ * comparison.
+ */
+public class MPJdbcFilterBuilder extends
+    AbstractExprVisitor<MPJdbcScanSpec, Void, RuntimeException> {
+  static final Logger logger = LoggerFactory
+      .getLogger(MPJdbcFilterBuilder.class);
+  final MPJdbcGroupScan groupScan;
+  final LogicalExpression le;
+  private boolean allExpressionsConverted = true;
+
+  /**
+   * @param groupScan    scan the filter sits on
+   * @param conditionExp filter condition to visit
+   */
+  public MPJdbcFilterBuilder(MPJdbcGroupScan groupScan,
+      LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+  }
+
+  /**
+   * Visits the condition and, when it yields a spec, merges that spec with
+   * the group scan's own spec.
+   *
+   * @return the merged spec, or {@code null} if nothing could be converted
+   */
+  public MPJdbcScanSpec parseTree() {
+    MPJdbcScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getScanSpec(),
+          parsedSpec);
+    }
+    return parsedSpec;
+  }
+
+  /**
+   * Combines two specs under a boolean operator.
+   *
+   * <p>TODO: the filters of the two operands are not combined yet; the result
+   * is a fresh copy of the group scan's spec (database, table, columns)
+   * regardless of the arguments.
+   */
+  private MPJdbcScanSpec mergeScanSpecs(String functionName,
+      MPJdbcScanSpec leftScanSpec, MPJdbcScanSpec rightScanSpec) {
+    return new MPJdbcScanSpec(groupScan.getScanSpec().getDatabase(),
+        groupScan.getScanSpec().getTable(),
+        groupScan.getScanSpec().getColumns());
+  }
+
+  /**
+   * @return {@code false} once any visited expression could not be converted
+   */
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public MPJdbcScanSpec visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  /**
+   * Visits the operands of {@code booleanAnd} / {@code booleanOr} and merges
+   * the specs they produce; other operators yield {@code null}.
+   */
+  @Override
+  public MPJdbcScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+    List<LogicalExpression> args = op.args;
+    MPJdbcScanSpec nodeScanSpec = null;
+    String functionName = op.getName();
+    for (int i = 0; i < args.size(); ++i) {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        if (nodeScanSpec == null) {
+          nodeScanSpec = args.get(i).accept(this, null);
+        } else {
+          MPJdbcScanSpec scanSpec = args.get(i).accept(this, null);
+          if (scanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+          } else {
+            allExpressionsConverted = false;
+          }
+        }
+        break;
+      }
+    }
+    return nodeScanSpec;
+  }
+
+  /**
+   * Logs the first two arguments of the call and reports the call as not
+   * converted.
+   *
+   * <p>TODO: translate comparison functions into scan-spec filters and merge
+   * nested {@code booleanAnd} / {@code booleanOr} calls.
+   *
+   * @return always {@code null} for now
+   */
+  @Override
+  public MPJdbcScanSpec visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+    // Calls with fewer than two arguments used to fail here: get(0) on an
+    // empty list, or toString() on the missing second argument.
+    LogicalExpression nameVal = call.args.isEmpty() ? null : call.args.get(0);
+    LogicalExpression valueVal = call.args.size() >= 2 ? call.args.get(1)
+        : null;
+    logger.info("Name Val:{}", nameVal);
+    logger.info("Value Val:{}", valueVal);
+
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  /**
+   * Placeholder for building a spec from a single comparison.
+   *
+   * <p>TODO: map the comparison functions {@code equal}, {@code not_equal},
+   * {@code greater_than_or_equal_to}, {@code greater_than},
+   * {@code less_than_or_equal_to}, {@code less_than}, the is-null variants
+   * ({@code isnull}, {@code isNull}, {@code is null}) and the is-not-null
+   * variants ({@code isnotnull}, {@code isNotNull}, {@code is not null}) to
+   * filters on the field.
+   *
+   * @return always {@code null} for now
+   */
+  private MPJdbcScanSpec createOdbcScanSpec(String functionName,
+      SchemaPath field, Object fieldValue) throws ClassNotFoundException,
+      Exception {
+    return null;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
new file mode 100644
index 0000000..c0564d0
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+
+/**
+ * Planner rule matching a {@link FilterPrel} on top of a {@link ScanPrel}, so
+ * the filter condition can be handed to {@link MPJdbcFilterBuilder}.
+ */
+public class MPJdbcFilterRule extends StoragePluginOptimizerRule {
+  public static final StoragePluginOptimizerRule INSTANCE = new MPJdbcFilterRule();
+
+  /**
+   * @param operand     operand tree the rule matches
+   * @param description rule description
+   */
+  public MPJdbcFilterRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  /** Creates the rule for a filter directly above any scan. */
+  public MPJdbcFilterRule() {
+    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MPJdbcFilterRule");
+  }
+
+  /**
+   * Converts the filter condition to a Drill expression and runs the filter
+   * builder over it.
+   *
+   * <p>TODO: the resulting scan spec is not applied yet; the plan is left
+   * unchanged.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    // The operand matches scans of every storage plugin; only scans owned by
+    // this plugin can be cast to MPJdbcGroupScan.
+    if (!(scan.getGroupScan() instanceof MPJdbcGroupScan)) {
+      return;
+    }
+    final FilterPrel filter = (FilterPrel) call.rel(0);
+    final RexNode condition = filter.getCondition();
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(), scan, condition);
+    MPJdbcGroupScan grpScan = (MPJdbcGroupScan) scan.getGroupScan();
+    MPJdbcFilterBuilder builder = new MPJdbcFilterBuilder(grpScan, conditionExp);
+    builder.parseTree();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java
---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java new file mode 100644 index 0000000..8edce3e --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.drill.exec.store.mpjdbc;

import org.apache.drill.common.logical.StoragePluginConfig;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Storage plugin configuration for the MPJdbc plugin: JDBC driver class name,
 * connection URI and credentials. Every field is normalised to a non-null
 * value by the constructor.
 */
@JsonTypeName(MPJdbcFormatConfig.NAME)
public class MPJdbcFormatConfig extends StoragePluginConfig {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
      .getLogger(MPJdbcFormatConfig.class);
  public static final String NAME = "jdbc";

  @JsonIgnore
  private final String driver;
  @JsonIgnore
  private final String uri;
  @JsonIgnore
  private final String username;
  @JsonIgnore
  private final String password;

  /**
   * @param driver   JDBC driver name; {@code null} becomes the empty string
   * @param uri      JDBC connection URI; {@code null} becomes {@code "jdbc://"}
   * @param username user name; {@code null} becomes the empty string
   * @param password password; {@code null} becomes the empty string
   */
  @JsonCreator
  public MPJdbcFormatConfig(@JsonProperty("driver") String driver,
      @JsonProperty("uri") String uri,
      @JsonProperty("username") String username,
      @JsonProperty("password") String password) {
    this.driver = driver == null ? "" : driver;
    this.uri = uri == null ? "jdbc://" : uri;
    this.username = username == null ? "" : username;
    this.password = password == null ? "" : password;
  }

  /**
   * Two configurations are equal when driver, URI, user name and password all
   * match. The same four fields feed {@link #hashCode()}, so equal objects
   * always share a hash code.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    MPJdbcFormatConfig that = (MPJdbcFormatConfig) o;
    // The constructor guarantees the fields are non-null.
    return driver.equals(that.driver)
        && uri.equals(that.uri)
        && username.equals(that.username)
        && password.equals(that.password);
  }

  @JsonProperty("driver")
  public String getDriver() {
    return this.driver;
  }

  @JsonProperty("uri")
  public String getUri() {
    return this.uri;
  }

  @JsonProperty("username")
  public String getUser() {
    return this.username;
  }

  @JsonProperty("password")
  public String getPasswd() {
    return this.password;
  }

  /**
   * Hash over the same fields {@link #equals(Object)} compares. Nothing is
   * serialised or logged here, so the password never reaches the log.
   */
  @Override
  public int hashCode() {
    int result = driver.hashCode();
    result = 31 * result + uri.hashCode();
    result = 31 * result + username.hashCode();
    result = 31 * result + password.hashCode();
    return result;
  }

}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java new file mode 100644 index 0000000..5a0dd4b --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mpjdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.Function; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Schema.TableType; +import org.apache.calcite.schema.Function; +import org.apache.calcite.linq4j.Extensions; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import 
org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.mock.MockGroupScanPOP;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.exec.store.mpjdbc.MPJdbcClient.OdbcSchema;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

/**
 * Storage plugin that exposes the schemas of a JDBC data source to Drill and
 * creates group scans over its tables.
 */
public class MPJdbcFormatPlugin extends AbstractStoragePlugin {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
      .getLogger(MPJdbcFormatPlugin.class);

  private final MPJdbcFormatConfig storageConfig;
  protected String name = "odbc";
  private final DrillbitContext context;

  /**
   * @param storageConfig plugin configuration (driver, URI, credentials)
   * @param context       drillbit context, exposed through {@link #getContext()}
   * @param name          name under which the plugin's schema is registered
   */
  public MPJdbcFormatPlugin(MPJdbcFormatConfig storageConfig,
      DrillbitContext context, String name) {
    this.context = context;
    this.storageConfig = storageConfig;
    this.name = name;
    // The configuration itself is not logged: it carries the password.
    logger.debug("Created JDBC storage plugin '{}'", name);
  }

  /**
   * Registers the client's root schema under this plugin's name, then one
   * sub-schema per key returned by the client's {@code getSchemas()}.
   * If the configuration or the client is missing, the problem is logged and
   * nothing is registered.
   *
   * @param config schema configuration (not used by this implementation)
   * @param parent schema the plugin schema is added to
   */
  @Override
  public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
    if (storageConfig == null) {
      logger.error("StorageConfig is null; no schemas registered for plugin '{}'", name);
      return;
    }
    MPJdbcClientOptions options = new MPJdbcClientOptions(storageConfig);
    MPJdbcClient client = MPJdbcCnxnManager.getClient(storageConfig.getUri(),
        options, this);
    if (client == null) {
      logger.error("Could not create client...; no schemas registered for plugin '{}'", name);
      return;
    }
    OdbcSchema rootSchema = client.getSchema();
    SchemaPlus pluginSchema = parent.add(this.name, rootSchema);
    try {
      // Only the keys (schema names) are needed, so the value type is left open.
      Map<String, ?> schemas = client.getSchemas();
      for (String catalog : schemas.keySet()) {
        pluginSchema.add(catalog, client.getSchema(catalog));
      }
    } catch (Exception e) {
      // Best effort, as before: the root schema stays registered even when the
      // sub-schemas cannot be listed.
      logger.error("Failed to register sub-schemas for plugin '{}'", name, e);
    }
  }

  /** @return the configuration this plugin was created with */
  @Override
  public MPJdbcFormatConfig getConfig() {
    logger.debug("MPJdbcFormatPlugin:getConfig called");
    return storageConfig;
  }

  /** @return the drillbit context passed to the constructor */
  public DrillbitContext getContext() {
    return this.context;
  }

  /** @return the name the plugin schema is registered under */
  public String getName() {
    return this.name;
  }

  @Override
  public boolean supportsRead() {
    return true;
  }

  /**
   * Deserialises the selection into an {@link MPJdbcScanSpec} and wraps it in
   * a group scan.
   *
   * @param userName  user the scan runs as
   * @param selection serialised scan specification
   * @param columns   projected columns
   * @return a new {@link MPJdbcGroupScan}
   * @throws IOException if the selection cannot be read
   */
  @Override
  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
      List<SchemaPath> columns) throws IOException {
    MPJdbcScanSpec scanSpec = selection.getListWith(new ObjectMapper(),
        new TypeReference<MPJdbcScanSpec>() {
        });
    return new MPJdbcGroupScan(userName, this, scanSpec, columns);
  }

  /** @return the single optimizer rule contributed by this plugin */
  @Override
  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
    return ImmutableSet.of(MPJdbcFilterRule.INSTANCE);
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java ---------------------------------------------------------------------- diff --git
a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java new file mode 100644 index 0000000..a16f8c8 --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mpjdbc; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.schedule.CompleteFileWork; +import org.apache.drill.common.expression.SchemaPath; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Preconditions; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class MPJdbcGroupScan extends AbstractGroupScan { + + private MPJdbcFormatPlugin plugin; + private MPJdbcFormatConfig pluginConfig; + private MPJdbcScanSpec mPJdbcScanSpec; + private List columns; + private String userName; + private Map> endpointFragmentMapping; + + public MPJdbcGroupScan(String userName,MPJdbcFormatPlugin storagePlugin, MPJdbcScanSpec scanSpec, + List columns) { + super(userName); + this.plugin = storagePlugin; + this.pluginConfig = storagePlugin.getConfig(); + this.mPJdbcScanSpec = scanSpec; + this.userName = userName; + this.columns = columns == null || columns.size() == 0 ? 
ALL_COLUMNS + : columns; + } + + public MPJdbcGroupScan(MPJdbcGroupScan that) { + super(that); + this.columns = that.columns; + this.plugin = that.plugin; + this.endpointFragmentMapping = that.endpointFragmentMapping; + this.pluginConfig = that.pluginConfig; + this.mPJdbcScanSpec = that.mPJdbcScanSpec; + } + + @Override + public SubScan getSpecificScan(int minorFragmentId) + throws ExecutionSetupException { + // TODO Auto-generated method stub + return new MPJdbcSubScan(plugin,userName, pluginConfig, + endpointFragmentMapping.get(minorFragmentId), columns); + } + + @Override + public int getMaxParallelizationWidth() { + // TODO Auto-generated method stub + return -1; + } + + @Override + public String getDigest() { + // TODO Auto-generated method stub + return toString(); + } + + @Override + public GroupScan clone(List columns) { + MPJdbcGroupScan newScan = new MPJdbcGroupScan(userName,plugin, mPJdbcScanSpec, columns); + return newScan; + + } + + @Override + public ScanStats getScanStats() { + // TODO Auto-generated method stub + return ScanStats.TRIVIAL_TABLE; + } + + @Override + public PhysicalOperator getNewWithChildren(List children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + return new MPJdbcGroupScan(this); + // TODO Auto-generated method stub + } + @Override + @JsonIgnore + public boolean canPushdownProjects(List columns) { + this.columns = columns; + return true; + } + + @Override + public List getOperatorAffinity() { + Map endpointMap = new HashMap(); + for (DrillbitEndpoint ep : plugin.getContext().getBits()) { + endpointMap.put(ep.getAddress(), ep); + } + + Map affinityMap = new HashMap(); + DrillbitEndpoint ep = endpointMap.get(plugin.getConfig().getUri()); + if (ep != null) { + EndpointAffinity affinity = affinityMap.get(ep); + if (affinity == null) { + affinityMap.put(ep, new EndpointAffinity(ep, 1)); + } else { + affinity.addAffinity(1); + } + } + return Lists.newArrayList(affinityMap.values()); + } + + 
@Override + public void applyAssignments(List incomingEndpoints) { + final int numSlots = incomingEndpoints.size(); + int totalAssignmentsTobeDone = 1; + Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, String + .format("Incoming endpoints %d is greater than number of chunks %d", + numSlots, totalAssignmentsTobeDone)); + final int minPerEndpointSlot = (int) Math + .floor((double) totalAssignmentsTobeDone / numSlots); + final int maxPerEndpointSlot = (int) Math + .ceil((double) totalAssignmentsTobeDone / numSlots); + /* Map for (index,endpoint)'s */ + endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); + /* Reverse mapping for above indexes */ + Map> endpointHostIndexListMap = Maps.newHashMap(); + /* + * Initialize these two maps + */ + for (int i = 0; i < numSlots; ++i) { + List val = new ArrayList(maxPerEndpointSlot); + val.add(this.mPJdbcScanSpec); + endpointFragmentMapping.put(i, val); + String hostname = incomingEndpoints.get(i).getAddress(); + Queue hostIndexQueue = endpointHostIndexListMap.get(hostname); + if (hostIndexQueue == null) { + hostIndexQueue = Lists.newLinkedList(); + endpointHostIndexListMap.put(hostname, hostIndexQueue); + } + hostIndexQueue.add(i); + } + } + + public MPJdbcScanSpec getScanSpec() { + return this.mPJdbcScanSpec; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java new file mode 100644 index 0000000..498b7fd --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mpjdbc; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.DateHolder; +import org.apache.drill.exec.expr.holders.Decimal38DenseHolder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.MaterializedField.Key; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.BigIntVector; +import 
org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.Decimal38DenseVector;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableDateVector;
import org.apache.drill.exec.vector.NullableDecimal38DenseVector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableTimeVector;
import org.apache.drill.exec.vector.NullableVar16CharVector.Mutator;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Record reader that runs {@code SELECT <columns> FROM <database>.<table>}
 * over JDBC and copies the result set into Drill value vectors, up to
 * {@link #BATCH_SIZE} rows per call to {@link #next()}.
 */
public class MPJdbcRecordReader extends AbstractRecordReader {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
      .getLogger(MPJdbcRecordReader.class);

  /** Maximum number of rows written per batch, and the allocation size of each vector. */
  private static final int BATCH_SIZE = 65536;
  /** Bytes per value reserved for character columns whose width is not positive. */
  private static final int DEFAULT_VARCHAR_WIDTH = 2000;

  private ResultSet rec;
  private FragmentContext fc;
  private MPJdbcSubScan scanSpec;
  private MPJdbcFormatPlugin plugin;
  private List<MPJdbcScanSpec> scanList;
  private MPJdbcFormatConfig config;
  private Connection conn;
  private Statement statement;
  private String table;
  private String database;
  protected List<ValueVector> vectors = Lists.newArrayList();
  /** JDBC column index (1-based) feeding the vector at the same position in {@link #vectors}. */
  private final List<Integer> columnIndexes = Lists.newArrayList();
  private int col_cnt = 0;
  private OutputMutator outputMutator;
  private ResultSetMetaData meta;
  private OperatorContext operatorContext;
  private String columns;
  private List<?> filters;

  /**
   * Obtains a client for the sub scan's configuration and executes the query.
   *
   * @param fragmentContext context of the executing fragment
   * @param scan            sub scan naming the table, database and columns to read
   * @throws DrillRuntimeException if no client is available, the sub scan names
   *                               no table, or the query fails
   */
  public MPJdbcRecordReader(FragmentContext fragmentContext, MPJdbcSubScan scan) {
    fc = fragmentContext;
    scanSpec = scan;
    this.plugin = scanSpec.getPlugin();
    this.scanList = scanSpec.getScanList();
    this.config = scanSpec.getConfig();
    MPJdbcClientOptions options = new MPJdbcClientOptions(config);
    MPJdbcClient client = MPJdbcCnxnManager.getClient(config.getUri(), options,
        this.plugin);
    if (client == null) {
      throw new DrillRuntimeException("Could not create a JDBC client for the scan");
    }
    conn = client.getConnection();

    columns = buildColumnList(scan.getColumns());
    // As before, the last scan spec in the list decides the table that is read.
    for (MPJdbcScanSpec spec : scanList) {
      table = spec.getTable();
      database = spec.getDatabase();
      filters = spec.getFilters();
    }
    if (table == null || database == null) {
      throw new DrillRuntimeException("JDBC sub scan does not name a database and table");
    }

    // Identifiers cannot be bound as statement parameters, so the text is
    // concatenated; the values come from the scan spec.
    String sql = "SELECT " + this.columns + " FROM " + database.trim() + "."
        + table.trim();
    try {
      statement = conn.createStatement();
      rec = statement.executeQuery(sql);
    } catch (SQLException e) {
      // Fail now instead of leaving a null result set for setup() to trip over.
      closeQuietly(statement, "statement");
      throw new DrillRuntimeException("Failed to execute JDBC query: " + sql, e);
    }
  }

  /** Joins the unescaped, trimmed column paths with commas. */
  private static String buildColumnList(List<SchemaPath> paths) {
    StringBuilder b = new StringBuilder();
    Iterator<SchemaPath> it = paths.iterator();
    while (it.hasNext()) {
      b.append(it.next().getAsUnescapedPath().trim());
      if (it.hasNext()) {
        b.append(",");
      }
    }
    return b.toString();
  }

  /**
   * Creates one value vector per result-set column, chosen from the column's
   * JDBC type and nullability. {@code LONGVARBINARY} columns get no vector and
   * are skipped when rows are copied.
   *
   * @param output mutator the vectors are added to
   * @throws ExecutionSetupException if the metadata cannot be read or a field
   *                                 cannot be added
   */
  @Override
  public void setup(OutputMutator output) throws ExecutionSetupException {
    try {
      meta = rec.getMetaData();
      col_cnt = meta.getColumnCount();
      for (int i = 1; i <= col_cnt; i++) {
        String label = meta.getColumnLabel(i);
        // Unknown nullability is treated as nullable so SQL NULLs can be stored.
        boolean nullable = meta.isNullable(i) != ResultSetMetaData.columnNoNulls;
        int width = meta.getPrecision(i);
        switch (meta.getColumnType(i)) {
        case Types.BIGINT:
          addColumn(output, i, label, TypeProtos.MinorType.BIGINT, nullable, null,
              NullableBigIntVector.class, BigIntVector.class);
          break;
        case Types.DATE:
          addColumn(output, i, label, TypeProtos.MinorType.DATE, nullable, null,
              NullableDateVector.class, DateVector.class);
          break;
        case Types.DECIMAL:
        case Types.NUMERIC:
          addColumn(output, i, label, TypeProtos.MinorType.DECIMAL38DENSE, nullable, null,
              NullableDecimal38DenseVector.class, Decimal38DenseVector.class);
          break;
        case Types.DOUBLE:
        case Types.FLOAT:
          // DOUBLE used to be declared BIGINT, which truncated the fraction.
          addColumn(output, i, label, TypeProtos.MinorType.FLOAT8, nullable, null,
              NullableFloat8Vector.class, Float8Vector.class);
          break;
        case Types.INTEGER:
        case Types.SMALLINT:
          addColumn(output, i, label, TypeProtos.MinorType.INT, nullable, null,
              NullableIntVector.class, IntVector.class);
          break;
        case Types.LONGVARBINARY:
          // No vector is created for binary columns.
          break;
        case Types.TIME:
          addColumn(output, i, label, TypeProtos.MinorType.TIME, nullable, null,
              NullableTimeVector.class, TimeVector.class);
          break;
        case Types.TIMESTAMP:
          addColumn(output, i, label, TypeProtos.MinorType.TIMESTAMP, nullable, null,
              NullableTimeStampVector.class, TimeStampVector.class);
          break;
        default:
          // Character types and every unlisted type are read as strings.
          addColumn(output, i, label, TypeProtos.MinorType.VARCHAR, nullable, width,
              NullableVarCharVector.class, VarCharVector.class);
          break;
        }
      }
      this.outputMutator = output;
    } catch (SQLException | SchemaChangeException e) {
      throw new ExecutionSetupException(e);
    }
  }

  /**
   * Adds one field to the output and records which JDBC column feeds it.
   *
   * @param width type width to declare, or {@code null} to leave it unset
   */
  private void addColumn(OutputMutator output, int columnIndex, String label,
      TypeProtos.MinorType minorType, boolean nullable, Integer width,
      Class<? extends ValueVector> nullableClass,
      Class<? extends ValueVector> requiredClass) throws SchemaChangeException {
    MajorType.Builder type = MajorType.newBuilder().setMinorType(minorType);
    if (width != null) {
      type.setWidth(width);
    }
    type.setMode(nullable ? DataMode.OPTIONAL : DataMode.REQUIRED);
    MaterializedField field = MaterializedField.create(label, type.build());
    Class<? extends ValueVector> vectorClass = nullable ? nullableClass : requiredClass;
    vectors.add(output.addField(field, vectorClass));
    columnIndexes.add(columnIndex);
  }

  @Override
  public void setOperatorContext(OperatorContext operatorContext) {
    this.operatorContext = operatorContext;
  }

  /**
   * Copies up to {@link #BATCH_SIZE} rows from the result set into the vectors.
   *
   * @return the number of rows written; 0 once the result set is exhausted
   * @throws DrillRuntimeException if reading from the result set fails
   */
  @Override
  public int next() {
    int rows = 0;
    try {
      while (rows < BATCH_SIZE && rec.next()) {
        for (int i = 0; i < vectors.size(); i++) {
          writeValue(vectors.get(i), rows, columnIndexes.get(i));
        }
        rows++;
      }
    } catch (SQLException e) {
      throw new DrillRuntimeException(e);
    }
    for (ValueVector vv : vectors) {
      vv.getMutator().setValueCount(rows);
    }
    return rows;
  }

  /**
   * Writes the current row's value of one column into its vector. SQL NULL
   * becomes a null entry in nullable vectors. Required vectors receive the
   * type's default (0 or the empty string), since they cannot hold a null.
   */
  private void writeValue(ValueVector vv, int row, int col) throws SQLException {
    if (vv instanceof NullableVarCharVector) {
      NullableVarCharVector v = (NullableVarCharVector) vv;
      String text = rec.getString(col);
      if (text == null) {
        v.getMutator().setNull(row);
      } else {
        byte[] bytes = text.getBytes(Charsets.UTF_8);
        v.getMutator().setSafe(row, bytes, 0, bytes.length);
        v.getMutator().setValueLengthSafe(row, bytes.length);
      }
    } else if (vv instanceof VarCharVector) {
      VarCharVector v = (VarCharVector) vv;
      String text = rec.getString(col);
      byte[] bytes = (text == null ? "" : text).getBytes(Charsets.UTF_8);
      v.getMutator().setSafe(row, bytes, 0, bytes.length);
      v.getMutator().setValueLengthSafe(row, bytes.length);
    } else if (vv instanceof NullableBigIntVector) {
      NullableBigIntVector v = (NullableBigIntVector) vv;
      long value = rec.getLong(col);
      // getLong reports NULL as 0; wasNull tells the two apart.
      if (rec.wasNull()) {
        v.getMutator().setNull(row);
      } else {
        v.getMutator().setSafe(row, value);
      }
    } else if (vv instanceof BigIntVector) {
      ((BigIntVector) vv).getMutator().setSafe(row, rec.getLong(col));
    } else if (vv instanceof NullableIntVector) {
      NullableIntVector v = (NullableIntVector) vv;
      int value = rec.getInt(col);
      if (rec.wasNull()) {
        v.getMutator().setNull(row);
      } else {
        v.getMutator().setSafe(row, value);
      }
    } else if (vv instanceof IntVector) {
      ((IntVector) vv).getMutator().setSafe(row, rec.getInt(col));
    } else if (vv instanceof NullableFloat8Vector) {
      NullableFloat8Vector v = (NullableFloat8Vector) vv;
      double value = rec.getDouble(col);
      if (rec.wasNull()) {
        v.getMutator().setNull(row);
      } else {
        v.getMutator().setSafe(row, value);
      }
    } else if (vv instanceof Float8Vector) {
      ((Float8Vector) vv).getMutator().setSafe(row, rec.getDouble(col));
    } else if (vv instanceof NullableDateVector) {
      NullableDateVector v = (NullableDateVector) vv;
      String text = rec.getString(col);
      if (text == null) {
        v.getMutator().setNull(row);
      } else {
        v.getMutator().setSafe(row, DateTime.parse(text).toDate().getTime());
      }
    } else if (vv instanceof DateVector) {
      String text = rec.getString(col);
      long millis = text == null ? 0L : DateTime.parse(text).toDate().getTime();
      ((DateVector) vv).getMutator().setSafe(row, millis);
    } else if (vv instanceof NullableTimeVector) {
      NullableTimeVector v = (NullableTimeVector) vv;
      java.sql.Time time = rec.getTime(col);
      if (time == null) {
        v.getMutator().setNull(row);
      } else {
        v.getMutator().setSafe(row, millisOfDay(time));
      }
    } else if (vv instanceof TimeVector) {
      java.sql.Time time = rec.getTime(col);
      ((TimeVector) vv).getMutator().setSafe(row, time == null ? 0 : millisOfDay(time));
    } else if (vv instanceof NullableTimeStampVector) {
      NullableTimeStampVector v = (NullableTimeStampVector) vv;
      java.sql.Timestamp stamp = rec.getTimestamp(col);
      if (stamp == null) {
        v.getMutator().setNull(row);
      } else {
        // NOTE(review): epoch millis are stored as returned by the driver, the
        // same way the date branch does; confirm the expected time zone handling.
        v.getMutator().setSafe(row, stamp.getTime());
      }
    } else if (vv instanceof TimeStampVector) {
      java.sql.Timestamp stamp = rec.getTimestamp(col);
      ((TimeStampVector) vv).getMutator().setSafe(row, stamp == null ? 0L : stamp.getTime());
    } else if (vv instanceof NullableDecimal38DenseVector
        || vv instanceof Decimal38DenseVector) {
      // NOTE(review): decimal values are still not copied into the vector (the
      // previous code read a BigDecimal and dropped it). TODO implement the write.
    } else {
      throw new IllegalStateException("Unsupported vector type: " + vv.getClass().getName());
    }
  }

  /** Wall-clock milliseconds since midnight, in the default time zone. */
  private static int millisOfDay(java.sql.Time time) {
    return new DateTime(time.getTime()).getMillisOfDay();
  }

  /**
   * Allocates room for {@link #BATCH_SIZE} values in every vector. Character
   * vectors use the field's declared width per value, or
   * {@link #DEFAULT_VARCHAR_WIDTH} when that width is not positive.
   *
   * @param vectorMap vectors to allocate
   */
  @Override
  public void allocate(Map<Key, ValueVector> vectorMap)
      throws OutOfMemoryException {
    for (ValueVector vv : vectorMap.values()) {
      if (vv instanceof NullableVarCharVector || vv instanceof VarCharVector) {
        int width = vv.getField().getWidth();
        AllocationHelper.allocate(vv, BATCH_SIZE, width > 0 ? width : DEFAULT_VARCHAR_WIDTH);
      } else if (vv instanceof FixedWidthVector) {
        ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
      }
    }
  }

  /**
   * Closes the result set and the statement. The connection is left open
   * because it is obtained from the client rather than opened here.
   */
  @Override
  public void cleanup() {
    closeQuietly(rec, "result set");
    closeQuietly(statement, "statement");
  }

  /** Closes a JDBC resource, logging instead of propagating any failure. */
  private static void closeQuietly(AutoCloseable resource, String what) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      logger.warn("Failed to close JDBC {}", what, e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java new file mode 100644 index 0000000..fae0e81 --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mpjdbc; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class MPJdbcScanSpec { + + private String database; + private String table; + private String columns; + + @JsonIgnore + private List filters; + + @JsonCreator + public MPJdbcScanSpec(@JsonProperty("database") String database, + @JsonProperty("table") String table, @JsonProperty("columns") String columns) { + this.database = database; + this.table = table; + this.columns = columns; + } + + public MPJdbcScanSpec(String database, String table, List filters, String columns) { + this.database = database; + this.table = table; + this.filters = filters; + this.columns = columns; + } + + public String getDatabase() { + return this.database; + } + + public String getTable() { + return this.table; + } + + public List getFilters() { + return this.filters; + } + + public String getColumns() { + return this.columns; + } + @Override + public String toString() { + return "MPJdbcScanSpec [Database=" + database + ", table=" + table + + ", columns=" + columns + ", filters=" + filters + "]"; + } + + @Override + public boolean equals(Object obj) { + // TODO Auto-generated method stub + return super.equals(obj); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java new file mode 100644 index 0000000..f3169fc --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java @@ -0,0 +1,80 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.mpjdbc; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Stores the workspace related config. A workspace has: - location which is a + * path. - writable flag to indicate whether the location supports creating new + * tables. - default storage format for new tables created in this workspace. + */ + +public class MPJdbcSchemaConfig { + + /** Default workspace is a root directory which supports read, but not write. 
*/ + public static final MPJdbcSchemaConfig DEFAULT = new MPJdbcSchemaConfig("jdbc://", "", + ""); + + private final String uri; + private final String username; + private final String passwd; + + public MPJdbcSchemaConfig(@JsonProperty("uri") String uri, + @JsonProperty("username") String username, + @JsonProperty("passwd") String passwd) { + this.uri = uri; + this.username = username; + this.passwd = passwd; + } + + public String getUri() { + return uri; + } + + public boolean isWritable() { + return false; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return passwd; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || !(obj instanceof MPJdbcSchemaConfig)) { + return false; + } + + MPJdbcSchemaConfig that = (MPJdbcSchemaConfig) obj; + return ((this.uri == null && that.uri == null) || this.uri.equals(that.uri)) + && this.passwd == that.passwd + && ((this.username == null && that.username == null) || this.username + .equals(that.username)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java new file mode 100644 index 0000000..0e28c8d --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mpjdbc; +public class MPJdbcSchemaFilter { +public MPJdbcSchemaFilter() { +// TODO Auto-generated constructor stub +} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java new file mode 100644 index 0000000..8c92533 --- /dev/null +++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mpjdbc; +import org.apache.drill.exec.physical.base.AbstractSubScan; +import org.apache.drill.exec.store.mpjdbc.MPJdbcSchemaFilter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +public class MPJdbcSchemaSubScan extends AbstractSubScan { +private final String table; +private final MPJdbcSchemaFilter filter; +private final String userName; + + @JsonCreator + public MPJdbcSchemaSubScan(@JsonProperty("userName") String userName, + @JsonProperty("table") String table, + @JsonProperty("filter") MPJdbcSchemaFilter filter) { + super(userName); + this.table = table; + this.filter = filter; + this.userName = userName; + } + @JsonProperty("table") + public String getTable() { + return table; + } + @JsonProperty("filter") + public MPJdbcSchemaFilter getFilter() { + return filter; + } + @JsonProperty("userName") + public String getUserName() { + return this.userName; + } +@Override +public int getOperatorType() { +// TODO Auto-generated method stub +return 0; +} +} \ No newline at end of file