drill-commits mailing list archives

From jacq...@apache.org
Subject [02/15] drill git commit: DRILL-3180: Initial JDBC plugin implementation.
Date Mon, 14 Sep 2015 05:28:49 GMT
DRILL-3180: Initial JDBC plugin implementation.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8478e9fb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8478e9fb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8478e9fb

Branch: refs/heads/master
Commit: 8478e9fb1d7e6881d8f092ae9ff3a338f2e023a6
Parents: b525692
Author: MPierre <mp_200711@mac.com>
Authored: Sat Aug 1 18:07:18 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Sun Sep 13 17:02:18 2015 -0700

----------------------------------------------------------------------
 contrib/storage-mpjdbc/pom.xml                  |  79 ++++
 .../exec/store/mpjdbc/MPJdbcBatchCreator.java   |  54 +++
 .../drill/exec/store/mpjdbc/MPJdbcClient.java   | 300 ++++++++++++
 .../exec/store/mpjdbc/MPJdbcClientOptions.java  |  52 ++
 .../exec/store/mpjdbc/MPJdbcCnxnManager.java    |  69 +++
 .../exec/store/mpjdbc/MPJdbcFilterBuilder.java  | 235 +++++++++
 .../exec/store/mpjdbc/MPJdbcFilterRule.java     |  60 +++
 .../exec/store/mpjdbc/MPJdbcFormatConfig.java   | 109 +++++
 .../exec/store/mpjdbc/MPJdbcFormatPlugin.java   | 170 +++++++
 .../exec/store/mpjdbc/MPJdbcGroupScan.java      | 181 +++++++
 .../exec/store/mpjdbc/MPJdbcRecordReader.java   | 471 +++++++++++++++++++
 .../drill/exec/store/mpjdbc/MPJdbcScanSpec.java |  76 +++
 .../exec/store/mpjdbc/MPJdbcSchemaConfig.java   |  80 ++++
 .../exec/store/mpjdbc/MPJdbcSchemaFilter.java   |  23 +
 .../exec/store/mpjdbc/MPJdbcSchemaSubScan.java  |  55 +++
 .../drill/exec/store/mpjdbc/MPJdbcSubScan.java  | 119 +++++
 .../resources/bootstrap-storage-plugins.json    |  12 +
 .../src/main/resources/checkstyle-config.xml    |  41 ++
 .../main/resources/checkstyle-suppressions.xml  |  19 +
 .../src/main/resources/drill-module.conf        |  30 ++
 20 files changed, 2235 insertions(+)
----------------------------------------------------------------------
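To try the plugin once built, a storage-plugin configuration has to be registered under the type name "jdbc" (MPJdbcFormatConfig.NAME below). The bundled bootstrap-storage-plugins.json is not shown in this part of the series, so the following is only an illustrative sketch inferred from the @JsonProperty fields of MPJdbcFormatConfig; the driver class, URI, and credentials are placeholders:

  {
    "type": "jdbc",
    "driver": "com.mysql.jdbc.Driver",
    "uri": "jdbc:mysql://localhost:3306/test",
    "username": "drill",
    "password": "drill",
    "enabled": true
  }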


http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/pom.xml b/contrib/storage-mpjdbc/pom.xml
new file mode 100755
index 0000000..5e9afca
--- /dev/null
+++ b/contrib/storage-mpjdbc/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-mpjdbc-storage</artifactId>
+
+  <name>contrib/mpjdbc-storage-plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Test dependencies -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.1.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>logback.log.dir</name>
+              <value>${project.build.directory}/surefire-reports</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
new file mode 100644
index 0000000..af08b2e
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcBatchCreator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+//import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mpjdbc.MPJdbcSchemaSubScan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class MPJdbcBatchCreator implements BatchCreator<MPJdbcSubScan> {
+  @Override
+  public RecordBatch getBatch(FragmentContext context, MPJdbcSubScan config,
+      List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns = null;
+    try {
+      if ((columns = config.getColumns()) == null) {
+        columns = GroupScan.ALL_COLUMNS;
+      }
+      readers.add(new MPJdbcRecordReader(context,config));
+    } catch (Exception e1) {
+      throw new ExecutionSetupException(e1);
+    }
+    return new ScanBatch(config, context, readers.iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java
new file mode 100644
index 0000000..f3bf81d
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClient.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.linq4j.Extensions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.store.AbstractSchema;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+
+class MPJdbcClient {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+       .getLogger(MPJdbcClient.class);
+
+    private MPJdbcClientOptions clientOptions;
+    private Connection conn;
+    private DatabaseMetaData metadata;
+    private String uri;
+    private OdbcSchema defaultSchema;
+    private MPJdbcFormatPlugin plugin;
+    private String plugName;
+
+    public MPJdbcClient(String uri, MPJdbcClientOptions clientOptions,
+            MPJdbcFormatPlugin plugin) {
+        try {
+            Class.forName(clientOptions.getDriver()).newInstance();
+            this.clientOptions = clientOptions;
+
+            String user = this.clientOptions.getUser();
+            String passwd = this.clientOptions.getPassword();
+            this.plugin = plugin;
+            this.uri = uri;
+
+            if (user == null || user.isEmpty() || passwd == null || passwd.isEmpty()) {
+                logger.info("username, password assumed to be in the uri");
+                this.conn = DriverManager.getConnection(uri);
+            } else {
+                this.conn = DriverManager.getConnection(uri, user, passwd);
+            }
+            this.metadata = this.conn.getMetaData();
+            this.plugName = plugin.getName();
+        } catch (InstantiationException | IllegalAccessException
+                | ClassNotFoundException | SQLException e) {
+            // Without a loaded driver and an open connection the client is unusable.
+            throw new DrillRuntimeException(e);
+        }
+    }
+
+    public Connection getConnection() {
+        return this.conn;
+    }
+
+    public Map<String, Integer> getSchemas() {
+        Map<String, Integer> lst = new HashMap<String, Integer>();
+        try (ResultSet rs = this.metadata.getCatalogs()) {
+            while (rs.next()) {
+                String catalog = rs.getString(1);
+                if (!lst.containsKey(catalog)) {
+                    lst.put(catalog, 1);
+                }
+            }
+        } catch (SQLException e) {
+            throw new DrillRuntimeException(e);
+        }
+        return lst;
+    }
+
+    public Set<String> getTables(String catalog) {
+        Set<String> lst = new HashSet<String>();
+
+        String[] typeList = { "TABLE", "VIEW" };
+        try (ResultSet rs = this.metadata
+                .getTables(catalog, null, null, typeList)) {
+            // Restrict the listing to tables and views.
+            while (rs.next()) {
+                lst.add(rs.getString("TABLE_NAME"));
+            }
+
+        } catch (SQLException e) {
+            throw new DrillRuntimeException(e);
+        }
+        return lst;
+    }
+
+    public List<String> getDatabases() {
+        List<String> lst = new ArrayList<String>();
+        try (ResultSet rs = this.metadata.getCatalogs()) {
+            while (rs.next()) {
+                // JDBC result-set columns are 1-indexed.
+                lst.add(rs.getString(1));
+            }
+        } catch (SQLException e) {
+            throw new DrillRuntimeException(e);
+        }
+        return lst;
+    }
+
+    public void close() {
+        // TODO Auto-generated method stub
+        try {
+            this.conn.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    public OdbcSchema getSchema() {
+        List<String> l = new ArrayList<String>();
+        String currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+                plugin).getCurrentSchema();
+        defaultSchema = new OdbcSchema(currentSchema);
+        return defaultSchema;
+    }
+
+    public OdbcSchema getSchema(String name) {
+        List<String> l = new ArrayList<String>();
+        OdbcSchema schema = new OdbcSchema(name);
+        return schema;
+    }
+
+    public class OdbcSchema extends AbstractSchema {
+
+        private Map<String, Integer> sub_schemas;
+        private String currentSchema;
+        private Set<String> tables;
+
+        public OdbcSchema(String name) {
+            super(ImmutableList.<String> of(), name);
+            /*currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+                    plugin).getCurrentSchema();
+            if (currentSchema == null) {
+                currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+                        plugin).getCurrentSchema();
+            }
+            */
+            if(name.equals("")) {
+              sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+                  .getSchemas();
+            }
+            tables = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+                    .getTables(name);
+        }
+
+        public OdbcSchema(List<String> parentSchemaPath, String name) {
+            super(parentSchemaPath, name);
+            currentSchema = MPJdbcCnxnManager.getClient(uri, clientOptions,
+                    plugin).getCurrentSchema();
+            if (currentSchema == null) {
+                currentSchema = "ROOT";
+            }
+            sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+                    .getSchemas();
+            // TODO Auto-generated constructor stub
+        }
+
+        @Override
+        public String getTypeName() {
+            // TODO Auto-generated method stub
+            return "odbc";
+        }
+
+        @Override
+        public AbstractSchema getSubSchema(String name) {
+            if (sub_schemas == null) {
+                sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions,
+                        plugin).getSchemas();
+            }
+            Integer a = sub_schemas.get(name);
+            if (a != null && a == 1) {
+                return new OdbcSchema(name);
+            }
+            return null;
+        }
+
+        @Override
+        public Table getTable(String name) {
+            // TODO Auto-generated method stub
+          String tableName = null;
+          if(name.contains(".")) {
+            String[] val = name.split("\\.");
+            OdbcSchema sub = (OdbcSchema) this.getSubSchema(val[0]);
+            return sub.getTable(val[1]);
+          }
+          Iterator<String> iter = tables.iterator();
+          while(iter.hasNext()) {
+            tableName = iter.next();
+            if(tableName.equalsIgnoreCase(name)) {
+              break;
+            }
+            else {
+              tableName = null;
+            }
+          }
+          if(tableName == null) {
+            return null;
+          }
+          MPJdbcScanSpec spec = new MPJdbcScanSpec(this.name, tableName,"");
+          return new DynamicDrillTable(plugin, "odbc", spec);
+        }
+
+        @Override
+        public Set<String> getTableNames() {
+            return MPJdbcCnxnManager.getClient(uri, clientOptions,
+                    plugin).getTables(name);
+        }
+
+        @Override
+        public Set<String> getSubSchemaNames() {
+            // TODO Auto-generated method stub
+            sub_schemas = MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+                    .getSchemas();
+            return sub_schemas.keySet();
+        }
+
+        @Override
+        public Collection<Function> getFunctions(String name) {
+            // TODO Auto-generated method stub
+            return super.getFunctions(name);
+        }
+
+        @Override
+        public AbstractSchema getDefaultSchema() {
+            return MPJdbcCnxnManager.getClient(uri, clientOptions, plugin)
+                    .getDefaultSchema();
+        }
+
+    }
+
+    public String getCurrentSchema() {
+        try {
+            return this.conn.getCatalog();
+        } catch (SQLException e) {
+            throw new DrillRuntimeException(e);
+        }
+    }
+
+    public AbstractSchema getDefaultSchema() {
+        // TODO Auto-generated method stub
+        return defaultSchema;
+    }
+}
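The catalog and table discovery above is plain java.sql.DatabaseMetaData traversal: getCatalogs() feeds the sub-schema map and getTables() feeds the table set. As a minimal standalone sketch of the same metadata calls (hypothetical URI and credentials; any JDBC source behaves the same way):

  import java.sql.Connection;
  import java.sql.DatabaseMetaData;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.SQLException;

  public class MetadataProbe {
    public static void main(String[] args) throws SQLException {
      // Hypothetical connection details.
      try (Connection conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/test", "drill", "drill")) {
        DatabaseMetaData md = conn.getMetaData();
        // Catalogs become Drill sub-schemas (see OdbcSchema above).
        try (ResultSet rs = md.getCatalogs()) {
          while (rs.next()) {
            System.out.println("catalog: " + rs.getString(1));
          }
        }
        // Tables and views become Drill tables.
        try (ResultSet rs = md.getTables(null, null, null,
            new String[] { "TABLE", "VIEW" })) {
          while (rs.next()) {
            System.out.println("table: " + rs.getString("TABLE_NAME"));
          }
        }
      }
    }
  }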

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
new file mode 100644
index 0000000..84b6348
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcClientOptions.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+public class MPJdbcClientOptions {
+  private String driver;
+  private String user;
+  private String passwd;
+
+  public MPJdbcClientOptions(String driver, String user, String passwd) {
+    this.driver = driver;
+    this.user = user;
+    this.passwd = passwd;
+  }
+
+  public MPJdbcClientOptions(MPJdbcFormatConfig storageConfig) {
+    this.driver = storageConfig.getDriver();
+    this.user = storageConfig.getUser();
+    this.passwd = storageConfig.getPasswd();
+  }
+
+  public String getDriver() {
+    // TODO Auto-generated method stub
+    return this.driver;
+  }
+
+  public String getUser() {
+    // TODO Auto-generated method stub
+    return this.user;
+  }
+
+  public String getPassword() {
+    // TODO Auto-generated method stub
+    return this.passwd;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
new file mode 100644
index 0000000..7e43c32
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcCnxnManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import org.apache.drill.exec.store.mpjdbc.MPJdbcClientOptions;
+
+public class MPJdbcCnxnManager {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(MPJdbcCnxnManager.class);
+  private static Cache<String, MPJdbcClient> uriClientMap;
+
+  static {
+    uriClientMap = CacheBuilder.newBuilder().maximumSize(5)
+        .expireAfterAccess(10, TimeUnit.MINUTES)
+        .removalListener(new UriCloser()).build();
+  }
+
+  private static class UriCloser implements RemovalListener<String, MPJdbcClient> {
+
+    @Override
+    public synchronized void onRemoval(
+        RemovalNotification<String, MPJdbcClient> removal) {
+      removal.getValue().close();
+      logger.debug("Closed connection to {}.", removal.getKey().toString());
+    }
+
+  }
+
+  public synchronized static MPJdbcClient getClient(String uri,
+      MPJdbcClientOptions clientOptions, MPJdbcFormatPlugin plugin) {
+    MPJdbcClient client = uriClientMap.getIfPresent(uri);
+    if (client == null) {
+      client = new MPJdbcClient(uri, clientOptions,plugin);
+      if (client.getConnection() != null) {
+        uriClientMap.put(uri, client);
+      } else {
+        return null;
+      }
+    }
+
+    return client;
+  }
+}
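The manager keeps at most five clients, expires an entry after ten minutes without access, and relies on the removal listener above to close the underlying JDBC connection. Callers never construct MPJdbcClient directly; lookup is always by URI. A minimal usage sketch, assuming an MPJdbcFormatPlugin instance is in scope and with hypothetical driver and URI values:

  // Reuses a cached client for this URI, or opens and caches a new one.
  MPJdbcClient acquire(MPJdbcFormatPlugin plugin) {
    MPJdbcClientOptions options =
        new MPJdbcClientOptions("com.mysql.jdbc.Driver", "drill", "drill");
    return MPJdbcCnxnManager.getClient(
        "jdbc:mysql://localhost:3306/test", options, plugin);
  }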

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
new file mode 100644
index 0000000..488a55d
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterBuilder.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.mpjdbc.MPJdbcScanSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+public class MPJdbcFilterBuilder extends
+    AbstractExprVisitor<MPJdbcScanSpec, Void, RuntimeException> {
+  static final Logger logger = LoggerFactory
+      .getLogger(MPJdbcFilterBuilder.class);
+  final MPJdbcGroupScan groupScan;
+  final LogicalExpression le;
+  private boolean allExpressionsConverted = true;
+
+  public MPJdbcFilterBuilder(MPJdbcGroupScan groupScan,
+      LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+  }
+
+  public MPJdbcScanSpec parseTree() {
+    MPJdbcScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getScanSpec(),
+          parsedSpec);
+    }
+    return parsedSpec;
+  }
+
+  private MPJdbcScanSpec mergeScanSpecs(String functionName,
+     MPJdbcScanSpec leftScanSpec, MPJdbcScanSpec rightScanSpec) {
+    List<String> newFilter;
+    switch (functionName) {
+    case "booleanAnd":
+      if (leftScanSpec.getFilters() != null
+          && rightScanSpec.getFilters() != null) {
+        /* newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(),
+            rightScanSpec.getFilters()); */
+      } else if (leftScanSpec.getFilters() != null) {
+        newFilter = leftScanSpec.getFilters();
+      } else {
+        newFilter = rightScanSpec.getFilters();
+      }
+      break;
+    case "booleanOr":
+     /* newFilter = OdbcUtils.orFilterAtIndex(leftScanSpec.getFilters(),
+          rightScanSpec.getFilters()); */
+    }
+    MPJdbcScanSpec mp =  new MPJdbcScanSpec(groupScan.getScanSpec().getDatabase(), groupScan
+        .getScanSpec().getTable(), groupScan.getScanSpec().getColumns());
+    return mp;
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public MPJdbcScanSpec visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  @Override
+  public MPJdbcScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+    List<LogicalExpression> args = op.args;
+    MPJdbcScanSpec nodeScanSpec = null;
+    String functionName = op.getName();
+    for (int i = 0; i < args.size(); ++i) {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        if (nodeScanSpec == null) {
+          nodeScanSpec = args.get(i).accept(this, null);
+        } else {
+          MPJdbcScanSpec scanSpec = args.get(i).accept(this, null);
+          if (scanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+          } else {
+            allExpressionsConverted = false;
+          }
+        }
+        break;
+      }
+    }
+    return nodeScanSpec;
+  }
+
+  @Override
+  public MPJdbcScanSpec visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+    MPJdbcScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    ImmutableList<LogicalExpression> args = call.args;
+    LogicalExpression nameVal = call.args.get(0);
+    LogicalExpression valueVal = null;
+    StringBuilder strBuilder = new StringBuilder();
+    if(call.args.size() >= 2) {
+      valueVal = call.args.get(1);
+    }
+    logger.info("Name Val:" + nameVal.toString());
+    logger.info("Value Val:" + valueVal.toString());
+
+    switch(functionName) {
+    case "equal":
+      break;
+     default:
+       break;
+    }
+    /*
+    if (OdbcCompareFunctionProcessor.isCompareFunction(functionName)) {
+      OdbcCompareFunctionProcessor processor = OdbcCompareFunctionProcessor
+          .process(call);
+      if (processor.isSuccess()) {
+        try {
+          nodeScanSpec = createOdbcScanSpec(processor.getFunctionName(),
+              processor.getPath(), processor.getValue());
+        } catch (Exception e) {
+          logger.error(" Failed to creare Filter ", e);
+          // throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    } else {
+      switch (functionName) {
+      case "booleanAnd":
+      case "booleanOr":
+        MPJdbcScanSpec leftScanSpec = args.get(0).accept(this, null);
+        MPJdbcScanSpec rightScanSpec = args.get(1).accept(this, null);
+        if (leftScanSpec != null && rightScanSpec != null) {
+          nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec,
+              rightScanSpec);
+        } else {
+          allExpressionsConverted = false;
+          if ("booleanAnd".equals(functionName)) {
+            nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec;
+          }
+        }
+        break;
+      }
+    }
+ */
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+
+    return nodeScanSpec;
+  }
+
+  private MPJdbcScanSpec createOdbcScanSpec(String functionName,
+      SchemaPath field, Object fieldValue) throws ClassNotFoundException,
+      Exception {
+    // extract the field name
+    String fieldName = field.getAsUnescapedPath();
+    /*
+    OdbcCompareOp compareOp = null;
+    switch (functionName) {
+    case "equal":
+      compareOp = OdbcCompareOp.EQUAL;
+      break;
+    case "not_equal":
+      compareOp = OdbcCompareOp.NOT_EQUAL;
+      break;
+    case "greater_than_or_equal_to":
+      compareOp = OdbcCompareOp.GREATER_OR_EQUAL;
+      break;
+    case "greater_than":
+      compareOp = OdbcCompareOp.GREATER;
+      break;
+    case "less_than_or_equal_to":
+      compareOp = OdbcCompareOp.LESS_OR_EQUAL;
+      break;
+    case "less_than":
+      compareOp = OdbcCompareOp.LESS;
+      break;
+    case "isnull":
+    case "isNull":
+    case "is null":
+      compareOp = OdbcCompareOp.IFNULL;
+      break;
+    case "isnotnull":
+    case "isNotNull":
+    case "is not null":
+      compareOp = OdbcCompareOp.IFNOTNULL;
+      break;
+    }
+
+    if (compareOp != null) {
+      BasicDBObject queryFilter = new BasicDBObject();
+      if (compareOp == OdbcCompareOp.IFNULL) {
+        queryFilter.put(fieldName,
+            new BasicDBObject(OdbcCompareOp.EQUAL.getCompareOp(), null));
+      } else if (compareOp == OdbcCompareOp.IFNOTNULL) {
+        queryFilter.put(fieldName,
+            new BasicDBObject(OdbcCompareOp.NOT_EQUAL.getCompareOp(), null));
+      } else {
+        queryFilter.put(fieldName, new BasicDBObject(compareOp.getCompareOp(),
+            fieldValue));
+      }
+      return new MPJdbcScanSpec(groupScan.getScanSpec().getDbName(), groupScan
+          .getScanSpec().getCollectionName(), queryFilter);
+    }
+    */
+    return null;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
new file mode 100644
index 0000000..c0564d0
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFilterRule.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+
+public class MPJdbcFilterRule extends StoragePluginOptimizerRule {
+  public static final StoragePluginOptimizerRule INSTANCE = new MPJdbcFilterRule();
+  public MPJdbcFilterRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+    // TODO Auto-generated constructor stub
+  }
+
+  public MPJdbcFilterRule() {
+    // TODO Auto-generated constructor stub
+    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MPJdbcFilterRule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    // TODO Auto-generated method stub
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    final FilterPrel filter = (FilterPrel) call.rel(0);
+    final RexNode condition = filter.getCondition();
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(), scan, condition);
+    MPJdbcGroupScan grpScan= (MPJdbcGroupScan) scan.getGroupScan();
+    MPJdbcFilterBuilder builder = new MPJdbcFilterBuilder(grpScan,conditionExp);
+    MPJdbcScanSpec result = builder.parseTree();
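+    // NOTE: the parsed scan spec is not yet applied back to the scan, so this
+    // rule does not currently change the plan.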
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java
new file mode 100644
index 0000000..8edce3e
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@JsonTypeName(MPJdbcFormatConfig.NAME)
+public class MPJdbcFormatConfig extends StoragePluginConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(MPJdbcFormatConfig.class);
+  public static final String NAME = "jdbc";
+
+  @JsonIgnore
+  private String driver;
+  @JsonIgnore
+  private String uri;
+  @JsonIgnore
+  private String username;
+  @JsonIgnore
+  private String password;
+
+  @JsonCreator
+  public MPJdbcFormatConfig(@JsonProperty("driver") String driver,
+      @JsonProperty("uri") String uri,
+      @JsonProperty("username") String username,
+      @JsonProperty("password") String password) {
+    this.driver = driver == null ? "" : driver;
+    this.uri = uri == null ? "jdbc://" : uri;
+    this.username = username == null ? "" : username;
+    this.password = password == null ? "" : password;
+
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MPJdbcFormatConfig that = (MPJdbcFormatConfig) o;
+
+    if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @JsonProperty("driver")
+  public String getDriver() {
+    return this.driver;
+  }
+
+  @JsonProperty("uri")
+  public String getUri() {
+    return this.uri;
+  }
+
+  @JsonProperty("username")
+  public String getUser() {
+    return this.username;
+  }
+
+  @JsonProperty("password")
+  public String getPasswd() {
+    return this.password;
+  }
+
+  @Override
+  public int hashCode() {
+    // Consistent with equals(), which compares only the uri.
+    return uri != null ? uri.hashCode() : 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java
new file mode 100644
index 0000000..5a0dd4b
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcFormatPlugin.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Schema.TableType;
+import org.apache.calcite.linq4j.Extensions;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.store.mpjdbc.MPJdbcClient.OdbcSchema;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class MPJdbcFormatPlugin extends AbstractStoragePlugin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(MPJdbcFormatPlugin.class);
+
+  private final MPJdbcFormatConfig storageConfig;
+  protected String name = "odbc";
+  private final DrillbitContext context;
+
+  public MPJdbcFormatPlugin(MPJdbcFormatConfig storageConfig,
+      DrillbitContext context, String name) {
+    this.context = context;
+    this.storageConfig = storageConfig;
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      String result = mapper.writeValueAsString(storageConfig);
+      logger.info(result);
+    } catch (JsonProcessingException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    this.name = name;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
+    if (storageConfig == null) {
+      logger.warn("StorageConfig is null; cannot register schemas");
+      return;
+    }
+    MPJdbcClientOptions options = new MPJdbcClientOptions(storageConfig);
+    MPJdbcClient client = MPJdbcCnxnManager.getClient(storageConfig.getUri(),
+        options, this);
+    if (client == null) {
+      logger.warn("Could not create client for {}", storageConfig.getUri());
+      return;
+    }
+    Map<String, Integer> schemas;
+    OdbcSchema o = client.getSchema();
+    SchemaPlus tl = parent.add(this.name, o);
+    try {
+      schemas = client.getSchemas();
+      Set<Entry<String, Integer>> a = schemas.entrySet();
+      Iterator<Entry<String, Integer>> aiter = a.iterator();
+      while (aiter.hasNext()) {
+        Entry<String, Integer> val = aiter.next();
+        String catalog = val.getKey();
+        OdbcSchema sc = client.getSchema(catalog);
+        tl.add(catalog, sc);
+      }
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public MPJdbcFormatConfig getConfig() {
+    logger.info("MPJdbcFormatPlugin:getConfig called");
+    logger.info(storageConfig.toString());
+    return storageConfig;
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  public String getName() {
+      return this.name;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+/*
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName,JSONOptions selection)
+      throws IOException {
+    MPJdbcScanSpec odbcScanSpec = selection.getListWith(new ObjectMapper(),
+        new TypeReference<MPJdbcScanSpec>() {
+        });
+    return new MPJdbcGroupScan(userName,this, odbcScanSpec, null);
+  }
+  */
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName,JSONOptions selection,List<SchemaPath> columns)
+      throws IOException {
+    MPJdbcScanSpec mPJdbcScanSpec = selection.getListWith(new ObjectMapper(),
+        new TypeReference<MPJdbcScanSpec>() {
+        });
+    return new MPJdbcGroupScan(userName,this, mPJdbcScanSpec, columns);
+  }
+
+  @Override
+  public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+    // TODO Auto-generated method stub
+    return ImmutableSet.of(MPJdbcFilterRule.INSTANCE);
+  }
+}
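With registerSchemas above, the plugin's registered name (the field defaults to "odbc") becomes a top-level schema and each JDBC catalog a sub-schema beneath it, so a query against a source registered as odbc would take the form SELECT * FROM odbc.mydb.mytable, where mydb and mytable are hypothetical catalog and table names resolved through OdbcSchema.getTable().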

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java
new file mode 100644
index 0000000..a16f8c8
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcGroupScan.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.common.expression.SchemaPath;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+public class MPJdbcGroupScan extends AbstractGroupScan {
+
+  private MPJdbcFormatPlugin plugin;
+  private MPJdbcFormatConfig pluginConfig;
+  private MPJdbcScanSpec mPJdbcScanSpec;
+  private List<SchemaPath> columns;
+  private String userName;
+  private Map<Integer, List<MPJdbcScanSpec>> endpointFragmentMapping;
+
+  public MPJdbcGroupScan(String userName,MPJdbcFormatPlugin storagePlugin, MPJdbcScanSpec scanSpec,
+      List<SchemaPath> columns) {
+    super(userName);
+    this.plugin = storagePlugin;
+    this.pluginConfig = storagePlugin.getConfig();
+    this.mPJdbcScanSpec = scanSpec;
+    this.userName = userName;
+    this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS
+        : columns;
+  }
+
+  public MPJdbcGroupScan(MPJdbcGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.plugin = that.plugin;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.pluginConfig = that.pluginConfig;
+    this.mPJdbcScanSpec = that.mPJdbcScanSpec;
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId)
+      throws ExecutionSetupException {
+    // TODO Auto-generated method stub
+    return new MPJdbcSubScan(plugin,userName, pluginConfig,
+        endpointFragmentMapping.get(minorFragmentId), columns);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    // TODO Auto-generated method stub
+    return -1;
+  }
+
+  @Override
+  public String getDigest() {
+    // TODO Auto-generated method stub
+    return toString();
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    MPJdbcGroupScan newScan = new MPJdbcGroupScan(userName,plugin, mPJdbcScanSpec, columns);
+    return newScan;
+
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // TODO Auto-generated method stub
+    return ScanStats.TRIVIAL_TABLE;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new MPJdbcGroupScan(this);
+    // TODO Auto-generated method stub
+  }
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    this.columns = columns;
+    return true;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+    for (DrillbitEndpoint ep : plugin.getContext().getBits()) {
+      endpointMap.put(ep.getAddress(), ep);
+    }
+
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+    DrillbitEndpoint ep = endpointMap.get(plugin.getConfig().getUri());
+    if (ep != null) {
+      EndpointAffinity affinity = affinityMap.get(ep);
+      if (affinity == null) {
+        affinityMap.put(ep, new EndpointAffinity(ep, 1));
+      } else {
+        affinity.addAffinity(1);
+      }
+    }
+    return Lists.newArrayList(affinityMap.values());
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    final int numSlots = incomingEndpoints.size();
+    int totalAssignmentsTobeDone = 1;
+    Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, String
+        .format("Incoming endpoints %d is greater than number of chunks %d",
+            numSlots, totalAssignmentsTobeDone));
+    final int minPerEndpointSlot = (int) Math
+        .floor((double) totalAssignmentsTobeDone / numSlots);
+    final int maxPerEndpointSlot = (int) Math
+        .ceil((double) totalAssignmentsTobeDone / numSlots);
+    /* Map for (index,endpoint)'s */
+    endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+    /* Reverse mapping for above indexes */
+    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+    /*
+     * Initialize these two maps
+     */
+    for (int i = 0; i < numSlots; ++i) {
+      List<MPJdbcScanSpec> val = new ArrayList<MPJdbcScanSpec>(maxPerEndpointSlot);
+      val.add(this.mPJdbcScanSpec);
+      endpointFragmentMapping.put(i, val);
+      String hostname = incomingEndpoints.get(i).getAddress();
+      Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+      if (hostIndexQueue == null) {
+        hostIndexQueue = Lists.newLinkedList();
+        endpointHostIndexListMap.put(hostname, hostIndexQueue);
+      }
+      hostIndexQueue.add(i);
+    }
+  }
+
+  public MPJdbcScanSpec getScanSpec() {
+    return this.mPJdbcScanSpec;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java
new file mode 100644
index 0000000..498b7fd
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcRecordReader.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal38DenseHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.DateVector;
+import org.apache.drill.exec.vector.Decimal38DenseVector;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal38DenseVector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.TimeVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class MPJdbcRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(MPJdbcRecordReader.class);
+
+  private ResultSet rec;
+  private VectorContainerWriter writer;
+  private FragmentContext fc;
+  private MPJdbcSubScan scanSpec;
+  private MPJdbcFormatPlugin plugin;
+  private List<MPJdbcScanSpec> scanList;
+  private MPJdbcFormatConfig config;
+  private Connection conn;
+  private Statement statement;
+  private String table;
+  private String database;
+  protected List<ValueVector> vectors = Lists.newArrayList();
+  private int col_cnt = 0;
+  private MajorType.Builder t;
+  private OutputMutator outputMutator;
+  private ResultSetMetaData meta;
+  private OperatorContext operatorContext;
+  private String columns;
+  private List<String> filters;
+
+  public MPJdbcRecordReader(FragmentContext fragmentContext, MPJdbcSubScan scan) {
+    fc = fragmentContext;
+    scanSpec = scan;
+    this.plugin = scanSpec.getPlugin();
+    this.scanList = scanSpec.getScanList();
+    this.config = scanSpec.getConfig();
+    MPJdbcClientOptions options = new MPJdbcClientOptions(config);
+    MPJdbcClient client = MPJdbcCnxnManager.getClient(config.getUri(), options,
+        this.plugin);
+    conn = client.getConnection();
+    // Note: if the scan list ever holds more than one spec, the last one
+    // wins for table, database, columns and filters.
+    for (MPJdbcScanSpec spec : scanList) {
+      table = spec.getTable();
+      database = spec.getDatabase();
+      List<SchemaPath> colList = scan.getColumns();
+      StringBuilder b = new StringBuilder();
+      for (Iterator<SchemaPath> colIter = colList.iterator(); colIter.hasNext();) {
+        b.append(colIter.next().getAsUnescapedPath().trim());
+        if (colIter.hasNext()) {
+          b.append(",");
+        }
+      }
+      columns = b.toString();
+      filters = spec.getFilters();
+    }
+    try {
+      statement = conn.createStatement();
+      // Filters are captured above but not yet pushed down into this SQL.
+      rec = statement.executeQuery("SELECT " + this.columns + " FROM "
+          + database.trim() + "." + table.trim());
+    } catch (SQLException e) {
+      // Fail setup loudly instead of swallowing the error.
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try {
+      meta = rec.getMetaData();
+      col_cnt = meta.getColumnCount();
+      for (int i = 1; i <= col_cnt; i++) {
+        String column_label = meta.getColumnLabel(i);
+        int types = meta.getColumnType(i);
+        int isnullable = meta.isNullable(i);
+        int width = meta.getPrecision(i);
+        int scale = meta.getScale(i);
+        MaterializedField field = null;
+        switch (types) {
+        case java.sql.Types.BIGINT:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.BIGINT);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableBigIntVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, BigIntVector.class));
+          }
+          break;
+        case Types.DATE:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.DATE);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableDateVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, DateVector.class));
+          }
+          break;
+        case Types.DECIMAL:
+          t = MajorType.newBuilder().setMinorType(
+              TypeProtos.MinorType.DECIMAL38DENSE);
+          t.setMode(DataMode.OPTIONAL);
+          field = MaterializedField.create(column_label, t.build());
+          vectors.add(output.addField(field, Decimal38DenseVector.class));
+          break;
+        case Types.DOUBLE:
+          // DOUBLE maps to FLOAT8, not BIGINT.
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FLOAT8);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableFloat8Vector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, Float8Vector.class));
+          }
+          break;
+        case Types.FLOAT:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FLOAT8);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableFloat8Vector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, Float8Vector.class));
+          }
+          break;
+        case Types.INTEGER:
+        case Types.SMALLINT:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableIntVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, IntVector.class));
+          }
+          break;
+        case Types.LONGNVARCHAR:
+        case Types.LONGVARCHAR:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR);
+          t.setWidth(width);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableVarCharVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, VarCharVector.class));
+          }
+          break;
+        case Types.LONGVARBINARY:
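+          // No vector is created for binary columns, so the positional reads
+          // in next() go out of step for any table that contains one; binary
+          // support is still outstanding.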
+          break;
+        case Types.CHAR:
+        case Types.NCHAR:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR);
+          t.setWidth(width);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableVarCharVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, VarCharVector.class));
+          }
+          break;
+        case Types.NUMERIC:
+          t = MajorType.newBuilder().setMinorType(
+              TypeProtos.MinorType.DECIMAL38DENSE);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field,
+                NullableDecimal38DenseVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, Decimal38DenseVector.class));
+          }
+          break;
+        case Types.NVARCHAR:
+        case Types.VARCHAR:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR);
+          t.setWidth(width);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableVarCharVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, VarCharVector.class));
+          }
+          break;
+        case Types.TIME:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.TIME);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableTimeVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, TimeVector.class));
+          }
+          break;
+        case Types.TIMESTAMP:
+          t = MajorType.newBuilder().setMinorType(
+              TypeProtos.MinorType.TIMESTAMP);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableTimeStampVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, TimeStampVector.class));
+          }
+          break;
+        default:
+          t = MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR);
+          t.setWidth(width);
+          if (isnullable == 1) {
+            t.setMode(DataMode.OPTIONAL);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, NullableVarCharVector.class));
+          } else {
+            t.setMode(DataMode.REQUIRED);
+            field = MaterializedField.create(column_label, t.build());
+            vectors.add(output.addField(field, VarCharVector.class));
+          }
+          break;
+        }
+      }
+      this.outputMutator = output;
+
+    } catch (SQLException | SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+  @Override
+  public int next() {
+    int counter = 0;
+    int pos = 1;
+    try {
+      // Read at most one full batch (64K rows) per call.
+      while (counter < 65536 && rec.next()) {
+        for (ValueVector vv : vectors) {
+          String val = rec.getString(pos);
+          // Guard against SQL NULL before converting to bytes.
+          byte[] record = val == null ? new byte[0] : val.getBytes(Charsets.UTF_8);
+          if (vv.getClass().equals(NullableVarCharVector.class)) {
+            NullableVarCharVector v = (NullableVarCharVector) vv;
+            v.getMutator().setSafe(counter, record, 0, record.length);
+            v.getMutator().setValueLengthSafe(counter, record.length);
+          } else if (vv.getClass().equals(VarCharVector.class)) {
+            VarCharVector v = (VarCharVector) vv;
+            v.getMutator().setSafe(counter, record, 0, record.length);
+            v.getMutator().setValueLengthSafe(counter, record.length);
+          } else if (vv.getClass().equals(BigIntVector.class)) {
+            BigIntVector v = (BigIntVector) vv;
+            v.getMutator().setSafe(counter, rec.getLong(pos));
+          } else if (vv.getClass().equals(NullableBigIntVector.class)) {
+            NullableBigIntVector v = (NullableBigIntVector) vv;
+            v.getMutator().setSafe(counter, rec.getLong(pos));
+          } else if (vv.getClass().equals(IntVector.class)) {
+            IntVector v = (IntVector) vv;
+            v.getMutator().setSafe(counter, rec.getInt(pos));
+          } else if (vv.getClass().equals(NullableIntVector.class)) {
+            NullableIntVector v = (NullableIntVector) vv;
+            v.getMutator().setSafe(counter, rec.getInt(pos));
+          } else if (vv.getClass().equals(DateVector.class)) {
+            DateVector v = (DateVector) vv;
+            long dtime = DateTime.parse(val).toDate().getTime(); // DateTime.parse(val).toDateTime().getMillis();
+            v.getMutator().setSafe(counter, dtime);
+          } else if (vv.getClass().equals(NullableDateVector.class)) {
+            NullableDateVector v = (NullableDateVector) vv;
+            if (rec.wasNull()) {
+              v.getMutator().setNull(counter);
+            } else {
+              long dtime = DateTime.parse(val).toDate().getTime();
+              v.getMutator().setSafe(counter, dtime);
+            }
+          } else if (vv.getClass().equals(Decimal38DenseVector.class)) {
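+            // Decimal handling is incomplete here and in the nullable branch
+            // below: the value is fetched but never written into the vector.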
+            Decimal38DenseVector v = (Decimal38DenseVector) vv;
+            java.math.BigDecimal d = rec.getBigDecimal(pos);
+          } else if (vv.getClass().equals(NullableDecimal38DenseVector.class)) {
+            NullableDecimal38DenseVector v = (NullableDecimal38DenseVector) vv;
+            java.math.BigDecimal d = rec.getBigDecimal(pos);
+          } else if (vv.getClass().equals(Float8Vector.class)) {
+            Float8Vector v = (Float8Vector) vv;
+            v.getMutator().setSafe(counter, rec.getDouble(pos));
+          } else if (vv.getClass().equals(NullableFloat8Vector.class)) {
+            NullableFloat8Vector v = (NullableFloat8Vector) vv;
+            v.getMutator().setSafe(counter, rec.getDouble(pos));
+          } else {
+            // Catch-all treats the column as text; TIME and TIMESTAMP vectors
+            // still lack dedicated branches here.
+            NullableVarCharVector v = (NullableVarCharVector) vv;
+            v.getMutator().setSafe(counter, record, 0, record.length);
+            v.getMutator().setValueLengthSafe(counter, record.length);
+          }
+          pos++;
+        }
+        pos = 1;
+        counter++;
+      }
+    } catch (SQLException e) {
+      // TODO Auto-generated catch block
+      throw new DrillRuntimeException(e);
+    }
+    //logger.info("Number of rows returned from JDBC: " + counter);
+    for (ValueVector vv : vectors) {
+      vv.getMutator().setValueCount(counter);
+    }
+    return counter;
+  }
+
+  @Override
+  public void allocate(Map<Key, ValueVector> vectorMap)
+      throws OutOfMemoryException {
+    try {
+      for (ValueVector vv : vectorMap.values()) {
+        if (vv.getClass().equals(NullableVarCharVector.class)
+            || vv.getClass().equals(VarCharVector.class)) {
+          // Size variable-width vectors by the reported column precision,
+          // falling back to a 2000-byte-per-value estimate.
+          int prec = vv.getField().getWidth();
+          AllocationHelper.allocate(vv, 65536, prec > 0 ? prec : 2000);
+        } else if (vv instanceof FixedWidthVector) {
+          // Covers the INT, BIGINT, FLOAT8, DATE, TIME, TIMESTAMP and DECIMAL
+          // vectors created in setup(), nullable or not.
+          ((FixedWidthVector) vv).allocateNew(65536);
+        } else {
+          vv.allocateNew();
+        }
+      }
+    } catch (NullPointerException e) {
+      throw new OutOfMemoryException();
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    // Release the statement and result set; the connection came from
+    // MPJdbcCnxnManager and is not owned by this reader, so it stays open.
+    try {
+      if (rec != null) {
+        rec.close();
+      }
+      if (statement != null) {
+        statement.close();
+      }
+    } catch (SQLException e) {
+      logger.warn("Failure while closing JDBC statement or result set", e);
+    }
+  }
+}
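
setup() above derives each Drill vector from three facts in the JDBC ResultSetMetaData: the type code, nullability and precision. The underlying JDBC pattern, reduced to a self-contained sketch (the in-memory H2 URL and query are hypothetical stand-ins; any JDBC driver on the classpath behaves the same way):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class MetadataSketch {
      public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 1 AS id, 'x' AS name")) {
          ResultSetMetaData md = rs.getMetaData();
          for (int i = 1; i <= md.getColumnCount(); i++) {
            // The same three facts drive vector creation in setup().
            System.out.printf("%s: type=%d nullable=%d precision=%d%n",
                md.getColumnLabel(i), md.getColumnType(i),
                md.isNullable(i), md.getPrecision(i));
          }
        }
      }
    }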

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java
new file mode 100644
index 0000000..fae0e81
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcScanSpec.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MPJdbcScanSpec {
+
+  private String database;
+  private String table;
+  private String columns;
+
+  @JsonIgnore
+  private List<String> filters;
+
+  @JsonCreator
+  public MPJdbcScanSpec(@JsonProperty("database") String database,
+      @JsonProperty("table") String table, @JsonProperty("columns") String columns) {
+    this.database = database;
+    this.table = table;
+    this.columns = columns;
+  }
+
+  public MPJdbcScanSpec(String database, String table, List<String> filters, String columns) {
+    this.database = database;
+    this.table = table;
+    this.filters = filters;
+    this.columns = columns;
+  }
+
+  public String getDatabase() {
+    return this.database;
+  }
+
+  public String getTable() {
+    return this.table;
+  }
+
+  public List<String> getFilters() {
+    return this.filters;
+  }
+
+  public String getColumns() {
+    return this.columns;
+  }
+
+  @Override
+  public String toString() {
+    return "MPJdbcScanSpec [database=" + database + ", table=" + table
+        + ", columns=" + columns + ", filters=" + filters + "]";
+  }
+}
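
Since the scan spec rides along in the serialized physical plan, a quick Jackson round-trip shows the wire form the annotations above imply (values hypothetical; assumes the sketch lives in the same package as MPJdbcScanSpec):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ScanSpecJsonSketch {
      public static void main(String[] args) throws Exception {
        MPJdbcScanSpec spec = new MPJdbcScanSpec("sales", "orders", "order_id,amount");
        // Prints roughly:
        // {"database":"sales","table":"orders","columns":"order_id,amount"}
        // "filters" never appears because the field is @JsonIgnore'd.
        System.out.println(new ObjectMapper().writeValueAsString(spec));
      }
    }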

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java
new file mode 100644
index 0000000..f3169fc
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaConfig.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.mpjdbc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Stores the JDBC schema configuration: the connection URI together with the
+ * username and password used to authenticate against the backing database.
+ * JDBC schemas are read-only, so no writable flag or default storage format
+ * is exposed.
+ */
+
+public class MPJdbcSchemaConfig {
+
+  /** Default config: a bare "jdbc://" URI with no credentials; read-only. */
+  public static final MPJdbcSchemaConfig DEFAULT =
+      new MPJdbcSchemaConfig("jdbc://", "", "");
+
+  private final String uri;
+  private final String username;
+  private final String passwd;
+
+  @JsonCreator
+  public MPJdbcSchemaConfig(@JsonProperty("uri") String uri,
+      @JsonProperty("username") String username,
+      @JsonProperty("passwd") String passwd) {
+    this.uri = uri;
+    this.username = username;
+    this.passwd = passwd;
+  }
+
+  public String getUri() {
+    return uri;
+  }
+
+  public boolean isWritable() {
+    return false;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return passwd;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof MPJdbcSchemaConfig)) {
+      return false;
+    }
+    MPJdbcSchemaConfig that = (MPJdbcSchemaConfig) obj;
+    // Null-safe field comparisons; the password was previously compared by
+    // reference, which made equals() unreliable.
+    return (this.uri == null ? that.uri == null : this.uri.equals(that.uri))
+        && (this.passwd == null ? that.passwd == null : this.passwd.equals(that.passwd))
+        && (this.username == null ? that.username == null
+            : this.username.equals(that.username));
+  }
+
+  @Override
+  public int hashCode() {
+    // Keep hashCode consistent with the field-based equals() above.
+    int result = uri == null ? 0 : uri.hashCode();
+    result = 31 * result + (username == null ? 0 : username.hashCode());
+    result = 31 * result + (passwd == null ? 0 : passwd.hashCode());
+    return result;
+  }
+}
\ No newline at end of file
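
A minimal usage sketch of the config class above (connection values are hypothetical; assumes the same package as MPJdbcSchemaConfig):

    public class SchemaConfigSketch {
      public static void main(String[] args) {
        MPJdbcSchemaConfig cfg = new MPJdbcSchemaConfig(
            "jdbc:mysql://localhost:3306/sales",  // uri
            "drill",                              // username
            "secret");                            // passwd
        // isWritable() is hard-coded to false: JDBC schemas are read-only.
        System.out.println(cfg.getUri() + " writable=" + cfg.isWritable());
      }
    }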

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java
new file mode 100644
index 0000000..0e28c8d
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaFilter.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+public class MPJdbcSchemaFilter {
+  public MPJdbcSchemaFilter() {
+    // Placeholder: schema-level filtering is not implemented yet.
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/8478e9fb/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java
new file mode 100644
index 0000000..8c92533
--- /dev/null
+++ b/contrib/storage-mpjdbc/src/main/java/org/apache/drill/exec/store/mpjdbc/MPJdbcSchemaSubScan.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mpjdbc;
+
+import org.apache.drill.exec.physical.base.AbstractSubScan;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class MPJdbcSchemaSubScan extends AbstractSubScan {
+  private final String table;
+  private final MPJdbcSchemaFilter filter;
+  private final String userName;
+
+  @JsonCreator
+  public MPJdbcSchemaSubScan(@JsonProperty("userName") String userName,
+      @JsonProperty("table") String table,
+      @JsonProperty("filter") MPJdbcSchemaFilter filter) {
+    super(userName);
+    this.table = table;
+    this.filter = filter;
+    this.userName = userName;
+  }
+
+  @JsonProperty("table")
+  public String getTable() {
+    return table;
+  }
+
+  @JsonProperty("filter")
+  public MPJdbcSchemaFilter getFilter() {
+    return filter;
+  }
+
+  @JsonProperty("userName")
+  public String getUserName() {
+    return this.userName;
+  }
+
+  @Override
+  public int getOperatorType() {
+    // No dedicated operator type is registered for this sub-scan yet.
+    return 0;
+  }
+}
\ No newline at end of file

