hawq-commits mailing list archives

From shiv...@apache.org
Subject [07/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
Date Tue, 03 Nov 2015 00:36:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveResolver.java
deleted file mode 100644
index 85a04d3..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveResolver.java
+++ /dev/null
@@ -1,608 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.*;
-import com.pivotal.pxf.api.io.DataType;
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.api.utilities.Plugin;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-import com.pivotal.pxf.service.utilities.Utilities;
-
-import org.apache.commons.lang.CharUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.*;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static com.pivotal.pxf.api.io.DataType.*;
-
-/**
- * Class HiveResolver handles deserialization of records that were serialized
- * using Hadoop's Hive serialization framework.
- */
-/*
- * TODO - remove SuppressWarnings once Hive resolves the problem described
- * below. This annotation, and the change of the deserializer member to Object
- * instead of the original Deserializer, all stem from the same issue: in Hive
- * 0.11.0 the API changed and all SerDe types extend a new interface,
- * AbstractSerDe. However, this change was not adopted by OrcSerde (which was
- * also introduced in Hive 0.11.0). This bit of juggling has been necessary to
- * cope with that inconsistency.
- */
-@SuppressWarnings("deprecation")
-public class HiveResolver extends Plugin implements ReadResolver {
-    private static final Log LOG = LogFactory.getLog(HiveResolver.class);
-    private static final String MAPKEY_DELIM = ":";
-    private static final String COLLECTION_DELIM = ",";
-    private SerDe deserializer;
-    private List<OneField> partitionFields;
-    private String serdeName;
-    private String propsString;
-    private String collectionDelim;
-    private String mapkeyDelim;
-    String partitionKeys;
-    char delimiter;
-    String nullChar = "\\N";
-    private Configuration conf;
-    private String hiveDefaultPartName;
-
-    /**
-     * Constructs the HiveResolver by parsing the userdata in the input and
-     * obtaining the serde class name, the serde properties string and the
-     * partition keys.
-     *
-     * @param input contains the Serde class name, the serde properties string
-     *            and the partition keys
-     * @throws Exception if user data was wrong or serde failed to be
-     *             instantiated
-     */
-    public HiveResolver(InputData input) throws Exception {
-        super(input);
-
-        conf = new Configuration();
-        hiveDefaultPartName = HiveConf.getVar(conf,
-                HiveConf.ConfVars.DEFAULTPARTITIONNAME);
-        LOG.debug("Hive's default partition name is " + hiveDefaultPartName);
-
-        parseUserData(input);
-        initPartitionFields();
-        initSerde(input);
-    }
-
-    @Override
-    public List<OneField> getFields(OneRow onerow) throws Exception {
-        Object tuple = deserializer.deserialize((Writable) onerow.getData());
-        // Each Hive record is a Struct
-        StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
-        List<OneField> record = traverseStruct(tuple, soi, false);
-        /*
-         * We follow Hive convention. Partition fields are always added at the
-         * end of the record
-         */
-        record.addAll(partitionFields);
-
-        return record;
-    }
-
-    /* Parses the user data string (received from the fragmenter). */
-    void parseUserData(InputData input) throws Exception {
-        final int EXPECTED_NUM_OF_TOKS = 5;
-
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-
-        if (toks.length != EXPECTED_NUM_OF_TOKS) {
-            throw new UserDataException("HiveResolver expected "
-                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
-        }
-
-        serdeName = toks[1];
-        propsString = toks[2];
-        partitionKeys = toks[3];
-
-        collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
-                : input.getUserProperty("COLLECTION_DELIM");
-        mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
-                : input.getUserProperty("MAPKEY_DELIM");
-    }
-
-    /* Gets and initializes the deserializer for the records of this Hive data fragment. */
-    void initSerde(InputData inputData) throws Exception {
-        Properties serdeProperties;
-
-        Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
-        deserializer = (SerDe) c.newInstance();
-        serdeProperties = new Properties();
-        ByteArrayInputStream inStream = new ByteArrayInputStream(
-                propsString.getBytes());
-        serdeProperties.load(inStream);
-        deserializer.initialize(new JobConf(conf, HiveResolver.class),
-                serdeProperties);
-    }
-
-    /*
-     * The partition fields are initialized one time based on userData provided
-     * by the fragmenter.
-     */
-    void initPartitionFields() {
-        partitionFields = new LinkedList<>();
-        if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
-            return;
-        }
-
-        String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
-        for (String partLevel : partitionLevels) {
-            String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
-            String type = levelKey[1];
-            String val = levelKey[2];
-            DataType convertedType;
-            Object convertedValue = null;
-            boolean isDefaultPartition = false;
-
-            LOG.debug("Partition type: " + type + ", value: " + val);
-            // check if value is default partition
-            isDefaultPartition = isDefaultPartition(type, val);
-            // ignore the type's parameters
-            String typeName = type.replaceAll("\\(.*\\)", "");
-
-            switch (typeName) {
-                case serdeConstants.STRING_TYPE_NAME:
-                    convertedType = TEXT;
-                    convertedValue = isDefaultPartition ? null : val;
-                    break;
-                case serdeConstants.BOOLEAN_TYPE_NAME:
-                    convertedType = BOOLEAN;
-                    convertedValue = isDefaultPartition ? null
-                            : Boolean.valueOf(val);
-                    break;
-                case serdeConstants.TINYINT_TYPE_NAME:
-                case serdeConstants.SMALLINT_TYPE_NAME:
-                    convertedType = SMALLINT;
-                    convertedValue = isDefaultPartition ? null
-                            : Short.parseShort(val);
-                    break;
-                case serdeConstants.INT_TYPE_NAME:
-                    convertedType = INTEGER;
-                    convertedValue = isDefaultPartition ? null
-                            : Integer.parseInt(val);
-                    break;
-                case serdeConstants.BIGINT_TYPE_NAME:
-                    convertedType = BIGINT;
-                    convertedValue = isDefaultPartition ? null
-                            : Long.parseLong(val);
-                    break;
-                case serdeConstants.FLOAT_TYPE_NAME:
-                    convertedType = REAL;
-                    convertedValue = isDefaultPartition ? null
-                            : Float.parseFloat(val);
-                    break;
-                case serdeConstants.DOUBLE_TYPE_NAME:
-                    convertedType = FLOAT8;
-                    convertedValue = isDefaultPartition ? null
-                            : Double.parseDouble(val);
-                    break;
-                case serdeConstants.TIMESTAMP_TYPE_NAME:
-                    convertedType = TIMESTAMP;
-                    convertedValue = isDefaultPartition ? null
-                            : Timestamp.valueOf(val);
-                    break;
-                case serdeConstants.DATE_TYPE_NAME:
-                    convertedType = DATE;
-                    convertedValue = isDefaultPartition ? null
-                            : Date.valueOf(val);
-                    break;
-                case serdeConstants.DECIMAL_TYPE_NAME:
-                    convertedType = NUMERIC;
-                    convertedValue = isDefaultPartition ? null
-                            : HiveDecimal.create(val).bigDecimalValue().toString();
-                    break;
-                case serdeConstants.VARCHAR_TYPE_NAME:
-                    convertedType = VARCHAR;
-                    convertedValue = isDefaultPartition ? null : val;
-                    break;
-                case serdeConstants.CHAR_TYPE_NAME:
-                    convertedType = BPCHAR;
-                    convertedValue = isDefaultPartition ? null : val;
-                    break;
-                case serdeConstants.BINARY_TYPE_NAME:
-                    convertedType = BYTEA;
-                    convertedValue = isDefaultPartition ? null : val.getBytes();
-                    break;
-                default:
-                    throw new UnsupportedTypeException(
-                            "Unsupported partition type: " + type);
-            }
-            addOneFieldToRecord(partitionFields, convertedType, convertedValue);
-        }
-    }
-
-    /*
-     * The partition fields are initialized one time based on userData provided
-     * by the fragmenter.
-     */
-    int initPartitionFields(StringBuilder parts) {
-        if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
-            return 0;
-        }
-        String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
-        for (String partLevel : partitionLevels) {
-            String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
-            String type = levelKey[1];
-            String val = levelKey[2];
-            parts.append(delimiter);
-
-            if (isDefaultPartition(type, val)) {
-                parts.append(nullChar);
-            } else {
-                // ignore the type's parameters
-                String typeName = type.replaceAll("\\(.*\\)", "");
-                switch (typeName) {
-                    case serdeConstants.STRING_TYPE_NAME:
-                    case serdeConstants.VARCHAR_TYPE_NAME:
-                    case serdeConstants.CHAR_TYPE_NAME:
-                        parts.append(val);
-                        break;
-                    case serdeConstants.BOOLEAN_TYPE_NAME:
-                        parts.append(Boolean.parseBoolean(val));
-                        break;
-                    case serdeConstants.TINYINT_TYPE_NAME:
-                    case serdeConstants.SMALLINT_TYPE_NAME:
-                        parts.append(Short.parseShort(val));
-                        break;
-                    case serdeConstants.INT_TYPE_NAME:
-                        parts.append(Integer.parseInt(val));
-                        break;
-                    case serdeConstants.BIGINT_TYPE_NAME:
-                        parts.append(Long.parseLong(val));
-                        break;
-                    case serdeConstants.FLOAT_TYPE_NAME:
-                        parts.append(Float.parseFloat(val));
-                        break;
-                    case serdeConstants.DOUBLE_TYPE_NAME:
-                        parts.append(Double.parseDouble(val));
-                        break;
-                    case serdeConstants.TIMESTAMP_TYPE_NAME:
-                        parts.append(Timestamp.valueOf(val));
-                        break;
-                    case serdeConstants.DATE_TYPE_NAME:
-                        parts.append(Date.valueOf(val));
-                        break;
-                    case serdeConstants.DECIMAL_TYPE_NAME:
-                        parts.append(HiveDecimal.create(val).bigDecimalValue());
-                        break;
-                    case serdeConstants.BINARY_TYPE_NAME:
-                        Utilities.byteArrayToOctalString(val.getBytes(), parts);
-                        break;
-                    default:
-                        throw new UnsupportedTypeException(
-                                "Unsupported partition type: " + type);
-                }
-            }
-        }
-        return partitionLevels.length;
-    }
-
-    /**
-     * Returns true if the partition value is Hive's default partition name
-     * (defined in hive.exec.default.partition.name).
-     *
-     * @param partitionType partition field type
-     * @param partitionValue partition value
-     * @return true if the partition value is Hive's default partition
-     */
-    private boolean isDefaultPartition(String partitionType,
-                                       String partitionValue) {
-        boolean isDefaultPartition = false;
-        if (hiveDefaultPartName.equals(partitionValue)) {
-            LOG.debug("partition " + partitionType
-                    + " is hive default partition (value " + partitionValue
-                    + "), converting field to NULL");
-            isDefaultPartition = true;
-        }
-        return isDefaultPartition;
-    }
-
-    /*
-     * If the object representing the whole record is null or if an object
-     * representing a composite sub-object (map, list,..) is null - then
-     * BadRecordException will be thrown. If a primitive field value is null,
-     * then a null will appear for the field in the record in the query result.
-     */
-    private void traverseTuple(Object obj, ObjectInspector objInspector,
-                               List<OneField> record, boolean toFlatten)
-            throws IOException, BadRecordException {
-        ObjectInspector.Category category = objInspector.getCategory();
-        if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) {
-            throw new BadRecordException("NULL Hive composite object");
-        }
-        switch (category) {
-            case PRIMITIVE:
-                resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector,
-                        record, toFlatten);
-                break;
-            case LIST:
-                List<OneField> listRecord = traverseList(obj,
-                        (ListObjectInspector) objInspector);
-                addOneFieldToRecord(record, TEXT, String.format("[%s]",
-                        HdfsUtilities.toString(listRecord, collectionDelim)));
-                break;
-            case MAP:
-                List<OneField> mapRecord = traverseMap(obj,
-                        (MapObjectInspector) objInspector);
-                addOneFieldToRecord(record, TEXT, String.format("{%s}",
-                        HdfsUtilities.toString(mapRecord, collectionDelim)));
-                break;
-            case STRUCT:
-                List<OneField> structRecord = traverseStruct(obj,
-                        (StructObjectInspector) objInspector, true);
-                addOneFieldToRecord(record, TEXT, String.format("{%s}",
-                        HdfsUtilities.toString(structRecord, collectionDelim)));
-                break;
-            case UNION:
-                List<OneField> unionRecord = traverseUnion(obj,
-                        (UnionObjectInspector) objInspector);
-                addOneFieldToRecord(record, TEXT, String.format("[%s]",
-                        HdfsUtilities.toString(unionRecord, collectionDelim)));
-                break;
-            default:
-                throw new UnsupportedTypeException("Unknown category type: "
-                        + objInspector.getCategory());
-        }
-    }
-
-    private List<OneField> traverseUnion(Object obj, UnionObjectInspector uoi)
-            throws BadRecordException, IOException {
-        List<OneField> unionRecord = new LinkedList<>();
-        List<? extends ObjectInspector> ois = uoi.getObjectInspectors();
-        if (ois == null) {
-            throw new BadRecordException(
-                    "Illegal value NULL for Hive data type Union");
-        }
-        traverseTuple(uoi.getField(obj), ois.get(uoi.getTag(obj)), unionRecord,
-                true);
-        return unionRecord;
-    }
-
-    private List<OneField> traverseList(Object obj, ListObjectInspector loi)
-            throws BadRecordException, IOException {
-        List<OneField> listRecord = new LinkedList<>();
-        List<?> list = loi.getList(obj);
-        ObjectInspector eoi = loi.getListElementObjectInspector();
-        if (list == null) {
-            throw new BadRecordException(
-                    "Illegal value NULL for Hive data type List");
-        }
-        for (Object object : list) {
-            traverseTuple(object, eoi, listRecord, true);
-        }
-        return listRecord;
-    }
-
-    private List<OneField> traverseStruct(Object struct,
-                                          StructObjectInspector soi,
-                                          boolean toFlatten)
-            throws BadRecordException, IOException {
-        List<? extends StructField> fields = soi.getAllStructFieldRefs();
-        List<Object> structFields = soi.getStructFieldsDataAsList(struct);
-        if (structFields == null) {
-            throw new BadRecordException(
-                    "Illegal value NULL for Hive data type Struct");
-        }
-        List<OneField> structRecord = new LinkedList<>();
-        List<OneField> complexRecord = new LinkedList<>();
-        for (int i = 0; i < structFields.size(); i++) {
-            if (toFlatten) {
-                complexRecord.add(new OneField(TEXT.getOID(), String.format(
-                        "\"%s\"", fields.get(i).getFieldName())));
-            }
-            traverseTuple(structFields.get(i),
-                    fields.get(i).getFieldObjectInspector(), complexRecord,
-                    toFlatten);
-            if (toFlatten) {
-                addOneFieldToRecord(structRecord, TEXT,
-                        HdfsUtilities.toString(complexRecord, mapkeyDelim));
-                complexRecord.clear();
-            }
-        }
-        return toFlatten ? structRecord : complexRecord;
-    }
-
-    private List<OneField> traverseMap(Object obj, MapObjectInspector moi)
-            throws BadRecordException, IOException {
-        List<OneField> complexRecord = new LinkedList<>();
-        List<OneField> mapRecord = new LinkedList<>();
-        ObjectInspector koi = moi.getMapKeyObjectInspector();
-        ObjectInspector voi = moi.getMapValueObjectInspector();
-        Map<?, ?> map = moi.getMap(obj);
-        if (map == null) {
-            throw new BadRecordException(
-                    "Illegal value NULL for Hive data type Map");
-        } else if (map.isEmpty()) {
-            traverseTuple(null, koi, complexRecord, true);
-            traverseTuple(null, voi, complexRecord, true);
-            addOneFieldToRecord(mapRecord, TEXT,
-                    HdfsUtilities.toString(complexRecord, mapkeyDelim));
-        } else {
-            for (Map.Entry<?, ?> entry : map.entrySet()) {
-                traverseTuple(entry.getKey(), koi, complexRecord, true);
-                traverseTuple(entry.getValue(), voi, complexRecord, true);
-                addOneFieldToRecord(mapRecord, TEXT,
-                        HdfsUtilities.toString(complexRecord, mapkeyDelim));
-                complexRecord.clear();
-            }
-        }
-        return mapRecord;
-    }
-
-    private void resolvePrimitive(Object o, PrimitiveObjectInspector oi,
-                                  List<OneField> record, boolean toFlatten)
-            throws IOException {
-        Object val;
-        switch (oi.getPrimitiveCategory()) {
-            case BOOLEAN: {
-                val = (o != null) ? ((BooleanObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, BOOLEAN, val);
-                break;
-            }
-            case SHORT: {
-                val = (o != null) ? ((ShortObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, SMALLINT, val);
-                break;
-            }
-            case INT: {
-                val = (o != null) ? ((IntObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, INTEGER, val);
-                break;
-            }
-            case LONG: {
-                val = (o != null) ? ((LongObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, BIGINT, val);
-                break;
-            }
-            case FLOAT: {
-                val = (o != null) ? ((FloatObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, REAL, val);
-                break;
-            }
-            case DOUBLE: {
-                val = (o != null) ? ((DoubleObjectInspector) oi).get(o) : null;
-                addOneFieldToRecord(record, FLOAT8, val);
-                break;
-            }
-            case DECIMAL: {
-                String sVal = null;
-                if (o != null) {
-                    HiveDecimal hd = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
-                    if (hd != null) {
-                        BigDecimal bd = hd.bigDecimalValue();
-                        sVal = bd.toString();
-                    }
-                }
-                addOneFieldToRecord(record, NUMERIC, sVal);
-                break;
-            }
-            case STRING: {
-                val = (o != null) ? ((StringObjectInspector) oi).getPrimitiveJavaObject(o)
-                        : null;
-                addOneFieldToRecord(record, TEXT,
-                        toFlatten ? String.format("\"%s\"", val) : val);
-                break;
-            }
-            case VARCHAR:
-                val = (o != null) ? ((HiveVarcharObjectInspector) oi).getPrimitiveJavaObject(o)
-                        : null;
-                addOneFieldToRecord(record, VARCHAR,
-                        toFlatten ? String.format("\"%s\"", val) : val);
-                break;
-            case CHAR:
-                val = (o != null) ? ((HiveCharObjectInspector) oi).getPrimitiveJavaObject(o)
-                        : null;
-                addOneFieldToRecord(record, BPCHAR,
-                        toFlatten ? String.format("\"%s\"", val) : val);
-                break;
-            case BINARY: {
-                byte[] toEncode = null;
-                if (o != null) {
-                    BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(o);
-                    toEncode = new byte[bw.getLength()];
-                    System.arraycopy(bw.getBytes(), 0, toEncode, 0,
-                            bw.getLength());
-                }
-                addOneFieldToRecord(record, BYTEA, toEncode);
-                break;
-            }
-            case TIMESTAMP: {
-                val = (o != null) ? ((TimestampObjectInspector) oi).getPrimitiveJavaObject(o)
-                        : null;
-                addOneFieldToRecord(record, TIMESTAMP, val);
-                break;
-            }
-            case DATE:
-                val = (o != null) ? ((DateObjectInspector) oi).getPrimitiveJavaObject(o)
-                        : null;
-                addOneFieldToRecord(record, DATE, val);
-                break;
-            case BYTE: { /* TINYINT */
-                val = (o != null) ? new Short(((ByteObjectInspector) oi).get(o))
-                        : null;
-                addOneFieldToRecord(record, SMALLINT, val);
-                break;
-            }
-            default: {
-                throw new UnsupportedTypeException(oi.getTypeName()
-                        + " conversion is not supported by "
-                        + getClass().getSimpleName());
-            }
-        }
-    }
-
-    private void addOneFieldToRecord(List<OneField> record,
-                                     DataType gpdbWritableType, Object val) {
-        record.add(new OneField(gpdbWritableType.getOID(), val));
-    }
-
-    /*
-     * Gets the delimiter character from the URL, verifies and stores it. It
-     * must be a single ASCII character (the same restriction as HAWQ's). If a
-     * hex representation was passed, it is converted to its char.
-     */
-    void parseDelimiterChar(InputData input) {
-
-        String userDelim = input.getUserProperty("DELIMITER");
-
-        final int VALID_LENGTH = 1;
-        final int VALID_LENGTH_HEX = 4;
-
-        if (userDelim.startsWith("\\x")) { // hexadecimal sequence
-
-            if (userDelim.length() != VALID_LENGTH_HEX) {
-                throw new IllegalArgumentException(
-                        "Invalid hexdecimal value for delimiter (got"
-                                + userDelim + ")");
-            }
-
-            delimiter = (char) Integer.parseInt(
-                    userDelim.substring(2, VALID_LENGTH_HEX), 16);
-
-            if (!CharUtils.isAscii(delimiter)) {
-                throw new IllegalArgumentException(
-                        "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
-                                + delimiter + ")");
-            }
-
-            return;
-        }
-
-        if (userDelim.length() != VALID_LENGTH) {
-            throw new IllegalArgumentException(
-                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
-                            + userDelim + ")");
-        }
-
-        if (!CharUtils.isAscii(userDelim.charAt(0))) {
-            throw new IllegalArgumentException(
-                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
-                            + userDelim + ")");
-        }
-
-        delimiter = userDelim.charAt(0);
-    }
-}
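
As an aside, the hex-delimiter handling in parseDelimiterChar above reduces to a small amount of standalone logic. The sketch below shows it outside the PXF plugin; the DelimiterDemo class and its main method are illustrative only and not part of this commit.

import org.apache.commons.lang.CharUtils;

// Illustrative sketch of the delimiter rule above (not part of the patch):
// either a single ASCII character, or a "\xNN" hexadecimal sequence of length 4.
public class DelimiterDemo {
    static char parseDelimiter(String userDelim) {
        if (userDelim.startsWith("\\x")) {
            if (userDelim.length() != 4) {
                throw new IllegalArgumentException("Invalid hexadecimal value for delimiter (got " + userDelim + ")");
            }
            char delim = (char) Integer.parseInt(userDelim.substring(2, 4), 16);
            if (!CharUtils.isAscii(delim)) {
                throw new IllegalArgumentException("delimiter must be a single ASCII character");
            }
            return delim;
        }
        if (userDelim.length() != 1 || !CharUtils.isAscii(userDelim.charAt(0))) {
            throw new IllegalArgumentException("delimiter must be a single ASCII character");
        }
        return userDelim.charAt(0);
    }

    public static void main(String[] args) {
        System.out.println(parseDelimiter("\\x2C")); // the sequence \x2C maps to ','
        System.out.println(parseDelimiter("|"));     // a literal single character is used as-is
    }
}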

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveStringPassResolver.java
deleted file mode 100644
index fe0502e..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveStringPassResolver.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.pivotal.pxf.plugins.hive;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.utilities.InputData;
-
-import java.util.Collections;
-import java.util.List;
-
-import static com.pivotal.pxf.api.io.DataType.VARCHAR;
-
-/**
- * Specialized HiveResolver for a Hive table stored as Text files.
- * Use together with HiveInputFormatFragmenter/HiveLineBreakAccessor.
- */
-public class HiveStringPassResolver extends HiveResolver {
-    private StringBuilder parts;
-
-    public HiveStringPassResolver(InputData input) throws Exception {
-        super(input);
-    }
-
-    @Override
-    void parseUserData(InputData input) throws Exception {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-        parseDelimiterChar(input);
-        parts = new StringBuilder();
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
-    }
-
-    @Override
-    void initSerde(InputData input) {
-        /* nothing to do here */
-    }
-
-    @Override
-    void initPartitionFields() {
-        initPartitionFields(parts);
-    }
-
-    /**
-     * getFields returns a singleton list containing one OneField item.
-     * The OneField item contains two fields: an integer representing the VARCHAR type and a
-     * Java Object representing the field value.
-     */
-    @Override
-    public List<OneField> getFields(OneRow onerow) throws Exception {
-        String line = (onerow.getData()).toString();
-
-        /* We follow Hive convention. Partition fields are always added at the end of the record */
-        return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
-    }
-}
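
To make the pass-through behaviour above concrete: the resolver simply concatenates the raw text line with the pre-rendered partition values. Below is a minimal sketch with plain strings in place of the PXF OneRow/OneField types; the PartsDemo class and its sample values are made up.

// Sketch of the line + parts concatenation performed by getFields above.
// Plain strings stand in for the PXF OneRow/OneField types; values are made up.
public class PartsDemo {
    public static void main(String[] args) {
        char delimiter = ',';
        String line = "1,john";                          // raw line read from the text file
        StringBuilder parts = new StringBuilder();       // built once from the partition keys
        parts.append(delimiter).append("2015-11-03");    // e.g. a single date partition value
        // getFields would wrap this as new OneField(VARCHAR.getOID(), line + parts)
        System.out.println(line + parts);                // 1,john,2015-11-03
    }
}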

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilities.java
deleted file mode 100644
index 700a88f..0000000
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/utilities/HiveUtilities.java
+++ /dev/null
@@ -1,219 +0,0 @@
-package com.pivotal.pxf.plugins.hive.utilities;
-
-import java.util.ArrayList;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import com.pivotal.pxf.api.Metadata;
-import com.pivotal.pxf.api.UnsupportedTypeException;
-
-/**
- * Class containing helper functions connecting
- * and interacting with Hive.
- */
-public class HiveUtilities {
-
-    private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
-
-    /**
-     * Default Hive DB (schema) name.
-     */
-    private static final String HIVE_DEFAULT_DBNAME = "default";
-
-    /**
-     * Initializes the HiveMetaStoreClient
-     * Uses classpath configuration files to locate the MetaStore
-     *
-     * @return initialized client
-     */
-    public static HiveMetaStoreClient initHiveClient() {
-        HiveMetaStoreClient client = null;
-        try {
-            client = new HiveMetaStoreClient(new HiveConf());
-        } catch (MetaException cause) {
-            throw new RuntimeException("Failed connecting to Hive MetaStore service: " + cause.getMessage(), cause);
-        }
-        return client;
-    }
-
-    public static Table getHiveTable(HiveMetaStoreClient client, Metadata.Table tableName)
-            throws Exception {
-        Table tbl = client.getTable(tableName.getDbName(), tableName.getTableName());
-        String tblType = tbl.getTableType();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Table: " + tableName.getDbName() + "." + tableName.getTableName() + ", type: " + tblType);
-        }
-
-        if (TableType.valueOf(tblType) == TableType.VIRTUAL_VIEW) {
-            throw new UnsupportedOperationException("Hive views are not supported by HAWQ");
-        }
-
-        return tbl;
-    }
-
-    /**
-     * Checks if the Hive type is supported, and if so
-     * returns its matching HAWQ type.
-     * Unsupported types will result in an exception.
-     * <br>
-     * The supported mappings are:<ul>
-     * <li>{@code tinyint -> int2}</li>
-     * <li>{@code smallint -> int2}</li>
-     * <li>{@code int -> int4}</li>
-     * <li>{@code bigint -> int8}</li>
-     * <li>{@code boolean -> bool}</li>
-     * <li>{@code float -> float4}</li>
-     * <li>{@code double -> float8}</li>
-     * <li>{@code string -> text}</li>
-     * <li>{@code binary -> bytea}</li>
-     * <li>{@code timestamp -> timestamp}</li>
-     * <li>{@code date -> date}</li>
-     * <li>{@code decimal(precision, scale) -> numeric(precision, scale)}</li>
-     * <li>{@code varchar(size) -> varchar(size)}</li>
-     * <li>{@code char(size) -> bpchar(size)}</li>
-     * </ul>
-     *
-     * @param hiveColumn hive column schema
-     * @return field with mapped HAWQ type and modifiers
-     * @throws UnsupportedTypeException if the column type is not supported
-     */
-    public static Metadata.Field mapHiveType(FieldSchema hiveColumn) throws UnsupportedTypeException {
-        String fieldName = hiveColumn.getName();
-        String hiveType = hiveColumn.getType();
-        String mappedType;
-        String[] modifiers = null;
-
-        // check parameterized types:
-        if (hiveType.startsWith("varchar(") ||
-                hiveType.startsWith("char(")) {
-            String[] toks = hiveType.split("[(,)]");
-            if (toks.length != 2) {
-                throw new UnsupportedTypeException( "HAWQ does not support type " + hiveType + " (Field " + fieldName + "), " +
-                        "expected type of the form <type name>(<parameter>)");
-            }
-            mappedType = toks[0];
-            if (mappedType.equals("char")) {
-                mappedType = "bpchar";
-            }
-            modifiers = new String[] {toks[1]};
-        } else if (hiveType.startsWith("decimal(")) {
-            String[] toks = hiveType.split("[(,)]");
-            if (toks.length != 3) {
-                throw new UnsupportedTypeException( "HAWQ does not support type " + hiveType + " (Field " + fieldName + "), " +
-                        "expected type of the form <type name>(<parameter>,<parameter>)");
-            }
-            mappedType = "numeric";
-            modifiers = new String[] {toks[1], toks[2]};
-        } else {
-
-            switch (hiveType) {
-            case "tinyint":
-            case "smallint":
-            	mappedType = "int2";
-            	break;
-            case "int":
-            	mappedType = "int4";
-            	break;
-            case "bigint":
-            	mappedType = "int8";
-            	break;
-            case "boolean":
-            	mappedType = "bool";
-            	break;
-            case "timestamp":
-            case "date":
-                mappedType = hiveType;
-                break;
-            case "float":
-                mappedType = "float4";
-                break;
-            case "double":
-                mappedType = "float8";
-                break;
-            case "string":
-                mappedType = "text";
-                break;
-            case "binary":
-                mappedType = "bytea";
-                break;
-            default:
-                throw new UnsupportedTypeException(
-                        "HAWQ does not support type " + hiveType + " (Field " + fieldName + ")");
-            }
-        }
-        if (!verifyModifers(modifiers)) {
-            throw new UnsupportedTypeException("HAWQ does not support type " + hiveType + " (Field " + fieldName + "), modifiers should be integers");
-        }
-        return new Metadata.Field(fieldName, mappedType, modifiers);
-    }
-
-    /**
-     * Verifies modifiers are null or integers.
-     * Modifier is a value assigned to a type,
-     * e.g. size of a varchar - varchar(size).
-     *
-     * @param modifiers type modifiers to be verified
-     * @return whether modifiers are null or integers
-     */
-    private static boolean verifyModifers(String[] modifiers) {
-        if (modifiers == null) {
-            return true;
-        }
-        for (String modifier: modifiers) {
-            if (StringUtils.isBlank(modifier) || !StringUtils.isNumeric(modifier)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    /**
-     * Extracts the db_name and table_name from the qualifiedName.
-     * qualifiedName is the Hive table name that the user enters in the CREATE EXTERNAL TABLE statement
-     * or when querying an HCatalog table.
-     * It can be either <code>table_name</code> or <code>db_name.table_name</code>.
-     *
-     * @param qualifiedName Hive table name
-     * @return {@link com.pivotal.pxf.api.Metadata.Table} object holding the full table name
-     */
-    public static Metadata.Table parseTableQualifiedName(String qualifiedName) {
-
-        String dbName, tableName;
-        String errorMsg = " is not a valid Hive table name. "
-                + "Should be either <table_name> or <db_name.table_name>";
-
-        if (StringUtils.isBlank(qualifiedName)) {
-            throw new IllegalArgumentException("empty string" + errorMsg);
-        }
-
-        String[] rawToks = qualifiedName.split("[.]");
-        ArrayList<String> toks = new ArrayList<String>();
-        for (String tok: rawToks) {
-            if (StringUtils.isBlank(tok)) {
-                continue;
-            }
-            toks.add(tok.trim());
-        }
-
-        if (toks.size() == 1) {
-            dbName = HIVE_DEFAULT_DBNAME;
-            tableName = toks.get(0);
-        } else if (toks.size() == 2) {
-            dbName = toks.get(0);
-            tableName = toks.get(1);
-        } else {
-            throw new IllegalArgumentException("\"" + qualifiedName + "\"" + errorMsg);
-        }
-
-        return new Metadata.Table(dbName, tableName);
-    }
-}
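
The table-name parsing rule documented above (either <table_name> or <db_name.table_name>, with the default database filled in when omitted) can be sketched in isolation as follows. QualifiedNameDemo is hypothetical and only mirrors the logic of parseTableQualifiedName.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the db_name/table_name splitting rule used by parseTableQualifiedName.
public class QualifiedNameDemo {
    static String[] parse(String qualifiedName) {
        List<String> toks = new ArrayList<String>();
        for (String tok : qualifiedName.split("[.]")) {
            if (!tok.trim().isEmpty()) {
                toks.add(tok.trim());
            }
        }
        if (toks.size() == 1) {
            return new String[] { "default", toks.get(0) };      // default Hive DB name
        }
        if (toks.size() == 2) {
            return new String[] { toks.get(0), toks.get(1) };
        }
        throw new IllegalArgumentException("\"" + qualifiedName + "\" is not a valid Hive table name");
    }

    public static void main(String[] args) {
        System.out.println(parse("sales")[0] + "." + parse("sales")[1]);               // default.sales
        System.out.println(parse("hr.employees")[0] + "." + parse("hr.employees")[1]); // hr.employees
    }
}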

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
new file mode 100644
index 0000000..33bd851
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
@@ -0,0 +1,244 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Accessor for Hive tables. The accessor will open and read a split belonging
+ * to a Hive table. Opening a split means creating the corresponding InputFormat
+ * and RecordReader required to access the split's data. The actual record
+ * reading is done in the base class -
+ * {@link org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor}. <br>
+ * HiveAccessor will also enforce Hive partition filtering by filtering out
+ * splits that do not satisfy the partition filter. Naturally, the partition
+ * filtering will be done only for Hive tables that are partitioned.
+ */
+public class HiveAccessor extends HdfsSplittableDataAccessor {
+    private static final Log LOG = LogFactory.getLog(HiveAccessor.class);
+    List<HivePartition> partitions;
+
+    class HivePartition {
+        public String name;
+        public String type;
+        public String val;
+
+        HivePartition(String name, String type, String val) {
+            this.name = name;
+            this.type = type;
+            this.val = val;
+        }
+    }
+
+    protected Boolean filterInFragmenter = false;
+
+    /**
+     * Constructs a HiveAccessor and creates an InputFormat (derived from
+     * {@link org.apache.hadoop.mapred.InputFormat}) and the Hive partition
+     * fields
+     *
+     * @param input contains the InputFormat class name and the partition fields
+     * @throws Exception if failed to create input format
+     */
+    public HiveAccessor(InputData input) throws Exception {
+        /*
+         * Unfortunately, Java does not allow us to call a function before
+         * calling the base constructor, otherwise it would have been:
+         * super(input, createInputFormat(input))
+         */
+        super(input, null);
+        inputFormat = createInputFormat(input);
+    }
+
+    /**
+     * Constructs a HiveAccessor
+     *
+     * @param input contains the InputFormat class name and the partition fields
+     * @param inputFormat Hive InputFormat
+     */
+    public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) {
+        super(input, inputFormat);
+    }
+
+    /**
+     * Opens the Hive data split for read. Enables Hive partition filtering. <br>
+     *
+     * @return true if there are no partitions, or there is no partition
+     *         filter, or the partition filter is set and the file currently
+     *         opened by the accessor belongs to the partition.
+     * @throws Exception if filter could not be built, connection to Hive failed
+     *             or resource failed to open
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        return isOurDataInsideFilteredPartition() && super.openForRead();
+    }
+
+    /**
+     * Creates the RecordReader suitable for this given split.
+     *
+     * @param jobConf configuration data for the Hadoop framework
+     * @param split the split that was allocated for reading to this accessor
+     * @return record reader
+     * @throws IOException if failed to create record reader
+     */
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
+        return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+    }
+
+    /*
+     * Parses the user-data supplied by the HiveFragmenter from InputData. Based
+     * on the user-data, constructs the partition fields and the InputFormat for
+     * the current split.
+     */
+    private InputFormat<?, ?> createInputFormat(InputData input)
+            throws Exception {
+        String userData = new String(input.getFragmentUserData());
+        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+        initPartitionFields(toks[3]);
+        filterInFragmenter = new Boolean(toks[4]);
+        return HiveDataFragmenter.makeInputFormat(
+                toks[0]/* inputFormat name */, jobConf);
+    }
+
+    /*
+     * The partition fields are initialized one time based on userData provided
+     * by the fragmenter.
+     */
+    void initPartitionFields(String partitionKeys) {
+        partitions = new LinkedList<HivePartition>();
+        if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
+            return;
+        }
+
+        String[] partitionLevels = partitionKeys.split(HiveDataFragmenter.HIVE_PARTITIONS_DELIM);
+        for (String partLevel : partitionLevels) {
+            String[] levelKey = partLevel.split(HiveDataFragmenter.HIVE_1_PART_DELIM);
+            String name = levelKey[0];
+            String type = levelKey[1];
+            String val = levelKey[2];
+            partitions.add(new HivePartition(name, type, val));
+        }
+    }
+
+    private boolean isOurDataInsideFilteredPartition() throws Exception {
+        if (!inputData.hasFilter()) {
+            return true;
+        }
+
+        if (filterInFragmenter) {
+            LOG.debug("filtering was done in fragmenter");
+            return true;
+        }
+
+        String filterStr = inputData.getFilterString();
+        HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
+        Object filter = eval.getFilterObject(filterStr);
+
+        boolean returnData = isFiltered(partitions, filter);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("segmentId: " + inputData.getSegmentId() + " "
+                    + inputData.getDataSource() + "--" + filterStr
+                    + " returnData: " + returnData);
+            if (filter instanceof List) {
+                for (Object f : (List<?>) filter) {
+                    printOneBasicFilter(f);
+                }
+            } else {
+                printOneBasicFilter(filter);
+            }
+        }
+
+        return returnData;
+    }
+
+    private boolean isFiltered(List<HivePartition> partitionFields,
+                               Object filter) {
+        if (filter instanceof List) {
+            /*
+             * We go over each filter in the filters list and test it against
+             * all the partition fields. Since filters are connected only by
+             * AND operators, it is enough for one filter to fail in order to
+             * deny this data.
+             */
+            for (Object f : (List<?>) filter) {
+                if (!testOneFilter(partitionFields, f, inputData)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        return testOneFilter(partitionFields, filter, inputData);
+    }
+
+    /*
+     * We test one filter against all the partition fields. The filter has the
+     * form "fieldA = valueA". The partitions have the form
+     * partitionOne=valueOne/partitionTwo=valueTwo/partitionThree=valueThree.
+     * 1. For a filter to match one of the partitions, let's say partitionOne
+     * for example, we need: fieldA = partitionOne and valueA = valueOne. If
+     * this condition holds, we return true. 2. If fieldA does not match any of
+     * the partition fields we also return true; it means we ignore this filter
+     * because it is not on a partition field. 3. If fieldA = partitionOne and
+     * valueA != valueOne, then we return false.
+     */
+    private boolean testOneFilter(List<HivePartition> partitionFields,
+                                  Object filter, InputData input) {
+        // Let's look first at the filter
+        FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
+
+        boolean isFilterOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
+        if (!isFilterOperationEqual) /*
+                                      * in case this is not an "equality filter"
+                                      * we ignore it here - in partition
+                                      * filtering
+                                      */{
+            return true;
+        }
+
+        int filterColumnIndex = bFilter.getColumn().index();
+        String filterValue = bFilter.getConstant().constant().toString();
+        ColumnDescriptor filterColumn = input.getColumn(filterColumnIndex);
+        String filterColumnName = filterColumn.columnName();
+
+        for (HivePartition partition : partitionFields) {
+            if (filterColumnName.equals(partition.name)) {
+                /*
+                 * the filter field matches a partition field; the split is
+                 * kept only if the values match as well
+                 */
+                return filterValue.equals(partition.val);
+            }
+        }
+
+        /*
+         * filter field did not match any partition field, so we ignore this
+         * filter and hence return true
+         */
+        return true;
+    }
+
+    private void printOneBasicFilter(Object filter) {
+        FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
+        boolean isOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
+        int columnIndex = bFilter.getColumn().index();
+        String value = bFilter.getConstant().constant().toString();
+        LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: "
+                + columnIndex + " value: " + value);
+    }
+}
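
The three-case rule in the comment above is easiest to see in isolation. The following sketch uses a simplified Partition holder and plain strings instead of FilterParser.BasicFilter; PartitionFilterDemo and its sample data are illustrative only.

import java.util.Arrays;
import java.util.List;

// Sketch of the partition-filter rule: an equality filter only rejects a split
// when its column names a partition field and the values differ.
public class PartitionFilterDemo {
    static class Partition {
        final String name, val;
        Partition(String name, String val) { this.name = name; this.val = val; }
    }

    static boolean testOneFilter(List<Partition> partitions, String filterCol, String filterVal) {
        for (Partition p : partitions) {
            if (filterCol.equals(p.name)) {
                return filterVal.equals(p.val);   // filter is on a partition column
            }
        }
        return true;                              // not a partition column: ignore the filter
    }

    public static void main(String[] args) {
        List<Partition> parts = Arrays.asList(new Partition("dt", "2015-11-03"));
        System.out.println(testOneFilter(parts, "dt", "2015-11-03")); // true  (keep split)
        System.out.println(testOneFilter(parts, "dt", "2015-11-04")); // false (skip split)
        System.out.println(testOneFilter(parts, "name", "john"));     // true  (non-partition column)
    }
}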

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
new file mode 100644
index 0000000..29549c6
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -0,0 +1,213 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.utilities.Utilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDeBase;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+/**
+ * Specialized HiveResolver for a Hive table stored as RC file.
+ * Use together with HiveInputFormatFragmenter/HiveRCFileAccessor.
+ */
+public class HiveColumnarSerdeResolver extends HiveResolver {
+    private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
+    private ColumnarSerDeBase deserializer;
+    private boolean firstColumn;
+    private StringBuilder builder;
+    private StringBuilder parts;
+    private int numberOfPartitions;
+    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+
+    public HiveColumnarSerdeResolver(InputData input) throws Exception {
+        super(input);
+    }
+
+    /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
+    @Override
+    void parseUserData(InputData input) throws Exception {
+        String[] toks = HiveInputFormatFragmenter.parseToks(input);
+        String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
+        if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
+            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
+        } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
+            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
+        } else {
+            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
+        }
+        parts = new StringBuilder();
+        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        parseDelimiterChar(input);
+    }
+
+    @Override
+    void initPartitionFields() {
+        numberOfPartitions = initPartitionFields(parts);
+    }
+
+    /**
+     * getFields returns a singleton list containing one OneField item.
+     * The OneField item contains two fields: an integer representing the VARCHAR type and a
+     * Java Object representing the field value.
+     */
+    @Override
+    public List<OneField> getFields(OneRow onerow) throws Exception {
+        firstColumn = true;
+        builder = new StringBuilder();
+        Object tuple = deserializer.deserialize((Writable) onerow.getData());
+        ObjectInspector oi = deserializer.getObjectInspector();
+
+        traverseTuple(tuple, oi);
+        /* We follow Hive convention. Partition fields are always added at the end of the record */
+        builder.append(parts);
+        return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+    }
+
+    /*
+     * Gets and initializes the deserializer for the records of this Hive data fragment.
+     * SuppressWarnings is added because deserializer.initialize is an abstract function that is deprecated,
+     * but its implementations (ColumnarSerDe, LazyBinaryColumnarSerDe) still use the deprecated interface.
+     */
+    @SuppressWarnings("deprecation")
+	@Override
+    void initSerde(InputData input) throws Exception {
+        Properties serdeProperties = new Properties();
+        int numberOfDataColumns = input.getColumns() - numberOfPartitions;
+
+        LOG.debug("Serde number of columns is " + numberOfDataColumns);
+
+        StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
+        StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
+        String delim = "";
+        for (int i = 0; i < numberOfDataColumns; i++) {
+            ColumnDescriptor column = input.getColumn(i);
+            String columnName = column.columnName();
+            String columnType = HiveInputFormatFragmenter.toHiveType(DataType.get(column.columnTypeCode()), columnName);
+            columnNames.append(delim).append(columnName);
+            columnTypes.append(delim).append(columnType);
+            delim = ",";
+        }
+        serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
+        serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
+
+        if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
+            deserializer = new ColumnarSerDe();
+        } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
+            deserializer = new LazyBinaryColumnarSerDe();
+        } else {
+            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
+        }
+
+        deserializer.initialize(new JobConf(new Configuration(), HiveColumnarSerdeResolver.class), serdeProperties);
+    }
+
+    /**
+     * Handle a Hive record.
+     * Supported object categories:
+     * Primitive - including NULL
+     * Struct (used by ColumnarSerDe to store primitives) - cannot be NULL
+     * <p/>
+     * Any other category will throw UnsupportedTypeException
+     */
+    private void traverseTuple(Object obj, ObjectInspector objInspector) throws IOException, BadRecordException {
+        ObjectInspector.Category category = objInspector.getCategory();
+        if ((obj == null) && (category != ObjectInspector.Category.PRIMITIVE)) {
+            throw new BadRecordException("NULL Hive composite object");
+        }
+        switch (category) {
+            case PRIMITIVE:
+                resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector);
+                break;
+            case STRUCT:
+                StructObjectInspector soi = (StructObjectInspector) objInspector;
+                List<? extends StructField> fields = soi.getAllStructFieldRefs();
+                List<?> list = soi.getStructFieldsDataAsList(obj);
+                if (list == null) {
+                    throw new BadRecordException("Illegal value NULL for Hive data type Struct");
+                }
+                for (int i = 0; i < list.size(); i++) {
+                    traverseTuple(list.get(i), fields.get(i).getFieldObjectInspector());
+                }
+                break;
+            default:
+                throw new UnsupportedTypeException("Hive object category: " + objInspector.getCategory() + " unsupported");
+        }
+    }
+
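+    /*
+     * Appends a single primitive value to the output line, preceded by the
+     * field delimiter for every column except the first. NULLs are written
+     * as the configured null character.
+     */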
+    private void resolvePrimitive(Object o, PrimitiveObjectInspector oi) throws IOException {
+
+        if (!firstColumn) {
+            builder.append(delimiter);
+        }
+
+        if (o == null) {
+            builder.append(nullChar);
+        } else {
+            switch (oi.getPrimitiveCategory()) {
+                case BOOLEAN:
+                    builder.append(((BooleanObjectInspector) oi).get(o));
+                    break;
+                case SHORT:
+                    builder.append(((ShortObjectInspector) oi).get(o));
+                    break;
+                case INT:
+                    builder.append(((IntObjectInspector) oi).get(o));
+                    break;
+                case LONG:
+                    builder.append(((LongObjectInspector) oi).get(o));
+                    break;
+                case FLOAT:
+                    builder.append(((FloatObjectInspector) oi).get(o));
+                    break;
+                case DOUBLE:
+                    builder.append(((DoubleObjectInspector) oi).get(o));
+                    break;
+                case DECIMAL:
+                    builder.append(((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o).bigDecimalValue());
+                    break;
+                case STRING:
+                    builder.append(((StringObjectInspector) oi).getPrimitiveJavaObject(o));
+                    break;
+                case BINARY:
+                    byte[] bytes = ((BinaryObjectInspector) oi).getPrimitiveJavaObject(o);
+                    Utilities.byteArrayToOctalString(bytes, builder);
+                    break;
+                case TIMESTAMP:
+                    builder.append(((TimestampObjectInspector) oi).getPrimitiveJavaObject(o));
+                    break;
+                case BYTE:  /* TINYINT */
+                    builder.append(new Short(((ByteObjectInspector) oi).get(o)));
+                    break;
+                default:
+                    throw new UnsupportedTypeException(oi.getTypeName()
+                            + " conversion is not supported by HiveColumnarSerdeResolver");
+            }
+        }
+        firstColumn = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
new file mode 100644
index 0000000..881aeac
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -0,0 +1,446 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+
+/**
+ * Fragmenter class for Hive tables.
+ * <br>
+ * Given a Hive table and its partitions, divide the data into fragments (here a
+ * data fragment is actually an HDFS file block) and return a list of them. Each
+ * data fragment will contain the following information:
+ * <ol>
+ * <li>sourceName: full HDFS path to the data file that this data fragment is
+ * part of</li>
+ * <li>hosts: a list of the datanode machines that hold a replica of this block</li>
+ * <li>userData: file_input_format_name_DELIM_serde_name_DELIM_serialization_properties
+ * _DELIM_partition_keys_DELIM_filter_in_fragmenter</li>
+ * </ol>
+ */
+public class HiveDataFragmenter extends Fragmenter {
+    private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
+    private static final short ALL_PARTS = -1;
+
+    static final String HIVE_UD_DELIM = "!HUDD!";
+    static final String HIVE_1_PART_DELIM = "!H1PD!";
+    static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
+    static final String HIVE_NO_PART_TBL = "!HNPT!";
+
+    static final String HIVE_API_EQ = " = ";
+    static final String HIVE_API_DQUOTE = "\"";
+
+    private JobConf jobConf;
+    private HiveMetaStoreClient client;
+
+    protected boolean filterInFragmenter = false;
+
+    // Data structure to hold the hive partition names, if any, to be used by
+    // partition filtering
+    private Set<String> setPartitions = new TreeSet<String>(
+            String.CASE_INSENSITIVE_ORDER);
+
+    /**
+     * A Hive table unit - a subset of the Hive table where all files share the
+     * same InputFormat and Serde. For a partitioned table each HiveTablePartition
+     * is one partition, and for an unpartitioned table it is the whole table.
+     */
+    class HiveTablePartition {
+        StorageDescriptor storageDesc;
+        Properties properties;
+        Partition partition;
+        List<FieldSchema> partitionKeys;
+        String tableName;
+
+        HiveTablePartition(StorageDescriptor storageDesc,
+                           Properties properties, Partition partition,
+                           List<FieldSchema> partitionKeys, String tableName) {
+            this.storageDesc = storageDesc;
+            this.properties = properties;
+            this.partition = partition;
+            this.partitionKeys = partitionKeys;
+            this.tableName = tableName;
+        }
+
+        @Override
+        public String toString() {
+            return "table - " + tableName
+                    + ((partition == null) ? "" : ", partition - " + partition);
+        }
+    }
+
+    /**
+     * Constructs a HiveDataFragmenter object.
+     *
+     * @param inputData all input parameters coming from the client
+     */
+    public HiveDataFragmenter(InputData inputData) {
+        this(inputData, HiveDataFragmenter.class);
+    }
+
+    /**
+     * Constructs a HiveDataFragmenter object.
+     *
+     * @param inputData all input parameters coming from the client
+     * @param clazz Class for JobConf
+     */
+    public HiveDataFragmenter(InputData inputData, Class<?> clazz) {
+        super(inputData);
+        jobConf = new JobConf(new Configuration(), clazz);
+        client = HiveUtilities.initHiveClient();
+    }
+
+    @Override
+    public List<Fragment> getFragments() throws Exception {
+        Metadata.Table tblDesc = HiveUtilities.parseTableQualifiedName(inputData.getDataSource());
+
+        fetchTableMetaData(tblDesc);
+
+        return fragments;
+    }
+
+    /**
+     * Creates the partition InputFormat.
+     *
+     * @param inputFormatName input format class name
+     * @param jobConf configuration data for the Hadoop framework
+     * @return a {@link org.apache.hadoop.mapred.InputFormat} derived object
+     * @throws Exception if failed to create input format
+     */
+    public static InputFormat<?, ?> makeInputFormat(String inputFormatName,
+                                                    JobConf jobConf)
+            throws Exception {
+        Class<?> c = Class.forName(inputFormatName, true,
+                JavaUtils.getClassLoader());
+        InputFormat<?, ?> inputFormat = (InputFormat<?, ?>) c.newInstance();
+
+        if ("org.apache.hadoop.mapred.TextInputFormat".equals(inputFormatName)) {
+            // TextInputFormat needs a special configuration
+            ((TextInputFormat) inputFormat).configure(jobConf);
+        }
+
+        return inputFormat;
+    }
+
+    /*
+     * Goes over the table partitions metadata and extracts the splits and the
+     * InputFormat and Serde per split.
+     */
+    private void fetchTableMetaData(Metadata.Table tblDesc) throws Exception {
+
+        Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
+
+        verifySchema(tbl);
+
+        List<Partition> partitions = null;
+        String filterStringForHive = "";
+
+        // If the query has a filter and the hive table has partitions, prepare the
+        // filter string for the hive metastore and retrieve only the matching partitions
+        if (inputData.hasFilter() && tbl.getPartitionKeysSize() > 0) {
+
+            // Save all hive partition names in a set for later filter matching
+            for (FieldSchema fs : tbl.getPartitionKeys()) {
+                setPartitions.add(fs.getName());
+            }
+
+            LOG.debug("setPartitions :" + setPartitions);
+
+            // Generate the filter string that matches the PXF filter columns
+            // against the hive partition names
+            filterStringForHive = buildFilterStringForHive();
+        }
+
+        if (!filterStringForHive.isEmpty()) {
+
+            LOG.debug("Filter String for Hive partition retrieval : "
+                    + filterStringForHive);
+
+            filterInFragmenter = true;
+
+            // API call to the Hive metastore; returns a list of all the
+            // partitions for this table that match the partition filters
+            // defined in filterStringForHive.
+            partitions = client.listPartitionsByFilter(tblDesc.getDbName(),
+                    tblDesc.getTableName(), filterStringForHive, ALL_PARTS);
+
+            // No matched partitions for the filter, no fragments to return.
+            if (partitions == null || partitions.isEmpty()) {
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Table -  " + tblDesc.getDbName() + "."
+                            + tblDesc.getTableName()
+                            + " Has no matched partitions for the filter : "
+                            + filterStringForHive);
+                }
+                return;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Table -  " + tblDesc.getDbName() + "."
+                        + tblDesc.getTableName()
+                        + " Matched partitions list size: " + partitions.size());
+            }
+
+        } else {
+            // API call to the Hive metastore; returns a list of all the
+            // partitions for this table (no filtering)
+            partitions = client.listPartitions(tblDesc.getDbName(),
+                    tblDesc.getTableName(), ALL_PARTS);
+        }
+
+        StorageDescriptor descTable = tbl.getSd();
+        Properties props;
+
+        if (partitions.isEmpty()) {
+            props = getSchema(tbl);
+            fetchMetaDataForSimpleTable(descTable, props);
+        } else {
+            List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
+
+            for (Partition partition : partitions) {
+                StorageDescriptor descPartition = partition.getSd();
+                props = MetaStoreUtils.getSchema(descPartition, descTable,
+                        null, // Map<String, String> parameters - can be empty
+                        tblDesc.getDbName(), tblDesc.getTableName(), // table name
+                        partitionKeys);
+                fetchMetaDataForPartitionedTable(descPartition, props,
+                        partition, partitionKeys, tblDesc.getTableName());
+            }
+        }
+    }
+
+    void verifySchema(Table tbl) throws Exception {
+        /* nothing to verify here */
+    }
+
+    private static Properties getSchema(Table table) {
+        return MetaStoreUtils.getSchema(table.getSd(), table.getSd(),
+                table.getParameters(), table.getDbName(), table.getTableName(),
+                table.getPartitionKeys());
+    }
+
+    private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
+                                             Properties props) throws Exception {
+        fetchMetaDataForSimpleTable(stdsc, props, null);
+    }
+
+    private void fetchMetaDataForSimpleTable(StorageDescriptor stdsc,
+                                             Properties props, String tableName)
+            throws Exception {
+        fetchMetaData(new HiveTablePartition(stdsc, props, null, null,
+                tableName));
+    }
+
+    private void fetchMetaDataForPartitionedTable(StorageDescriptor stdsc,
+                                                  Properties props,
+                                                  Partition partition,
+                                                  List<FieldSchema> partitionKeys,
+                                                  String tableName)
+            throws Exception {
+        fetchMetaData(new HiveTablePartition(stdsc, props, partition,
+                partitionKeys, tableName));
+    }
+
+    /* Fetches the splits of a table partition and turns each split into a fragment */
+    private void fetchMetaData(HiveTablePartition tablePartition)
+            throws Exception {
+        InputFormat<?, ?> fformat = makeInputFormat(
+                tablePartition.storageDesc.getInputFormat(), jobConf);
+        FileInputFormat.setInputPaths(jobConf, new Path(
+                tablePartition.storageDesc.getLocation()));
+
+        InputSplit[] splits = null;
+        try {
+            splits = fformat.getSplits(jobConf, 1);
+        } catch (org.apache.hadoop.mapred.InvalidInputException e) {
+            LOG.debug("getSplits failed on " + e.getMessage());
+            return;
+        }
+
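+        /*
+         * Turn each split into a PXF fragment carrying the file path, the
+         * replica hosts, the serialized split metadata and the table/partition
+         * user data.
+         */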
+        for (InputSplit split : splits) {
+            FileSplit fsp = (FileSplit) split;
+            String[] hosts = fsp.getLocations();
+            String filepath = fsp.getPath().toUri().getPath();
+
+            byte[] locationInfo = HdfsUtilities.prepareFragmentMetadata(fsp);
+            Fragment fragment = new Fragment(filepath, hosts, locationInfo,
+                    makeUserData(tablePartition));
+            fragments.add(fragment);
+        }
+    }
+
+    /* Turns a Properties class into a string */
+    private String serializeProperties(Properties props) throws Exception {
+        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+        props.store(outStream, ""/* comments */);
+        return outStream.toString();
+    }
+
+    /* Turns the partition keys into a string */
+    String serializePartitionKeys(HiveTablePartition partData) throws Exception {
+        if (partData.partition == null) {
+            /* this is a simple hive table - there are no partitions */
+            return HIVE_NO_PART_TBL;
+        }
+
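+        /*
+         * Each partition level is serialized as name!H1PD!type!H1PD!value and
+         * consecutive levels are joined with !HPAD!, e.g. (illustrative values):
+         * part1!H1PD!string!H1PD!AAAA!HPAD!part2!H1PD!string!H1PD!1111
+         */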
+        StringBuilder partitionKeys = new StringBuilder();
+        String prefix = "";
+        ListIterator<String> valsIter = partData.partition.getValues().listIterator();
+        ListIterator<FieldSchema> keysIter = partData.partitionKeys.listIterator();
+        while (valsIter.hasNext() && keysIter.hasNext()) {
+            FieldSchema key = keysIter.next();
+            String name = key.getName();
+            String type = key.getType();
+            String val = valsIter.next();
+            String oneLevel = prefix + name + HIVE_1_PART_DELIM + type
+                    + HIVE_1_PART_DELIM + val;
+            partitionKeys.append(oneLevel);
+            prefix = HIVE_PARTITIONS_DELIM;
+        }
+
+        return partitionKeys.toString();
+    }
+
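+    /*
+     * Serializes the fragment user data as
+     * inputformat!HUDD!serde!HUDD!properties!HUDD!partition_keys!HUDD!filter_in_fragmenter.
+     * An illustrative value for an unpartitioned text table (serde name assumed):
+     * org.apache.hadoop.mapred.TextInputFormat!HUDD!org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe!HUDD!<props>!HUDD!!HNPT!!HUDD!false
+     */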
+    byte[] makeUserData(HiveTablePartition partData) throws Exception {
+        String inputFormatName = partData.storageDesc.getInputFormat();
+        String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+        String propertiesString = serializeProperties(partData.properties);
+        String partitionKeys = serializePartitionKeys(partData);
+        String userData = inputFormatName + HIVE_UD_DELIM + serdeName
+                + HIVE_UD_DELIM + propertiesString + HIVE_UD_DELIM
+                + partitionKeys + HIVE_UD_DELIM + filterInFragmenter;
+
+        return userData.getBytes();
+    }
+
+    /*
+     * Builds the filter string for the
+     * HiveMetaStoreClient.listPartitionsByFilter API method.
+     *
+     * The filter string parameter for
+     * HiveMetaStoreClient.listPartitionsByFilter is created from the incoming
+     * getFragments filter string parameter. It will be in the format:
+     * [PARTITION1 NAME] = \"[PARTITION1 VALUE]\" AND [PARTITION2 NAME] =
+     * \"[PARTITION2 VALUE]\" ... Filtering can be done only on string
+     * partition keys and with AND operators.
+     *
+     * For example, for the query: SELECT * FROM TABLE1 WHERE part1 = 'AAAA'
+     * AND part2 = '1111', the incoming HAWQ filter string will be mapped into
+     * the HiveMetaStoreClient.listPartitionsByFilter filter string:
+     * "part1 = \"AAAA\" and part2 = \"1111\""
+     */
+    private String buildFilterStringForHive() throws Exception {
+
+        StringBuilder filtersString = new StringBuilder();
+        String filterInput = inputData.getFilterString();
+
+        if (LOG.isDebugEnabled()) {
+
+            for (ColumnDescriptor cd : inputData.getTupleDescription()) {
+                LOG.debug("ColumnDescriptor : " + cd);
+            }
+
+            LOG.debug("Filter string input : " + inputData.getFilterString());
+        }
+
+        HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
+        Object filter = eval.getFilterObject(filterInput);
+
+        String prefix = "";
+
+        if (filter instanceof List) {
+
+            for (Object f : (List<?>) filter) {
+                if (buildSingleFilter(f, filtersString, prefix)) {
+                    // Set 'and' operator between each matched partition filter.
+                    prefix = " and ";
+                }
+            }
+
+        } else {
+            buildSingleFilter(filter, filtersString, prefix);
+        }
+
+        return filtersString.toString();
+    }
+
+    /*
+     * Builds the filter string for a single filter and appends it to the
+     * filters string. The filter is added only if its column name matches a
+     * hive partition name. A single filter will be in the format:
+     * [PARTITION NAME] = \"[PARTITION VALUE]\"
+     */
+    private boolean buildSingleFilter(Object filter,
+                                      StringBuilder filtersString, String prefix)
+            throws Exception {
+
+        // Let's look first at the filter
+        FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
+
+        // If this is not an equality filter, ignore it (it is not added to the
+        // filter list)
+        if (bFilter.getOperation() != FilterParser.Operation.HDOP_EQ) {
+            LOG.debug("Filter operator is not EQ, ignoring this filter for hive: "
+                    + filter);
+            return false;
+        }
+
+        // Extract column name and value
+        int filterColumnIndex = bFilter.getColumn().index();
+        String filterValue = bFilter.getConstant().constant().toString();
+        ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex);
+        String filterColumnName = filterColumn.columnName();
+
+        // If the filter column is not a partition column, ignore it (it is not
+        // added to the filter list)
+        if (!setPartitions.contains(filterColumnName)) {
+            LOG.debug("Filter column is not a partition column, ignoring this filter for hive: "
+                    + filter);
+            return false;
+        }
+
+        filtersString.append(prefix);
+        filtersString.append(filterColumnName);
+        filtersString.append(HIVE_API_EQ);
+        filtersString.append(HIVE_API_DQUOTE);
+        filtersString.append(filterValue);
+        filtersString.append(HIVE_API_DQUOTE);
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilder.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilder.java
new file mode 100644
index 0000000..fbf9eee
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveFilterBuilder.java
@@ -0,0 +1,129 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.utilities.InputData;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Uses the filter parser code to build a filter object, either simple - a
+ * single {@link org.apache.hawq.pxf.api.FilterParser.BasicFilter} object or a
+ * compound - a {@link java.util.List} of
+ * {@link org.apache.hawq.pxf.api.FilterParser.BasicFilter} objects.
+ * {@link org.apache.hawq.pxf.plugins.hive.HiveAccessor} will use the filter for
+ * partition filtering.
+ */
+public class HiveFilterBuilder implements FilterParser.FilterBuilder {
+    private InputData inputData;
+
+    /**
+     * Constructs a HiveFilterBuilder object.
+     *
+     * @param input input data containing filter string
+     */
+    public HiveFilterBuilder(InputData input) {
+        inputData = input;
+    }
+
+    /**
+     * Translates a filterString into a {@link org.apache.hawq.pxf.api.FilterParser.BasicFilter} or a
+     * list of such filters.
+     *
+     * @param filterString the string representation of the filter
+     * @return a single {@link org.apache.hawq.pxf.api.FilterParser.BasicFilter}
+     *         object or a {@link java.util.List} of
+     *         {@link org.apache.hawq.pxf.api.FilterParser.BasicFilter} objects.
+     * @throws Exception if parsing the filter failed or filter is not a basic
+     *             filter or list of basic filters
+     */
+    public Object getFilterObject(String filterString) throws Exception {
+        FilterParser parser = new FilterParser(this);
+        Object result = parser.parse(filterString);
+
+        if (!(result instanceof FilterParser.BasicFilter)
+                && !(result instanceof List)) {
+            throw new Exception("String " + filterString
+                    + " resolved to no filter");
+        }
+
+        return result;
+    }
+
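+    /*
+     * Called by the FilterParser for each operation node: a column-op-constant
+     * node becomes a BasicFilter, while an AND node accumulates BasicFilters
+     * into a single list (only AND is allowed between compound expressions).
+     */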
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object build(FilterParser.Operation opId, Object leftOperand,
+                        Object rightOperand) throws Exception {
+        if (leftOperand instanceof FilterParser.BasicFilter
+                || leftOperand instanceof List) {
+            if (opId != FilterParser.Operation.HDOP_AND
+                    || !(rightOperand instanceof FilterParser.BasicFilter)) {
+                throw new Exception(
+                        "Only AND is allowed between compound expressions");
+            }
+
+            if (leftOperand instanceof List) {
+                return handleCompoundOperations(
+                        (List<FilterParser.BasicFilter>) leftOperand,
+                        (FilterParser.BasicFilter) rightOperand);
+            } else {
+                return handleCompoundOperations(
+                        (FilterParser.BasicFilter) leftOperand,
+                        (FilterParser.BasicFilter) rightOperand);
+            }
+        }
+
+        if (!(rightOperand instanceof FilterParser.Constant)) {
+            throw new Exception(
+                    "expressions of column-op-column are not supported");
+        }
+
+        // Assume column is on the left
+        return handleSimpleOperations(opId,
+                (FilterParser.ColumnIndex) leftOperand,
+                (FilterParser.Constant) rightOperand);
+    }
+
+    /*
+     * Handles simple column-operator-constant expressions by creating a
+     * basic filter.
+     */
+    private FilterParser.BasicFilter handleSimpleOperations(FilterParser.Operation opId,
+                                                            FilterParser.ColumnIndex column,
+                                                            FilterParser.Constant constant) {
+        return new FilterParser.BasicFilter(opId, column, constant);
+    }
+
+    /**
+     * Handles AND of already calculated expressions. Currently only AND is
+     * supported; in the future OR can be added.
+     *
+     * Four cases here:
+     * <ol>
+     * <li>both are simple filters</li>
+     * <li>left is a FilterList and right is a filter</li>
+     * <li>left is a filter and right is a FilterList</li>
+     * <li>both are FilterLists</li>
+     * </ol>
+     * Currently only cases 1 and 2 can occur, since no parentheses are used.
+     *
+     * @param left left hand filter
+     * @param right right hand filter
+     * @return list of filters constructing the filter tree
+     */
+    private List<FilterParser.BasicFilter> handleCompoundOperations(List<FilterParser.BasicFilter> left,
+                                                                    FilterParser.BasicFilter right) {
+        left.add(right);
+        return left;
+    }
+
+    private List<FilterParser.BasicFilter> handleCompoundOperations(FilterParser.BasicFilter left,
+                                                                    FilterParser.BasicFilter right) {
+        List<FilterParser.BasicFilter> result = new LinkedList<FilterParser.BasicFilter>();
+
+        result.add(left);
+        result.add(right);
+
+        return result;
+    }
+}

