hawq-commits mailing list archives

From shiv...@apache.org
Subject incubator-hawq git commit: HAWQ-1108. JDBC PXF Plugin
Date Tue, 14 Mar 2017 21:15:12 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9559ba1fe -> 7a22bdd78


HAWQ-1108. JDBC PXF Plugin


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7a22bdd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7a22bdd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7a22bdd7

Branch: refs/heads/master
Commit: 7a22bdd78f78de85c4d6569c49f60868264d0398
Parents: 9559ba1
Author: Devin Jia <jiadx@inspur.com>
Authored: Tue Mar 14 14:14:11 2017 -0700
Committer: shivzone <shivram.mani@gmail.com>
Committed: Tue Mar 14 14:14:11 2017 -0700

----------------------------------------------------------------------
 pxf/build.gradle                                |  32 ++
 pxf/pxf-jdbc/README.md                          | 141 +++++++++
 .../pxf/plugins/jdbc/JdbcFilterBuilder.java     | 149 +++++++++
 .../plugins/jdbc/JdbcPartitionFragmenter.java   | 308 +++++++++++++++++++
 .../hawq/pxf/plugins/jdbc/JdbcPlugin.java       | 115 +++++++
 .../hawq/pxf/plugins/jdbc/JdbcReadAccessor.java | 122 ++++++++
 .../hawq/pxf/plugins/jdbc/JdbcReadResolver.java | 103 +++++++
 .../hawq/pxf/plugins/jdbc/WhereSQLBuilder.java  | 140 +++++++++
 .../hawq/pxf/plugins/jdbc/utils/ByteUtil.java   |  84 +++++
 .../hawq/pxf/plugins/jdbc/utils/DbProduct.java  |  49 +++
 .../pxf/plugins/jdbc/utils/MysqlProduct.java    |  31 ++
 .../pxf/plugins/jdbc/utils/OracleProduct.java   |  30 ++
 .../pxf/plugins/jdbc/utils/PostgresProduct.java |  30 ++
 .../pxf/plugins/jdbc/JdbcFilterBuilderTest.java | 101 ++++++
 .../jdbc/JdbcPartitionFragmenterTest.java       | 235 ++++++++++++++
 .../hawq/pxf/plugins/jdbc/SqlBuilderTest.java   | 175 +++++++++++
 .../src/main/resources/pxf-profiles-default.xml |   9 +
 pxf/settings.gradle                             |   3 +-
 18 files changed, 1856 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index d604b96..e9d04af 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -437,6 +437,38 @@ project('pxf-hbase') {
     }
 }
 
+
+project('pxf-jdbc') {
+    dependencies {
+        compile(project(':pxf-api'))
+        compile(project(':pxf-service'))
+        compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+        compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
+        testCompile "mysql:mysql-connector-java:5.1.6"
+    }
+    tasks.withType(JavaCompile) {
+        options.encoding = "UTF-8"
+    }
+
+    ospackage {
+        packageName = versionedPackageName("${project.name}")
+        summary = 'HAWQ Extension Framework (PXF), JDBC plugin'
+        description = 'Querying external data stored in relational databases using JDBC.'
+        packager = ' '
+        packageGroup = 'Development/Libraries'
+        release = buildNumber() + '.' + project.osFamily
+        buildHost = ' '
+
+        requires(versionedPackageName('pxf-service'), project.version, GREATER | EQUAL)
+
+        from(jar.outputs.files) {
+            into "/usr/lib/pxf-${project.version}"
+        }
+
+        link("/usr/lib/pxf-${project.version}/${project.name}.jar", "${project.name}-${project.version}.jar")
+    }
+}
+
 def buildNumber() {
     System.getenv('BUILD_NUMBER') ?: System.getProperty('user.name')
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/README.md
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/README.md b/pxf/pxf-jdbc/README.md
new file mode 100644
index 0000000..e8c4bc0
--- /dev/null
+++ b/pxf/pxf-jdbc/README.md
@@ -0,0 +1,141 @@
+# Accessing JDBC Table Data
+
+The PXF JDBC plug-in reads data stored in a traditional relational database, e.g. MySQL, Oracle, or PostgreSQL.
+
+The PXF JDBC plug-in is a client of the database; the host running the database
+engine does not need to have PXF deployed.
+
+
+# Prerequisites
+
+Check the following before using PXF to access a JDBC table:
+* The PXF JDBC plug-in is installed on all cluster nodes.
+* The JDBC driver JAR files are installed on all cluster nodes and added to the file `pxf-public.classpath`.
+* You have tested PXF on HDFS.
+
+# Using PXF Tables to Query a JDBC Table
+A JDBC table is mapped to a PXF external table with the same schema: the PXF table
+uses the same column names as the JDBC table, and each column type requires a JDBC-to-HAWQ mapping.
+
+## Syntax Example
+The following PXF table definition is valid for a JDBC table.
+
+    CREATE [READABLE|WRITABLE] EXTERNAL TABLE table_name
+        ( column_name data_type [, ...] | LIKE other_table )
+    LOCATION ('pxf://namenode[:port]/jdbc-schema-name.jdbc-table-name?<pxf-parameters><&custom-parameters>')
+    FORMAT 'CUSTOM' (formatter='pxfwritable_import')
+If `jdbc-schema-name` is omitted, PXF will default to the `default` schema.
+
+Each `column_name` must exist in the JDBC table, and each `data_type` must equal
+or be compatible with the JDBC column type.
+
+where `<pxf-parameters>` is:
+
+    [FRAGMENTER=org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter
+    &ACCESSOR=org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor
+    &RESOLVER=org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver]
+    | PROFILE=Jdbc
+
+where `<custom-parameters>` is:
+
+    JDBC_DRIVER=<jdbc-driver-class-name>
+     &DB_URL=<jdbc-url>&USER=<database-user>&PASS=<password>
+
+## JDBC Table to HAWQ Data Type Mapping
+The JDBC and HAWQ type systems are similar, so no special type mapping is
+required.
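+For example (derived from the tables used below; not an exhaustive list):
+* MySQL `int` maps to HAWQ `integer`
+* MySQL `varchar` maps to HAWQ `text`
+* MySQL `double` / `decimal` maps to HAWQ `float8`
+* MySQL `date` maps to HAWQ `date`
+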
+# Usage
+The following example, using MySQL, demonstrates the use of PXF-JDBC.
+
+To query a MySQL table in HAWQ, perform the following steps:
+1. Create a table in MySQL:
+
+         mysql> use demodb;
+         mysql> create table myclass(
+                 id int(4) not null primary key,
+                 name varchar(20) not null,
+                 gender int(4) not null default '0',
+                 degree double(16,2));
+2. Insert test data:
+
+        insert into myclass values(1,"tom",1,90);
+        insert into myclass values(2,'john',0,94);
+        insert into myclass values(3,'simon',1,79);
+3. Copy the MySQL JDBC driver JAR files to `/usr/lib/pxf` (on all cluster nodes),
+edit `/etc/pxf/conf/pxf-public.classpath`, and add:
+
+        /usr/lib/pxf/mysql-connector-java-*.jar
+
+     Then restart the PXF service on all nodes.
+
+4. Create an external table in HAWQ:
+
+        gpadmin=# CREATE EXTERNAL TABLE myclass(id integer,
+             name text,
+             gender integer,
+             degree float8)
+             LOCATION ('pxf://localhost:51200/demodb.myclass'
+                     '?PROFILE=JDBC'
+                     '&JDBC_DRIVER=com.mysql.jdbc.Driver'
+                     '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+                     )
+            FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
+
+The MySQL instance is at IP 192.168.200.6, port 3306.
+
+5. Query the MySQL data in HAWQ:
+
+        gpadmin=# select * from myclass;
+        gpadmin=# select * from myclass where id=2;
+
+# JDBC Table Fragments
+## Intro
+The PXF JDBC plug-in acts as a client that accesses the JDBC database. By default,
+a single PXF instance connects to the JDBC table. If the JDBC table is large, multiple
+PXF instances can access the JDBC table in parallel, by fragments.
+
+## Syntax
+`<custom-parameters>` can include the following partition parameters:
+
+    PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]
+The `PARTITION_BY` parameter indicates which column to use as the partition column.
+It is split by a colon (':'); the currently supported `column_type` values are `date|int|enum`.
+The date format is `yyyy-MM-dd`.
+If the `PARTITION_BY` parameter is omitted, there will be only one fragment.
+
+The `RANGE` parameter indicates the range of data to be queried and is split by a colon (':').
+ The range is left-closed, i.e. `>= start_value AND < end_value` .
+ If the `column_type` is `int`, the `end_value` can be empty.
+ If the `column_type` is `enum`, `RANGE` lists the enumerated values, one fragment per value.
+
+The `INTERVAL` parameter is split by a colon (':') and indicates the interval
+ size of one fragment. When `column_type` is `date`, this parameter must
+ include the unit, and `interval_unit` can be `year|month|day`. When
+ `column_type` is `int`, the `interval_unit` can be empty. When `column_type`
+ is `enum`, the `INTERVAL` parameter can be empty.
+
+Syntax examples:
+
+    * PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month
+    * PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1
+    * PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad
+
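+Per the fragmenter implementation, the first example yields 24 one-month fragments,
+the second yields two fragments (`[2008,2009)` and `[2009,2010)`), and the third
+yields one fragment per enumerated value, i.e. four fragments.
+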
+## Usage
+MySQL Table:
+
+    CREATE TABLE sales (id int primary key, cdate date, amt decimal(10,2), grade varchar(30))
+HAWQ Table:
+
+    CREATE EXTERNAL TABLE sales(id integer,
+                 cdate date,
+                 amt float8,
+                 grade text)
+                 LOCATION ('pxf://localhost:51200/sales'
+                         '?PROFILE=JDBC'
+                         '&JDBC_DRIVER=com.mysql.jdbc.Driver'
+                         '&DB_URL=jdbc:mysql://192.168.200.6:3306/demodb&USER=root&PASS=root'
+                         '&PARTITION_BY=cdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:year'
+                         )
+                 FORMAT 'CUSTOM' (Formatter='pxfwritable_import');
+In the PXF JDBC plug-in, this will generate 2 fragments. HAWQ then assigns these
+fragments to 2 PXF instances to access the JDBC table data.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
new file mode 100644
index 0000000..3c56ccb
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilder.java
@@ -0,0 +1,149 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BasicFilter;
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.LogicalFilter;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Uses the filter parser code to build a filter object, either simple - a
+ * single {@link BasicFilter} object or a
+ * compound - a {@link List} of
+ * {@link BasicFilter} objects.
+ * The subclass {@link WhereSQLBuilder} will use the filter to
+ * generate the WHERE statement.
+ */
+public class JdbcFilterBuilder implements FilterParser.FilterBuilder {
+    /**
+     * Translates a filterString into a {@link BasicFilter} or a
+     * list of such filters.
+     *
+     * @param filterString the string representation of the filter
+     * @return a single {@link BasicFilter}
+     *         object or a {@link List} of
+     *         {@link BasicFilter} objects.
+     * @throws Exception if parsing the filter failed or filter is not a basic
+     *             filter or list of basic filters
+     */
+    public Object getFilterObject(String filterString) throws Exception {
+        FilterParser parser = new FilterParser(this);
+        Object result = parser.parse(filterString.getBytes(FilterParser.DEFAULT_CHARSET));
+
+        if (!(result instanceof LogicalFilter) && !(result instanceof BasicFilter)
+                && !(result instanceof List)) {
+            throw new Exception("String " + filterString
+                    + " resolved to no filter");
+        }
+
+        return result;
+    }
+
+
+    @Override
+    public Object build(FilterParser.LogicalOperation op, Object leftOperand, Object rightOperand) {
+        return handleLogicalOperation(op, leftOperand, rightOperand);
+    }
+
+    @Override
+    public Object build(FilterParser.LogicalOperation op, Object filter) {
+        return handleLogicalOperation(op, filter);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object build(FilterParser.Operation opId, Object leftOperand,
+                        Object rightOperand) throws Exception {
+        // Assume column is on the left
+        return handleSimpleOperations(opId,
+                (FilterParser.ColumnIndex) leftOperand,
+                (FilterParser.Constant) rightOperand);
+    }
+
+    @Override
+    public Object build(FilterParser.Operation operation, Object operand) throws Exception {
+        if (operation == FilterParser.Operation.HDOP_IS_NULL || operation == FilterParser.Operation.HDOP_IS_NOT_NULL) {
+            // use null for the constant value of null comparison
+            return handleSimpleOperations(operation, (FilterParser.ColumnIndex) operand, null);
+        } else {
+            throw new Exception("Unsupported unary operation " + operation);
+        }
+    }
+
+    /*
+     * Handles simple column-operator-constant expressions Creates a special
+     * filter in the case the column is the row key column
+     */
+    private BasicFilter handleSimpleOperations(FilterParser.Operation opId,
+                                               FilterParser.ColumnIndex column,
+                                               FilterParser.Constant constant) {
+        return new BasicFilter(opId, column, constant);
+    }
+
+    /**
+     * Handles AND of already calculated expressions. Currently only AND, in the
+     * future OR can be added
+     *
+     * Four cases here:
+     * <ol>
+     * <li>both are simple filters</li>
+     * <li>left is a FilterList and right is a filter</li>
+     * <li>left is a filter and right is a FilterList</li>
+     * <li>both are FilterLists</li>
+     * </ol>
+     * Currently, 1, 2 can occur, since no parenthesis are used
+     *
+     * @param left left hand filter
+     * @param right right hand filter
+     * @return list of filters constructing the filter tree
+     */
+    private List<BasicFilter> handleCompoundOperations(List<BasicFilter> left,
+                                                       BasicFilter right) {
+        left.add(right);
+        return left;
+    }
+
+    private List<BasicFilter> handleCompoundOperations(BasicFilter left,
+                                                       BasicFilter right) {
+        List<BasicFilter> result = new LinkedList<>();
+
+        result.add(left);
+        result.add(right);
+
+        return result;
+    }
+
+    private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object leftOperand, Object rightOperand) {
+
+        List<Object> result = new LinkedList<>();
+
+        result.add(leftOperand);
+        result.add(rightOperand);
+        return new LogicalFilter(operator, result);
+    }
+
+    private Object handleLogicalOperation(FilterParser.LogicalOperation operator, Object filter) {
+        return new LogicalFilter(operator, Arrays.asList(filter));
+    }
+}
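
For orientation, a minimal sketch of driving the builder programmatically (editorial;
the filter-string wire format is owned by FilterParser, and the ColumnIndex/Constant
constructors used here are assumed from how build() casts its operands):

    // Hedged sketch: build the equivalent of "column #0 = 2", i.e. the predicate
    // pushed down for "select * from myclass where id=2" (build() declares throws Exception).
    JdbcFilterBuilder builder = new JdbcFilterBuilder();
    Object filter = builder.build(FilterParser.Operation.HDOP_EQ,
            new FilterParser.ColumnIndex(0),   // assumed constructor
            new FilterParser.Constant(2));     // assumed constructor
    // The result is a BasicFilter; WhereSQLBuilder (below) renders it as "id=2".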

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
new file mode 100644
index 0000000..8b5886d
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenter.java
@@ -0,0 +1,308 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
+import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Fragmenter class for JDBC data resources.
+ *
+ * Extends the {@link Fragmenter} abstract class, with the purpose of transforming
+ * an input data path (a JDBC database table name and user request parameters) into a list of regions
+ * that belong to this table.
+ * <br>
+ * The parameter patterns<br>
+ * There are three parameters; the format is as follows:<br>
+ * <pre>
+ * <code>PARTITION_BY=column_name:column_type&amp;RANGE=start_value[:end_value]&amp;INTERVAL=interval_num[:interval_unit]</code>
+ * </pre>
+ * The <code>PARTITION_BY</code> parameter is split by a colon(':'); the currently supported <code>column_type</code> values are <code>date,int,enum</code>.
+ * The date format is 'yyyy-MM-dd'. <br>
+ * The <code>RANGE</code> parameter is split by a colon(':') and identifies the starting range of each fragment.
+ * The range is left-closed, i.e.<code> '&gt;= start_value AND &lt; end_value' </code>. If the <code>column_type</code> is <code>int</code>,
+ * the <code>end_value</code> can be empty. If the <code>column_type</code> is <code>enum</code>, <code>RANGE</code> lists the enumerated values. <br>
+ * The <code>INTERVAL</code> parameter is split by a colon(':') and indicates the interval size of one fragment.
+ * When <code>column_type</code> is <code>date</code>, this parameter must include the unit, and <code>interval_unit</code> can be <code>year,month,day</code>.
+ * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty.
+ * When <code>column_type</code> is <code>enum</code>, the <code>INTERVAL</code> parameter can be empty.
+ * <br>
+ * <p>
+ * Syntax examples:<br>
+ * <code>PARTITION_BY=createdate:date&amp;RANGE=2008-01-01:2010-01-01&amp;INTERVAL=1:month</code> <br>
+ * <code>PARTITION_BY=year:int&amp;RANGE=2008:2010&amp;INTERVAL=1</code> <br>
+ * <code>PARTITION_BY=grade:enum&amp;RANGE=excellent:good:general:bad</code>
+ * </p>
+ *
+ */
+public class JdbcPartitionFragmenter extends Fragmenter {
+    String[] partitionBy = null;
+    String[] range = null;
+    String[] interval = null;
+    PartitionType partitionType = null;
+    String partitionColumn = null;
+    IntervalType intervalType = null;
+    int intervalNum = 1;
+
+    // valid only when partitionType is DATE
+    Calendar rangeStart = null;
+    Calendar rangeEnd = null;
+
+
+    enum PartitionType {
+        DATE,
+        INT,
+        ENUM;
+
+        public static PartitionType getType(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
+
+    enum IntervalType {
+        DAY,
+        MONTH,
+        YEAR;
+
+        public static IntervalType type(String str) {
+            return valueOf(str.toUpperCase());
+        }
+    }
+
+    /**
+     * Constructor for JdbcPartitionFragmenter.
+     *
+     * @param inConf input data such as which Jdbc table to scan
+     * @throws UserDataException  if the request parameter is malformed
+     */
+    public JdbcPartitionFragmenter(InputData inConf) throws UserDataException {
+        super(inConf);
+        if (inConf.getUserProperty("PARTITION_BY") == null)
+            return;
+        try {
+            partitionBy = inConf.getUserProperty("PARTITION_BY").split(":");
+            partitionColumn = partitionBy[0];
+            partitionType = PartitionType.getType(partitionBy[1]);
+        } catch (IllegalArgumentException e1) {
+            throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'");
+        }
+
+        try {
+            range = inConf.getUserProperty("RANGE").split(":");
+        } catch (IllegalArgumentException e1) {
+            throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'");
+        }
+        try {
+            //parse and validate parameter-INTERVAL
+            if (inConf.getUserProperty("INTERVAL") != null) {
+                interval = inConf.getUserProperty("INTERVAL").split(":");
+                intervalNum = Integer.parseInt(interval[0]);
+                if (interval.length > 1)
+                    intervalType = IntervalType.type(interval[1]);
+            }
+            if (intervalNum < 1)
+                throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'");
+        } catch (IllegalArgumentException e1) {
+            throw new UserDataException("The parameter 'INTERVAL' is invalid; the pattern is 'interval_num[:interval_unit]'");
+        }
+        try {
+            if (partitionType == PartitionType.DATE) {
+                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                rangeStart = Calendar.getInstance();
+                rangeStart.setTime(df.parse(range[0]));
+                rangeEnd = Calendar.getInstance();
+                rangeEnd.setTime(df.parse(range[1]));
+            }
+        } catch (ParseException e) {
+            throw new UserDataException("The parameter 'RANGE' include invalid date format.");
+        }
+    }
+
+    /**
+     * Returns statistics for a JDBC table. Currently not implemented.
+     * @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("ANALYZE for Jdbc plugin is not supported");
+    }
+
+    /**
+     * Returns a list of fragments containing all of the
+     * JDBC table data.
+     *
+     * @return a list of fragments
+     * @throws Exception if host assignment fails
+     */
+    @Override
+    public List<Fragment> getFragments() throws Exception {
+        if (partitionType == null) {
+            byte[] fragmentMetadata = null;
+            byte[] userData = null;
+            Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+            fragments.add(fragment);
+            return prepareHosts(fragments);
+        }
+        switch (partitionType) {
+            case DATE: {
+                int currInterval = intervalNum;
+
+                Calendar fragStart = rangeStart;
+                while (fragStart.before(rangeEnd)) {
+                    Calendar fragEnd = (Calendar) fragStart.clone();
+                    switch (intervalType) {
+                        case DAY:
+                            fragEnd.add(Calendar.DAY_OF_MONTH, currInterval);
+                            break;
+                        case MONTH:
+                            fragEnd.add(Calendar.MONTH, currInterval);
+                            break;
+                        case YEAR:
+                            fragEnd.add(Calendar.YEAR, currInterval);
+                            break;
+                    }
+                    if (fragEnd.after(rangeEnd))
+                        fragEnd = (Calendar) rangeEnd.clone();
+
+                    // build this fragment's metadata: convert the dates to milliseconds, then to bytes
+                    byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
+                    byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
+                    byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd);
+
+                    byte[] userData = new byte[0];
+                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    fragments.add(fragment);
+
+                    // continue to the next fragment
+                    fragStart = fragEnd;
+                }
+                break;
+            }
+            case INT: {
+                int rangeStart = Integer.parseInt(range[0]);
+                int rangeEnd = Integer.parseInt(range[1]);
+                int currInterval = intervalNum;
+
+                //validate : curr_interval > 0
+                int fragStart = rangeStart;
+                while (fragStart < rangeEnd) {
+                    int fragEnd = fragStart + currInterval;
+                    if (fragEnd > rangeEnd) fragEnd = rangeEnd;
+
+                    byte[] bStart = ByteUtil.getBytes(fragStart);
+                    byte[] bEnd = ByteUtil.getBytes(fragEnd);
+                    byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd);
+
+                    byte[] userData = new byte[0];
+                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, userData);
+                    fragments.add(fragment);
+
+                    // continue to the next fragment
+                    fragStart = fragEnd;
+                }
+                break;
+            }
+            case ENUM:
+                for (String frag : range) {
+                    byte[] fragmentMetadata = frag.getBytes();
+                    Fragment fragment = new Fragment(inputData.getDataSource(), null, fragmentMetadata, new byte[0]);
+                    fragments.add(fragment);
+                }
+                break;
+        }
+
+        return prepareHosts(fragments);
+    }
+
+    /**
+     * Assigns a host address to each fragment.
+     * In the JDBC plugin, 'replicas' are the host addresses of the running PXF engines, not of the database engine.
+     * Since the other PXF host addresses cannot be probed, only the host address of the current PXF engine is returned.
+     * @param fragments a list of fragments
+     * @return the list of fragments with hosts assigned
+     * @throws UnknownHostException if InetAddress.getLocalHost fails
+     */
+    public static List<Fragment> prepareHosts(List<Fragment> fragments) throws UnknownHostException {
+        for (Fragment fragment : fragments) {
+            String pxfHost = InetAddress.getLocalHost().getHostAddress();
+            String[] hosts = new String[]{pxfHost};
+            fragment.setReplicas(hosts);
+        }
+
+        return fragments;
+    }
+
+    public String buildFragmenterSql(String dbName, String originSql) {
+        byte[] meta = inputData.getFragmentMetadata();
+        if (meta == null)
+            return originSql;
+
+        DbProduct dbProduct = DbProduct.getDbProduct(dbName);
+
+        StringBuilder sb = new StringBuilder(originSql);
+        if (!originSql.contains("WHERE"))
+            sb.append(" WHERE 1=1 ");
+
+        sb.append(" AND ");
+        switch (partitionType) {
+            case DATE: {
+                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
+                // parse this fragment's metadata
+                // (its length should be 16 bytes: two longs)
+                byte[][] newb = ByteUtil.splitBytes(meta, 8);
+                Date fragStart = new Date(ByteUtil.toLong(newb[0]));
+                Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
+
+                sb.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
+                sb.append(" AND ");
+                sb.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
+
+                break;
+            }
+            case INT: {
+                // this fragment's metadata should be 8 bytes (two ints)
+                byte[][] newb = ByteUtil.splitBytes(meta, 4);
+                int fragStart = ByteUtil.toInt(newb[0]);
+                int fragEnd = ByteUtil.toInt(newb[1]);
+                sb.append(partitionColumn).append(" >= ").append(fragStart);
+                sb.append(" AND ");
+                sb.append(partitionColumn).append(" < ").append(fragEnd);
+                break;
+            }
+            case ENUM:
+                sb.append(partitionColumn).append("='").append(new String(meta)).append("'");
+                break;
+        }
+        return sb.toString();
+    }
+}
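
As a concrete check of the DATE branch above, a minimal standalone sketch (the class
name DateFragmentSketch is hypothetical) that reproduces the two fragments of the
README's sales example (RANGE=2008-01-01:2010-01-01, INTERVAL=1:year):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class DateFragmentSketch {
        public static void main(String[] args) throws Exception {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            Calendar fragStart = Calendar.getInstance();
            fragStart.setTime(df.parse("2008-01-01"));
            Calendar rangeEnd = Calendar.getInstance();
            rangeEnd.setTime(df.parse("2010-01-01"));

            // same loop shape as getFragments(): advance the window by one year
            // per fragment, clamping the last window to the range end
            int fragments = 0;
            while (fragStart.before(rangeEnd)) {
                Calendar fragEnd = (Calendar) fragStart.clone();
                fragEnd.add(Calendar.YEAR, 1);
                if (fragEnd.after(rangeEnd)) fragEnd = (Calendar) rangeEnd.clone();
                fragments++;
                fragStart = fragEnd;
            }
            System.out.println(fragments);  // prints 2
        }
    }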

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
new file mode 100644
index 0000000..53848f1
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPlugin.java
@@ -0,0 +1,115 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+
+import java.sql.*;
+
+/**
+ * This class resolves the JDBC connection parameters and manages opening and closing the JDBC connection.
+ * Implemented subclasses: {@link JdbcReadAccessor}.
+ *
+ */
+public class JdbcPlugin extends Plugin {
+    private static final Log LOG = LogFactory.getLog(JdbcPlugin.class);
+
+    //jdbc connection parameters
+    protected String jdbcDriver = null;
+    protected String dbUrl = null;
+    protected String user = null;
+    protected String pass = null;
+    protected String tblName = null;
+    protected int batchSize = 100;
+
+    //jdbc connection
+    protected Connection dbConn = null;
+    // database product name, from DatabaseMetaData.getDatabaseProductName()
+    protected String dbProduct = null;
+
+    /**
+     * Parses the input data.
+     *
+     * @param input the input data
+     * @throws UserDataException if the request parameter is malformed
+     */
+    public JdbcPlugin(InputData input) throws UserDataException {
+        super(input);
+        jdbcDriver = input.getUserProperty("JDBC_DRIVER");
+        dbUrl = input.getUserProperty("DB_URL");
+        user = input.getUserProperty("USER");
+        pass = input.getUserProperty("PASS");
+        String strBatch = input.getUserProperty("BATCH_SIZE");
+        if (strBatch != null) {
+            batchSize = Integer.parseInt(strBatch);
+        }
+
+        if (jdbcDriver == null) {
+            throw new UserDataException("JDBC_DRIVER must be set");
+        }
+        if (dbUrl == null) {
+            throw new UserDataException("DB_URL must be set(read)");
+        }
+
+        tblName = input.getDataSource();
+        if (tblName == null) {
+            throw new UserDataException("TABLE_NAME must be set as DataSource.");
+        } else {
+            tblName = tblName.toUpperCase();
+        }
+    }
+
+    public String getTableName() {
+        return tblName;
+    }
+
+    protected Connection openConnection() throws ClassNotFoundException, SQLException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Open JDBC: driver=%s,url=%s,user=%s,pass=%s,table=%s",
+                    jdbcDriver, dbUrl, user, pass, tblName));
+        }
+        if (dbConn == null || dbConn.isClosed()) {
+            Class.forName(jdbcDriver);
+            if (user != null) {
+                dbConn = DriverManager.getConnection(dbUrl, user, pass);
+            } else {
+                dbConn = DriverManager.getConnection(dbUrl);
+            }
+            DatabaseMetaData meta = dbConn.getMetaData();
+            dbProduct = meta.getDatabaseProductName();
+        }
+        return dbConn;
+    }
+
+    protected void closeConnection() {
+        try {
+            if (dbConn != null) {
+                dbConn.close();
+                dbConn = null;
+            }
+        } catch (SQLException e) {
+            LOG.error("Close db connection error . ", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
new file mode 100644
index 0000000..2ca9a94
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadAccessor.java
@@ -0,0 +1,122 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.*;
+import java.util.ArrayList;
+
+/**
+ * Accessor for JDBC tables. The accessor opens and reads a partition belonging
+ * to a JDBC table. JdbcReadAccessor generates and executes SQL from the filter and
+ * fragment information, and uses {@link JdbcReadResolver } to read the ResultSet and generate
+ * the list of {@link OneRow} that HAWQ needs.
+ */
+public class JdbcReadAccessor extends JdbcPlugin implements ReadAccessor {
+    private static final Log LOG = LogFactory.getLog(JdbcReadAccessor.class);
+
+    WhereSQLBuilder filterBuilder = null;
+    private ColumnDescriptor keyColumn = null;
+
+    private String querySql = null;
+    private Statement statement = null;
+    private ResultSet resultSet = null;
+
+    public JdbcReadAccessor(InputData input) throws UserDataException {
+        super(input);
+        filterBuilder = new WhereSQLBuilder(inputData);
+
+        // build the SELECT statement (the WHERE clause is appended later)
+        ArrayList<ColumnDescriptor> columns = input.getTupleDescription();
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ");
+        for (int i = 0; i < columns.size(); i++) {
+            ColumnDescriptor column = columns.get(i);
+            if (column.isKeyColumn())
+                keyColumn = column;
+            if (i > 0) sb.append(",");
+            sb.append(column.columnName());
+        }
+        sb.append(" FROM ").append(getTableName());
+        querySql = sb.toString();
+    }
+
+    /**
+     * Opens the database connection and executes the query SQL.
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        if (statement != null && !statement.isClosed())
+            return true;
+        super.openConnection();
+
+        statement = dbConn.createStatement();
+
+        resultSet = executeQuery(querySql);
+
+        return true;
+    }
+
+    public ResultSet executeQuery(String sql) throws Exception {
+        String query = sql;
+        if (inputData.hasFilter()) {
+            // parse the filter string and build the WHERE clause
+            String whereSql = filterBuilder.buildWhereSQL(dbProduct);
+
+            if (whereSql != null) {
+                query = query + " WHERE " + whereSql;
+            }
+        }
+
+        // rewrite the SQL according to the fragment information
+        JdbcPartitionFragmenter fragmenter = new JdbcPartitionFragmenter(inputData);
+        query = fragmenter.buildFragmenterSql(dbProduct, query);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("executeQuery: " + query);
+        }
+
+        return statement.executeQuery(query);
+    }
+
+    @Override
+    public OneRow readNextObject() throws Exception {
+        if (resultSet.next()) {
+            return new OneRow(null, resultSet);
+        }
+        return null;
+    }
+
+    @Override
+    public void closeForRead() throws Exception {
+        if (statement != null && !statement.isClosed()) {
+            statement.close();
+            statement = null;
+        }
+        super.closeConnection();
+    }
+}
\ No newline at end of file
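
Putting the pieces together for the README's partitioned sales example: the constructor
builds the column list and buildFragmenterSql appends the partition bounds, so the query
sent to MySQL for the first fragment would look roughly like the following (a sketch of
the expected output; note that JdbcPlugin upper-cases the table name):

    SELECT id,cdate,amt,grade FROM SALES
        WHERE 1=1 AND cdate >= DATE('2008-01-01') AND cdate < DATE('2009-01-01')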

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
new file mode 100644
index 0000000..1c61537
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcReadResolver.java
@@ -0,0 +1,103 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * JdbcReadResolver reads the JDBC ResultSet and generates the list of {@link OneField} that HAWQ needs.
+ */
+public class JdbcReadResolver extends Plugin implements ReadResolver {
+    private static final Log LOG = LogFactory.getLog(JdbcReadResolver.class);
+    //HAWQ Table column definitions
+    private ArrayList<ColumnDescriptor> columns = null;
+
+    public JdbcReadResolver(InputData input) {
+        super(input);
+        columns = input.getTupleDescription();
+    }
+
+    @Override
+    public List<OneField> getFields(OneRow row) throws Exception {
+        ResultSet result = (ResultSet) row.getData();
+        LinkedList<OneField> fields = new LinkedList<>();
+
+        for (int i = 0; i < columns.size(); i++) {
+            ColumnDescriptor column = columns.get(i);
+            String colName = column.columnName();
+            Object value = null;
+
+            OneField oneField = new OneField();
+            oneField.type = column.columnTypeCode();
+
+            switch (DataType.get(oneField.type)) {
+                case INTEGER:
+                    value = result.getInt(colName);
+                    break;
+                case FLOAT8:
+                    value = result.getDouble(colName);
+                    break;
+                case REAL:
+                    value = result.getFloat(colName);
+                    break;
+                case BIGINT:
+                    value = result.getLong(colName);
+                    break;
+                case SMALLINT:
+                    value = result.getShort(colName);
+                    break;
+                case BOOLEAN:
+                    value = result.getBoolean(colName);
+                    break;
+                case BYTEA:
+                    value = result.getBytes(colName);
+                    break;
+                case VARCHAR:
+                case BPCHAR:
+                case TEXT:
+                case NUMERIC:
+                    value = result.getString(colName);
+                    break;
+                case TIMESTAMP:
+                case DATE:
+                    value = result.getDate(colName);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unknwon Field Type : " + DataType.get(oneField.type).toString()
+                            + ", Column : " + column.toString());
+            }
+            oneField.val = value;
+            fields.add(oneField);
+        }
+        return fields;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
new file mode 100644
index 0000000..541aa86
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/WhereSQLBuilder.java
@@ -0,0 +1,140 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hawq.pxf.api.LogicalFilter;
+import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
+import org.apache.hawq.pxf.api.BasicFilter;
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+/**
+ * Parses the filter object generated by the parent class {@link org.apache.hawq.pxf.plugins.jdbc.JdbcFilterBuilder}
+ * and builds the WHERE statement.
+ * For multiple filters, currently only HDOP_AND is supported.
+ * Unsupported filter operations and logical operations result in a null statement.
+ *
+ */
+public class WhereSQLBuilder extends JdbcFilterBuilder {
+    private InputData inputData;
+
+    public WhereSQLBuilder(InputData input) {
+        inputData = input;
+    }
+
+    /**
+     * 1. Checks the logical operator; JDBC currently only supports HDOP_AND.
+     * 2. Converts the filter to a list of BasicFilter.
+     */
+    private static List<BasicFilter> convertBasicFilterList(Object filter, List<BasicFilter> returnList) throws UnsupportedFilterException {
+        if (returnList == null)
+            returnList = new ArrayList<>();
+        if (filter instanceof BasicFilter) {
+            returnList.add((BasicFilter) filter);
+            return returnList;
+        }
+        LogicalFilter lfilter = (LogicalFilter) filter;
+        if (lfilter.getOperator() != FilterParser.LogicalOperation.HDOP_AND)
+            throw new UnsupportedFilterException("unsupported LogicalOperation : " + lfilter.getOperator());
+        for (Object f : lfilter.getFilterList()) {
+            returnList = convertBasicFilterList(f, returnList);
+        }
+        return returnList;
+    }
+
+    public String buildWhereSQL(String db_product) throws Exception {
+        if (!inputData.hasFilter())
+            return null;
+        List<BasicFilter> filters = null;
+        try {
+            String filterString = inputData.getFilterString();
+            Object filterObj = getFilterObject(filterString);
+
+            filters = convertBasicFilterList(filterObj, filters);
+            StringBuffer sb = new StringBuffer("1=1");
+            for (Object obj : filters) {
+                BasicFilter filter = (BasicFilter) obj;
+                sb.append(" AND ");
+
+                ColumnDescriptor column = inputData.getColumn(filter.getColumn().index());
+                //the column name of filter
+                sb.append(column.columnName());
+
+                //the operation of filter
+                FilterParser.Operation op = filter.getOperation();
+                switch (op) {
+                    case HDOP_LT:
+                        sb.append("<");
+                        break;
+                    case HDOP_GT:
+                        sb.append(">");
+                        break;
+                    case HDOP_LE:
+                        sb.append("<=");
+                        break;
+                    case HDOP_GE:
+                        sb.append(">=");
+                        break;
+                    case HDOP_EQ:
+                        sb.append("=");
+                        break;
+                    default:
+                        throw new UnsupportedFilterException("unsupported Filter operation : " + op);
+                }
+
+                DbProduct dbProduct = DbProduct.getDbProduct(db_product);
+                Object val = filter.getConstant().constant();
+                switch (DataType.get(column.columnTypeCode())) {
+                    case SMALLINT:
+                    case INTEGER:
+                    case BIGINT:
+                    case FLOAT8:
+                    case REAL:
+                    case BOOLEAN:
+                        sb.append(val.toString());
+                        break;
+                    case TEXT:
+                        sb.append("'").append(val.toString()).append("'");
+                        break;
+                    case DATE:
+                        // date fields need special treatment depending on the database product
+                        sb.append(dbProduct.wrapDate(val));
+                        break;
+                    default:
+                        throw new UnsupportedFilterException("unsupported column type for filtering : " + column.columnTypeCode());
+                }
+
+            }
+            return sb.toString();
+        } catch (UnsupportedFilterException ex) {
+            return null;
+        }
+    }
+
+    static class UnsupportedFilterException extends Exception {
+        UnsupportedFilterException(String message) {
+            super(message);
+        }
+    }
+}
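
For the README query "select * from myclass where id=2", HAWQ pushes the predicate down,
and buildWhereSQL starts from "1=1" and ANDs each basic filter on, so the resulting query
would look roughly like the following (a sketch of the expected output):

    SELECT id,name,gender,degree FROM DEMODB.MYCLASS WHERE 1=1 AND id=2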

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
new file mode 100644
index 0000000..cdca8a6
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/ByteUtil.java
@@ -0,0 +1,84 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * A utility class for merging and splitting byte arrays and converting numeric values to and from bytes.
+ */
+public class ByteUtil {
+
+    public static byte[] mergeBytes(byte[] b1, byte[] b2) {
+        return ArrayUtils.addAll(b1,b2);
+    }
+
+    public static byte[][] splitBytes(byte[] bytes, int n) {
+        int len = bytes.length / n;
+        byte[][] newBytes = new byte[len][];
+        int j = 0;
+        for (int i = 0; i < len; i++) {
+            newBytes[i] = new byte[n];
+            for (int k = 0; k < n; k++) newBytes[i][k] = bytes[j++];
+        }
+        return newBytes;
+    }
+
+    public static byte[] getBytes(long value) {
+        byte[] b = new byte[8];
+        b[0] = (byte) ((value >> 56) & 0xFF);
+        b[1] = (byte) ((value >> 48) & 0xFF);
+        b[2] = (byte) ((value >> 40) & 0xFF);
+        b[3] = (byte) ((value >> 32) & 0xFF);
+        b[4] = (byte) ((value >> 24) & 0xFF);
+        b[5] = (byte) ((value >> 16) & 0xFF);
+        b[6] = (byte) ((value >> 8) & 0xFF);
+        b[7] = (byte) ((value >> 0) & 0xFF);
+        return b;
+    }
+
+    public static byte[] getBytes(int value) {
+        byte[] b = new byte[4];
+        b[0] = (byte) ((value >> 24) & 0xFF);
+        b[1] = (byte) ((value >> 16) & 0xFF);
+        b[2] = (byte) ((value >> 8) & 0xFF);
+        b[3] = (byte) ((value >> 0) & 0xFF);
+        return b;
+    }
+
+    public static int toInt(byte[] b) {
+        // shift amounts are taken mod 32 for int in Java, so the original
+        // 32/40/48/56 shifts happened to work; use the intended values explicitly
+        return ((((int) b[3]) & 0xFF) +
+                ((((int) b[2]) & 0xFF) << 8) +
+                ((((int) b[1]) & 0xFF) << 16) +
+                ((((int) b[0]) & 0xFF) << 24));
+    }
+
+    public static long toLong(byte[] b) {
+        return ((((long) b[7]) & 0xFF) +
+                ((((long) b[6]) & 0xFF) << 8) +
+                ((((long) b[5]) & 0xFF) << 16) +
+                ((((long) b[4]) & 0xFF) << 24) +
+                ((((long) b[3]) & 0xFF) << 32) +
+                ((((long) b[2]) & 0xFF) << 40) +
+                ((((long) b[1]) & 0xFF) << 48) +
+                ((((long) b[0]) & 0xFF) << 56));
+    }
+}
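
A quick round-trip of these helpers, mirroring how the fragmenter encodes DATE fragment
metadata (editorial sketch; ByteUtilRoundTrip is a hypothetical demo class in the same package):

    public class ByteUtilRoundTrip {
        public static void main(String[] args) {
            long start = 1199145600000L;  // 2008-01-01 UTC, in milliseconds
            long end   = 1262304000000L;  // 2010-01-01 UTC, in milliseconds
            byte[] meta = ByteUtil.mergeBytes(ByteUtil.getBytes(start),
                                              ByteUtil.getBytes(end));
            byte[][] halves = ByteUtil.splitBytes(meta, 8);
            System.out.println(ByteUtil.toLong(halves[0]) == start);  // true
            System.out.println(ByteUtil.toLong(halves[1]) == end);    // true
        }
    }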

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
new file mode 100644
index 0000000..30ff1fe
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/DbProduct.java
@@ -0,0 +1,49 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * The syntax of different database products differs; for example, for date fields
+ * Oracle uses the to_date() function while MySQL uses the DATE() function.
+ * This class abstracts the common methods, and specific database products implement them.
+ */
+public abstract class DbProduct {
+    // wraps a date value in the product-specific date syntax
+    public abstract String wrapDate(Object date_val);
+
+
+    public static DbProduct getDbProduct(String dbName) {
+        if (dbName.toUpperCase().contains("MYSQL"))
+            return new MysqlProduct();
+        else if (dbName.toUpperCase().contains("ORACLE"))
+            return new OracleProduct();
+        else if (dbName.toUpperCase().contains("POSTGRES"))
+            return new PostgresProduct();
+        else
+            // unsupported databases may fail at query execution time
+            return new CommonProduct();
+    }
+}
+
+class CommonProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object dateVal) {
+        return "date'" + dateVal + "'";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
new file mode 100644
index 0000000..2e60ada
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/MysqlProduct.java
@@ -0,0 +1,31 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implements methods for MySQL Database.
+ */
+public class MysqlProduct extends DbProduct {
+
+    @Override
+    public String wrapDate(Object dateVal){
+        return "DATE('" + dateVal + "')";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
new file mode 100644
index 0000000..b46c5f3
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/OracleProduct.java
@@ -0,0 +1,30 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implements methods for Oracle Database.
+ */
+public class OracleProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object dateVal) {
+        return "to_date('" + dateVal + "','yyyy-mm-dd')";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
new file mode 100644
index 0000000..901cf2e
--- /dev/null
+++ b/pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/utils/PostgresProduct.java
@@ -0,0 +1,30 @@
+package org.apache.hawq.pxf.plugins.jdbc.utils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implements methods for Postgres Database.
+ */
+public class PostgresProduct extends DbProduct {
+    @Override
+    public String wrapDate(Object dateVal) {
+        return "date'" + dateVal + "'";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java
new file mode 100644
index 0000000..1b1191c
--- /dev/null
+++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcFilterBuilderTest.java
@@ -0,0 +1,101 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.BasicFilter;
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.FilterParser.LogicalOperation;
+import org.apache.hawq.pxf.api.LogicalFilter;
+import org.junit.Test;
+
+import static org.apache.hawq.pxf.api.FilterParser.Operation.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class JdbcFilterBuilderTest {
+    @Test
+    public void parseFilterWithThreeOperations() throws Exception {
+        //original SQL => col_1>'2008-02-01' and col_1<'2008-12-01' or col_2>1200
+        String filterstr = "a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l1";
+        JdbcFilterBuilder builder = new JdbcFilterBuilder();
+
+        LogicalFilter filterList = (LogicalFilter) builder.getFilterObject(filterstr);
+        assertEquals(LogicalOperation.HDOP_OR, filterList.getOperator());
+        LogicalFilter l1_left = (LogicalFilter) filterList.getFilterList().get(0);
+        BasicFilter l1_right = (BasicFilter) filterList.getFilterList().get(1);
+
+        //column_2 > 1200
+        assertEquals(2, l1_right.getColumn().index());
+        assertEquals(HDOP_GT, l1_right.getOperation());
+        assertEquals(1200L, l1_right.getConstant().constant());
+
+        assertEquals(LogicalOperation.HDOP_AND, l1_left.getOperator());
+        BasicFilter l2_left = (BasicFilter) l1_left.getFilterList().get(0);
+        BasicFilter l2_right = (BasicFilter) l1_left.getFilterList().get(1);
+
+        //column_1 > '2008-02-01'
+        assertEquals(1, l2_left.getColumn().index());
+        assertEquals(HDOP_GT, l2_left.getOperation());
+        assertEquals("2008-02-01", l2_left.getConstant().constant());
+
+        //column_1 < '2008-12-01'
+        assertEquals(1, l2_right.getColumn().index());
+        assertEquals(HDOP_LT, l2_right.getOperation());
+        assertEquals("2008-12-01", l2_right.getConstant().constant());
+
+    }
+
+    @Test
+    public void parseFilterWithLogicalOperation() throws Exception {
+        WhereSQLBuilder builder = new WhereSQLBuilder(null);
+        LogicalFilter filter = (LogicalFilter) builder.getFilterObject("a1c25s5dfirsto5a2c20s1d2o2l0");
+        assertEquals(LogicalOperation.HDOP_AND, filter.getOperator());
+        assertEquals(2, filter.getFilterList().size());
+    }
+
+    @Test
+    public void parseNestedExpressionWithLogicalOperation() throws Exception {
+        WhereSQLBuilder builder = new WhereSQLBuilder(null);
+        LogicalFilter filter = (LogicalFilter) builder.getFilterObject("a1c25s5dfirsto5a2c20s1d2o2l0a1c20s1d1o1l1");
+        assertEquals(LogicalOperation.HDOP_OR, filter.getOperator());
+        assertEquals(LogicalOperation.HDOP_AND, ((LogicalFilter) filter.getFilterList().get(0)).getOperator());
+        assertEquals(HDOP_LT, ((BasicFilter) filter.getFilterList().get(1)).getOperation());
+    }
+
+    @Test
+    public void parseISNULLExpression() throws Exception {
+        WhereSQLBuilder builder = new WhereSQLBuilder(null);
+        BasicFilter filter = (BasicFilter) builder.getFilterObject("a1o8");
+        assertEquals(FilterParser.Operation.HDOP_IS_NULL, filter.getOperation());
+        assertEquals(1, filter.getColumn().index());
+        assertNull(filter.getConstant());
+    }
+
+    @Test
+    public void parseISNOTNULLExpression() throws Exception {
+        WhereSQLBuilder builder = new WhereSQLBuilder(null);
+        BasicFilter filter = (BasicFilter) builder.getFilterObject("a1o9");
+        assertEquals(FilterParser.Operation.HDOP_IS_NOT_NULL, filter.getOperation());
+        assertEquals(1, filter.getColumn().index());
+        assertNull(filter.getConstant());
+    }
+
+}
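
Decoding the serialized filter strings used above, as the assertions imply:
a<n> is a column index; c<oid>s<len>d<value> is a constant (OID 25 = text,
OID 20 = 8-byte integer); o<n> is an operation (o1 = HDOP_LT, o2 = HDOP_GT,
o5 = HDOP_EQ, o8/o9 = IS [NOT] NULL); l<n> is a logical operator (l0 = AND,
l1 = OR). One further case in the same style, offered as an illustrative
sketch rather than part of the commit:

    @Test
    public void parseSingleComparison() throws Exception {
        // col_1 > '2008-02-01': a1 = column 1, c25s10d2008-02-01 = a
        // 10-character text constant, o2 = HDOP_GT
        JdbcFilterBuilder builder = new JdbcFilterBuilder();
        BasicFilter filter = (BasicFilter) builder.getFilterObject("a1c25s10d2008-02-01o2");
        assertEquals(1, filter.getColumn().index());
        assertEquals(HDOP_GT, filter.getOperation());
        assertEquals("2008-02-01", filter.getConstant().constant());
    }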

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
new file mode 100644
index 0000000..b7a7493
--- /dev/null
+++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/JdbcPartitionFragmenterTest.java
@@ -0,0 +1,235 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.UserDataException;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
+import org.junit.Test;
+
+import java.text.ParseException;
+import java.util.Calendar;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class JdbcPartitionFragmenterTest {
+    InputData inputData;
+
+    @Test
+    public void testPartitionByDateOfMonth() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:month");
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(fragments.size(), 12);
+
+        //fragment - 1
+        byte[] fragMeta = fragments.get(0).getMetadata();
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        long fragStart = ByteUtil.toLong(newBytes[0]);
+        long fragEnd = ByteUtil.toLong(newBytes[1]);
+        assertDateEquals(fragStart, 2008, 1, 1);
+        assertDateEquals(fragEnd, 2008, 2, 1);
+
+        //fragment - 12
+        fragMeta = fragments.get(11).getMetadata();
+        newBytes = ByteUtil.splitBytes(fragMeta, 8);
+        fragStart = ByteUtil.toLong(newBytes[0]);
+        fragEnd = ByteUtil.toLong(newBytes[1]);
+        assertDateEquals(fragStart, 2008, 12, 1);
+        assertDateEquals(fragEnd, 2009, 1, 1);
+
+        //when the end date precedes the start date, no fragments are produced
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2001-01-01");
+        fragment = new JdbcPartitionFragmenter(inputData);
+        assertEquals(0, fragment.getFragments().size());
+    }
+
+    @Test
+    public void testPartitionByDateOfYear() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2011-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:year");
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(fragments.size(), 3);
+    }
+
+    @Test
+    public void testPartitionByInt() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("year:int");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2001:2012");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("2");
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(fragments.size(), 6);
+
+        //fragment - 1
+        byte[] fragMeta = fragments.get(0).getMetadata();
+        byte[][] newBytes = ByteUtil.splitBytes(fragMeta, 4);
+        int fragStart = ByteUtil.toInt(newBytes[0]);
+        int fragEnd = ByteUtil.toInt(newBytes[1]);
+        assertEquals(fragStart, 2001);
+        assertEquals(fragEnd, 2003);
+
+        //fragment - 6
+        fragMeta = fragments.get(5).getMetadata();
+        newBytes = ByteUtil.splitBytes(fragMeta, 4);
+        fragStart = ByteUtil.toInt(newBytes[0]);
+        fragEnd = ByteUtil.toInt(newBytes[1]);
+        assertEquals(fragStart, 2011);
+        assertEquals(fragEnd, 2012);
+
+        //when end < start, no fragments are produced
+        when(inputData.getUserProperty("RANGE")).thenReturn("2013:2012");
+        fragment = new JdbcPartitionFragmenter(inputData);
+        assertEquals(0, fragment.getFragments().size());
+
+    }
+
+    @Test
+    public void testPartitionByEnum() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level:enum");
+        when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad");
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(fragments.size(), 4);
+
+        //fragment - 1
+        byte[] fragMeta = fragments.get(0).getMetadata();
+        assertEquals("excellent", new String(fragMeta));
+
+        //fragment - 4
+        fragMeta = fragments.get(3).getMetadata();
+        assertEquals("bad", new String(fragMeta));
+    }
+
+    @Test
+    public void invalidPartitionType() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level:float");
+        when(inputData.getUserProperty("RANGE")).thenReturn("100:200");
+
+        try {
+            new JdbcPartitionFragmenter(inputData);
+            fail("Expected an IllegalArgumentException");
+        } catch (UserDataException ex) {
+
+        }
+    }
+
+    @Test
+    public void invalidParameterFormat() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+
+        //PARTITION_BY must be a colon-delimited pair, e.g. level:enum
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("level-enum");
+        when(inputData.getUserProperty("RANGE")).thenReturn("100:200");
+        try {
+            new JdbcPartitionFragmenter(inputData);
+            fail("Expected an ArrayIndexOutOfBoundsException");
+        } catch (ArrayIndexOutOfBoundsException ex) {
+        }
+
+        //date string must be yyyy-MM-dd
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008/01/01:2009-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("1:month");
+        try {
+            JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+            fragment.getFragments();
+            fail("Expected an ParseException");
+        } catch (UserDataException ex) {
+        }
+    }
+
+    @Test
+    public void invalidParameterValue() throws Exception {
+        prepareConstruction();
+        //INTERVAL must be greater than 0
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("-1:month");
+        try {
+            new JdbcPartitionFragmenter(inputData);
+            fail("Expected an UserDataException");
+        } catch (UserDataException ex) {
+        }
+    }
+
+    @Test
+    public void invalidIntervalType() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2011-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("6:hour");
+
+        try {
+            JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+            fragment.getFragments();
+            fail("Expected an UserDataException");
+        } catch (UserDataException ex) {
+        }
+    }
+
+    @Test
+    public void testNoPartition() throws Exception {
+        prepareConstruction();
+        when(inputData.getDataSource()).thenReturn("sales");
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(fragments.size(), 1);
+    }
+
+    private void assertDateEquals(long date, int year, int month, int day) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(date);
+        assertEquals(calendar.get(Calendar.YEAR), year);
+        assertEquals(calendar.get(Calendar.MONTH), month - 1);
+        assertEquals(calendar.get(Calendar.DAY_OF_MONTH), day);
+    }
+
+    private void prepareConstruction() throws Exception {
+        inputData = mock(InputData.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
new file mode 100644
index 0000000..ebe367d
--- /dev/null
+++ b/pxf/pxf-jdbc/src/test/java/org/apache/hawq/pxf/plugins/jdbc/SqlBuilderTest.java
@@ -0,0 +1,175 @@
+package org.apache.hawq.pxf.plugins.jdbc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Validates the SQL strings generated by the {@link JdbcPartitionFragmenter#buildFragmenterSql}
+ * and {@link WhereSQLBuilder#buildWhereSQL} methods.
+ */
+public class SqlBuilderTest {
+    private static final Log LOG = LogFactory.getLog(SqlBuilderTest.class);
+    static final String DB_PRODUCT = "mysql";
+    static final String ORIGINAL_SQL = "select * from sales";
+    InputData inputData;
+
+    @Before
+    public void setup() throws Exception {
+        LOG.info("SqlBuilderTest.setup()");
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        LOG.info("SqlBuilderTest.cleanup()");
+    }
+
+    @Test
+    public void testIdFilter() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(true);
+        when(inputData.getFilterString()).thenReturn("a0c20s1d1o5");//id=1
+
+        WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
+        assertEquals("1=1 AND id=1", builder.buildWhereSQL(DB_PRODUCT));
+    }
+
+    @Test
+    public void testDateAndAmtFilter() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(true);
+        // cdate>'2008-02-01' and cdate<'2008-12-01' and amt > 1200
+        when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a1c25s10d2008-12-01o1l0a2c20s4d1200o2l0");
+
+        WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
+        assertEquals("1=1 AND cdate>DATE('2008-02-01') AND cdate<DATE('2008-12-01') AND amt>1200"
+                , builder.buildWhereSQL(DB_PRODUCT));
+    }
+
+    @Test
+    public void testUnsupportedOperationFilter() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(true);
+        // grade like 'bad'
+        when(inputData.getFilterString()).thenReturn("a3c25s3dbado7");
+
+        WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
+        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+    }
+
+    @Test
+    public void testUnsupportedLogicalFilter() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(true);
+        // cdate>'2008-02-01' or amt < 1200
+        when(inputData.getFilterString()).thenReturn("a1c25s10d2008-02-01o2a2c20s4d1200o2l1");
+
+        WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
+        assertEquals(null, builder.buildWhereSQL(DB_PRODUCT));
+    }
+
+    @Test
+    public void testDatePartition() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(false);
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("cdate:date");
+        when(inputData.getUserProperty("RANGE")).thenReturn("2008-01-01:2009-01-01");
+        when(inputData.getUserProperty("INTERVAL")).thenReturn("2:month");
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(6, fragments.size());
+
+        //partition-1 : cdate>=2008-01-01 and cdate<2008-03-01
+        when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
+        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL);
+        assertEquals(ORIGINAL_SQL + " WHERE 1=1  AND " +
+                "cdate >= DATE('2008-01-01') AND cdate < DATE('2008-03-01')", fragmentSql);
+    }
+
+    @Test
+    public void testFilterAndPartition() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(true);
+        when(inputData.getFilterString()).thenReturn("a0c20s1d5o2"); //id>5
+        when(inputData.getUserProperty("PARTITION_BY")).thenReturn("grade:enum");
+        when(inputData.getUserProperty("RANGE")).thenReturn("excellent:good:general:bad");
+
+        WhereSQLBuilder builder = new WhereSQLBuilder(inputData);
+        String whereSql = builder.buildWhereSQL(DB_PRODUCT);
+        assertEquals("1=1 AND id>5", whereSql);
+
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+
+        //partition-1 : id>5 and grade='excellent'
+        when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
+
+        String filterSql = ORIGINAL_SQL + " WHERE " + whereSql;
+        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, filterSql);
+        assertEquals(filterSql + " AND grade='excellent'", fragmentSql);
+    }
+
+    @Test
+    public void testNoPartition() throws Exception {
+        prepareConstruction();
+        when(inputData.hasFilter()).thenReturn(false);
+        JdbcPartitionFragmenter fragment = new JdbcPartitionFragmenter(inputData);
+        List<Fragment> fragments = fragment.getFragments();
+        assertEquals(1, fragments.size());
+
+        when(inputData.getFragmentMetadata()).thenReturn(fragments.get(0).getMetadata());
+
+        String fragmentSql = fragment.buildFragmenterSql(DB_PRODUCT, ORIGINAL_SQL);
+        assertEquals(ORIGINAL_SQL, fragmentSql);
+    }
+
+
+    private void prepareConstruction() throws Exception {
+        inputData = mock(InputData.class);
+        when(inputData.getDataSource()).thenReturn("sales");
+
+
+        ArrayList<ColumnDescriptor> columns = new ArrayList<>();
+        columns.add(new ColumnDescriptor("id", DataType.INTEGER.getOID(), 0, "int4", null));
+        columns.add(new ColumnDescriptor("cdate", DataType.DATE.getOID(), 1, "date", null));
+        columns.add(new ColumnDescriptor("amt", DataType.FLOAT8.getOID(), 2, "float8", null));
+        columns.add(new ColumnDescriptor("grade", DataType.TEXT.getOID(), 3, "text", null));
+        when(inputData.getTupleDescription()).thenReturn(columns);
+        when(inputData.getColumn(0)).thenReturn(columns.get(0));
+        when(inputData.getColumn(1)).thenReturn(columns.get(1));
+        when(inputData.getColumn(2)).thenReturn(columns.get(2));
+        when(inputData.getColumn(3)).thenReturn(columns.get(3));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index d36f54b..53f15bb 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -168,4 +168,13 @@ under the License.
             <resolver>org.apache.hawq.pxf.plugins.json.JsonResolver</resolver>
         </plugins>
     </profile>
+    <profile>
+        <name>Jdbc</name>
+        <description>A profile for reading data into HAWQ via JDBC</description>
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.jdbc.JdbcPartitionFragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.jdbc.JdbcReadAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.jdbc.JdbcReadResolver</resolver>
+        </plugins>
+    </profile>
 </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7a22bdd7/pxf/settings.gradle
----------------------------------------------------------------------
diff --git a/pxf/settings.gradle b/pxf/settings.gradle
index c610116..55b6aea 100644
--- a/pxf/settings.gradle
+++ b/pxf/settings.gradle
@@ -25,4 +25,5 @@ include 'pxf-service'
 include 'pxf-hdfs'
 include 'pxf-hive'
 include 'pxf-hbase'
-include 'pxf-json'
\ No newline at end of file
+include 'pxf-json'
+include 'pxf-jdbc'
\ No newline at end of file

