hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject [4/4] incubator-hawq git commit: HAWQ-931. ORC optimized profile for PPD/CP
Date Thu, 08 Sep 2016 20:56:06 GMT
HAWQ-931. ORC optimized profile for PPD/CP


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/98a302da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/98a302da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/98a302da

Branch: refs/heads/HAWQ-931
Commit: 98a302da09db7952d782809c784c7a820ee579ef
Parents: 8906240
Author: Shivram Mani <shivram.mani@gmail.com>
Authored: Wed Jul 27 17:48:59 2016 -0700
Committer: Shivram Mani <shivram.mani@gmail.com>
Committed: Tue Aug 2 15:12:58 2016 -0700

----------------------------------------------------------------------
 pxf/build.gradle                                |   1 +
 pxf/gradle.properties                           |   3 +-
 .../pxf/api/utilities/ColumnDescriptor.java     |  28 +-
 .../plugins/hive/HiveColumnarSerdeResolver.java |   7 +-
 .../plugins/hive/HiveInputFormatFragmenter.java |  16 +-
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  | 170 +++++++
 .../pxf/plugins/hive/HiveORCSerdeResolver.java  | 439 +++++++++++++++++++
 .../hawq/pxf/service/rest/RestResource.java     |   8 +-
 .../pxf/service/utilities/ProtocolData.java     |  30 +-
 .../src/main/resources/pxf-profiles-default.xml |  14 +
 10 files changed, 707 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index 23d688f..cd29c01 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -314,6 +314,7 @@ project('pxf-hive') {
         compile "org.apache.hive:hive-metastore:$hiveVersion"
         compile "org.apache.hive:hive-common:$hiveVersion"
         compile "org.apache.hive:hive-serde:$hiveVersion"
+        compile "org.apache.orc:orc-core:$orcVersion"
         testCompile 'pl.pragmatists:JUnitParams:1.0.2'
         configurations {
             // Remove hive-exec from unit tests as it causes VerifyError

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/gradle.properties
----------------------------------------------------------------------
diff --git a/pxf/gradle.properties b/pxf/gradle.properties
index 6827b89..a601936 100644
--- a/pxf/gradle.properties
+++ b/pxf/gradle.properties
@@ -23,4 +23,5 @@ hiveVersion=1.2.1
 hbaseVersionJar=1.1.2
 hbaseVersionRPM=1.1.2
 tomcatVersion=7.0.62
-pxfProtocolVersion=v14
\ No newline at end of file
+pxfProtocolVersion=v14
+orcVersion=1.1.1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
index baaca1d..4b9dc9c 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/ColumnDescriptor.java
@@ -30,6 +30,7 @@ public class ColumnDescriptor {
     String gpdbColumnName;
     String gpdbColumnTypeName;
     int gpdbColumnIndex;
+    boolean isProjected;
 
     /**
      * Reserved word for a table record key.
@@ -50,6 +51,21 @@ public class ColumnDescriptor {
         gpdbColumnTypeName = typename;
         gpdbColumnName = name;
         gpdbColumnIndex = index;
+        isProjected = true;
+    }
+
+    /**
+     * Constructs a ColumnDescriptor.
+     *
+     * @param name column name
+     * @param typecode OID
+     * @param index column index
+     * @param typename type name
+     * @param isProj true if the column is projected, false otherwise
+     */
+    public ColumnDescriptor(String name, int typecode, int index, String typename, boolean
isProj) {
+        this(name, typecode, index, typename);
+        isProjected = isProj;
     }
 
     /**
@@ -62,6 +78,7 @@ public class ColumnDescriptor {
         this.gpdbColumnName = copy.gpdbColumnName;
         this.gpdbColumnIndex = copy.gpdbColumnIndex;
         this.gpdbColumnTypeName = copy.gpdbColumnTypeName;
+        this.isProjected = copy.isProjected;
     }
 
     public String columnName() {
@@ -89,11 +106,20 @@ public class ColumnDescriptor {
         return RECORD_KEY_NAME.equalsIgnoreCase(gpdbColumnName);
     }
 
+    public boolean isProjected() {
+        return isProjected;
+    }
+
+    public void setProjected(boolean projected) {
+        isProjected = projected;
+    }
+
     @Override
 	public String toString() {
 		return "ColumnDescriptor [gpdbColumnTypeCode=" + gpdbColumnTypeCode
 				+ ", gpdbColumnName=" + gpdbColumnName
 				+ ", gpdbColumnTypeName=" + gpdbColumnTypeName
-				+ ", gpdbColumnIndex=" + gpdbColumnIndex + "]";
+				+ ", gpdbColumnIndex=" + gpdbColumnIndex
+                + ", isProjected=" + isProjected + "]";
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index d298bac..497ee2e 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -19,7 +19,6 @@ package org.apache.hawq.pxf.plugins.hive;
  * under the License.
  */
 
-
 import org.apache.hawq.pxf.api.BadRecordException;
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
@@ -31,6 +30,7 @@ import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedColumnarSerDe;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDeBase;
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -76,6 +77,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
             serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
         } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name()))
{
             serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
+        } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name()))
{
+            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE;
         } else {
             throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
         }
@@ -138,6 +141,8 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
             deserializer = new ColumnarSerDe();
         } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE)
{
             deserializer = new LazyBinaryColumnarSerDe();
+        } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE)
{
+            deserializer = new VectorizedColumnarSerDe();
         } else {
             throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name());
/* we should not get here */
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index a666b8b..955aa9a 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -56,9 +56,12 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
 
     static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
     static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
+    static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
     static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
     static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
     static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+    static final String STR_VECTORIZED_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.VectorizedOrcSerde";
+    static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
     private static final int EXPECTED_NUM_OF_TOKS = 3;
     public static final int TOK_SERDE = 0;
     public static final int TOK_KEYS = 1;
@@ -67,14 +70,17 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
     /** Defines the Hive input formats currently supported in pxf */
     public enum PXF_HIVE_INPUT_FORMATS {
         RC_FILE_INPUT_FORMAT,
-        TEXT_FILE_INPUT_FORMAT
+        TEXT_FILE_INPUT_FORMAT,
+        ORC_FILE_INPUT_FORMAT
     }
 
     /** Defines the Hive serializers (serde classes) currently supported in pxf */
     public enum PXF_HIVE_SERDES {
         COLUMNAR_SERDE,
         LAZY_BINARY_COLUMNAR_SERDE,
-        LAZY_SIMPLE_SERDE
+        LAZY_SIMPLE_SERDE,
+        ORC_SERDE,
+        VECTORIZED_ORC_SERDE
     }
 
     /**
@@ -234,6 +240,8 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
                 return PXF_HIVE_INPUT_FORMATS.RC_FILE_INPUT_FORMAT.name();
             case STR_TEXT_FILE_INPUT_FORMAT:
                 return PXF_HIVE_INPUT_FORMATS.TEXT_FILE_INPUT_FORMAT.name();
+            case STR_ORC_FILE_INPUT_FORMAT:
+                return PXF_HIVE_INPUT_FORMATS.ORC_FILE_INPUT_FORMAT.name();
             default:
                 throw new IllegalArgumentException(
                         "HiveInputFormatFragmenter does not yet support "
@@ -259,6 +267,10 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
                 return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
             case STR_LAZY_SIMPLE_SERDE:
                 return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
+            case STR_ORC_SERDE:
+                return PXF_HIVE_SERDES.ORC_SERDE.name();
+            case STR_VECTORIZED_ORC_SERDE:
+                return PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name();
             default:
                 throw new UnsupportedTypeException(
                         "HiveInputFormatFragmenter does not yet support  "

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
new file mode 100644
index 0000000..43c48b2
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -0,0 +1,170 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hawq.pxf.api.FilterParser;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+
+/**
+ * Specialization of HiveAccessor for a Hive table that stores only ORC files.
+ * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files.
+ * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver}
+ */
+public class HiveORCAccessor extends HiveAccessor {
+
+    // NOTE(review): these three fields are never read or written in this class.
+    private RecordReader batchReader = null;
+    private Reader reader = null;
+    private VectorizedRowBatch batch = null;
+
+    /* JobConf keys used for column projection and predicate pushdown */
+    private static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+    private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
+    private static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+    private static final String SARG_PUSHDOWN = "sarg.pushdown";
+
+    /**
+     * Constructs a HiveORCAccessor.
+     *
+     * @param input input containing user data
+     * @throws Exception if user data was wrong
+     */
+    public HiveORCAccessor(InputData input) throws Exception {
+        super(input, new OrcInputFormat());
+        String[] toks = HiveInputFormatFragmenter.parseToks(input,
+                PXF_HIVE_SERDES.COLUMNAR_SERDE.name(),
+                PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name(),
+                PXF_HIVE_SERDES.ORC_SERDE.name(),
+                PXF_HIVE_SERDES.VECTORIZED_ORC_SERDE.name());
+        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
+        // parseBoolean has the same semantics as the deprecated Boolean(String) constructor
+        filterInFragmenter = Boolean.parseBoolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+    }
+
+    /**
+     * Sets the column projection and the filter pushdown on the JobConf
+     * and then opens the resource through the parent accessor.
+     *
+     * @return the result of the parent's openForRead
+     * @throws Exception if configuring the JobConf or opening the resource fails
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        addColumns();
+        addFilters();
+        return super.openForRead();
+    }
+
+    /**
+     * Creates the record reader for the given split from the input format.
+     *
+     * @param jobConf job configuration passed to the input format
+     * @param split the split to read
+     * @return the record reader returned by the input format
+     * @throws IOException if the input format fails to create the reader
+     */
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
+        return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+    }
+
+    /**
+     * Adds the projected columns of the table tuple description to the JobConf object
+     * so only these columns will be returned.
+     */
+    private void addColumns() throws Exception {
+
+        List<String> colIds = new ArrayList<String>();
+        List<String> colNames = new ArrayList<String>();
+        for (ColumnDescriptor col : inputData.getTupleDescription()) {
+            if (col.isProjected()) {
+                colIds.add(String.valueOf(col.columnIndex()));
+                colNames.add(col.columnName());
+            }
+        }
+        jobConf.set(READ_ALL_COLUMNS, "false");
+        jobConf.set(READ_COLUMN_IDS_CONF_STR, StringUtils.join(colIds, ","));
+        jobConf.set(READ_COLUMN_NAMES_CONF_STR, StringUtils.join(colNames, ","));
+    }
+
+    /**
+     * Uses {@link HiveFilterBuilder} to translate a filter string into a
+     * Hive {@link SearchArgument} object. The result is added as a filter to
+     * JobConf object. Does nothing when the input carries no filter.
+     */
+    private void addFilters() throws Exception {
+        if (!inputData.hasFilter()) {
+            return;
+        }
+
+        /* Predicate pushdown configuration */
+        String filterStr = inputData.getFilterString();
+        HiveFilterBuilder eval = new HiveFilterBuilder(inputData);
+        Object filter = eval.getFilterObject(filterStr);
+
+        // All basic filters are combined under a single AND node
+        SearchArgument.Builder filterBuilder = SearchArgumentFactory.newBuilder();
+        filterBuilder.startAnd();
+        if (filter instanceof List) {
+            for (Object f : (List<?>) filter) {
+                buildArgument(filterBuilder, f);
+            }
+        } else {
+            buildArgument(filterBuilder, filter);
+        }
+        filterBuilder.end();
+        SearchArgument sarg = filterBuilder.build();
+        jobConf.set(SARG_PUSHDOWN, sarg.toKryo());
+    }
+
+    /**
+     * Adds a single basic filter to the search argument builder.
+     * Operators that have no direct builder method are expressed as negations:
+     * {@code col > v} as {@code NOT (col <= v)} and
+     * {@code col >= v} as {@code NOT (col < v)}.
+     * A LIKE filter adds nothing to the builder.
+     *
+     * @param builder the search argument builder to add the predicate to
+     * @param filterObj the filter, expected to be a {@link FilterParser.BasicFilter}
+     */
+    private void buildArgument(SearchArgument.Builder builder, Object filterObj) {
+        /* The below functions will not be compatible and requires update  with Hive 2.0 APIs */
+        FilterParser.BasicFilter filter = (FilterParser.BasicFilter) filterObj;
+        int filterColumnIndex = filter.getColumn().index();
+        Object filterValue = filter.getConstant().constant();
+        ColumnDescriptor filterColumn = inputData.getColumn(filterColumnIndex);
+        String filterColumnName = filterColumn.columnName();
+
+        switch (filter.getOperation()) {
+            case HDOP_LT:
+                builder.lessThan(filterColumnName, filterValue);
+                break;
+            case HDOP_GT:
+                // col > v  <=>  NOT (col <= v)
+                builder.startNot().lessThanEquals(filterColumnName, filterValue).end();
+                break;
+            case HDOP_LE:
+                builder.lessThanEquals(filterColumnName, filterValue);
+                break;
+            case HDOP_GE:
+                // col >= v  <=>  NOT (col < v); using lessThanEquals here would drop rows equal to v
+                builder.startNot().lessThan(filterColumnName, filterValue).end();
+                break;
+            case HDOP_EQ:
+                builder.equals(filterColumnName, filterValue);
+                break;
+            case HDOP_NE:
+                builder.startNot().equals(filterColumnName, filterValue).end();
+                break;
+            case HDOP_LIKE:
+                // no predicate is added for LIKE
+                break;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
new file mode 100644
index 0000000..6ac4e70
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -0,0 +1,439 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.lang.CharUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.*;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+import static org.apache.hawq.pxf.api.io.DataType.DATE;
+import static org.apache.hawq.pxf.api.io.DataType.SMALLINT;
+
+/**
+ * Specialized HiveResolver for a Hive table stored as ORC files.
+ * Use together with HiveInputFormatFragmenter/HiveORCAccessor.
+ */
+public class HiveORCSerdeResolver extends HiveResolver {
+    private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
+    private OrcSerde deserializer;
+    private boolean firstColumn;
+    private StringBuilder builder;
+    private StringBuilder parts;
+    private int numberOfPartitions;
+    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private static final String MAPKEY_DELIM = ":";
+    private static final String COLLECTION_DELIM = ",";
+    private String collectionDelim;
+    private String mapkeyDelim;
+
+    /**
+     * Constructs a HiveORCSerdeResolver by delegating to the HiveResolver constructor.
+     *
+     * @param input the input data handed to the parent constructor
+     * @throws Exception if the parent constructor throws
+     */
+    public HiveORCSerdeResolver(InputData input) throws Exception {
+        super(input);
+    }
+
+    /*
+     * Reads the data supplied by the fragmenter (inputformat name, serde name,
+     * partition keys) and picks up the collection and map-key delimiters,
+     * falling back to the defaults when the user did not set them.
+     * Only the ORC serde is accepted.
+     */
+    @Override
+    void parseUserData(InputData input) throws Exception {
+        String[] tokens = HiveInputFormatFragmenter.parseToks(input);
+        String serdeName = tokens[HiveInputFormatFragmenter.TOK_SERDE];
+        if (!serdeName.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
+            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeName);
+        }
+        serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
+
+        parts = new StringBuilder();
+        partitionKeys = tokens[HiveInputFormatFragmenter.TOK_KEYS];
+        parseDelimiterChar(input);
+
+        String userCollectionDelim = input.getUserProperty("COLLECTION_DELIM");
+        collectionDelim = (userCollectionDelim != null) ? userCollectionDelim : COLLECTION_DELIM;
+
+        String userMapkeyDelim = input.getUserProperty("MAPKEY_DELIM");
+        mapkeyDelim = (userMapkeyDelim != null) ? userMapkeyDelim : MAPKEY_DELIM;
+    }
+
+    /*
+     * Delegates to the initPartitionFields(StringBuilder) overload with the
+     * parts buffer and stores the value it returns as the number of partitions.
+     */
+    @Override
+    void initPartitionFields() {
+        this.numberOfPartitions = initPartitionFields(this.parts);
+    }
+
+    /**
+     * Deserializes the data of the given row with the serde and resolves the
+     * resulting Hive struct into a list of OneField items.
+     *
+     * @param onerow the row whose data is a Writable produced by the accessor
+     * @return the fields resolved from the record
+     * @throws Exception if deserialization or field resolution fails
+     */
+    @Override
+    public List<OneField> getFields(OneRow onerow) throws Exception {
+        Writable rowData = (Writable) onerow.getData();
+        Object tuple = deserializer.deserialize(rowData);
+        // Each Hive record is a Struct
+        StructObjectInspector structInspector =
+                (StructObjectInspector) deserializer.getObjectInspector();
+        return traverseStruct(tuple, structInspector, false);
+    }
+
+    /*
+     * Creates and initializes the deserializer for the records of this Hive data fragment.
+     * The serde is told the names and Hive types of the data columns, which are the
+     * first (number of columns - number of partitions) columns of the input.
+     * Deprecation warnings are suppressed for the serde's initialize call.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    void initSerde(InputData input) throws Exception {
+        int dataColumnCount = input.getColumns() - numberOfPartitions;
+
+        LOG.debug("Serde number of columns is " + dataColumnCount);
+
+        // initial capacity: column + delimiter
+        StringBuilder names = new StringBuilder(dataColumnCount * 2);
+        StringBuilder types = new StringBuilder(dataColumnCount * 2);
+        for (int i = 0; i < dataColumnCount; i++) {
+            ColumnDescriptor column = input.getColumn(i);
+            String name = column.columnName();
+            String hiveType = HiveInputFormatFragmenter.toHiveType(
+                    DataType.get(column.columnTypeCode()), name);
+            if (i > 0) {
+                names.append(",");
+                types.append(",");
+            }
+            names.append(name);
+            types.append(hiveType);
+        }
+
+        Properties serdeProperties = new Properties();
+        serdeProperties.put(serdeConstants.LIST_COLUMNS, names.toString());
+        serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, types.toString());
+
+        if (serdeType != HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
+            /* we should not get here */
+            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name());
+        }
+        deserializer = new OrcSerde();
+
+        JobConf serdeConf = new JobConf(new Configuration(), HiveORCSerdeResolver.class);
+        deserializer.initialize(serdeConf, serdeProperties);
+    }
+
+    /*
+     * Resolves one Hive object into the given record.
+     * A null object is only accepted for a primitive, where it yields a null
+     * field value; a null composite object (list, map, struct, union) raises
+     * BadRecordException. Composite objects are flattened into a single TEXT
+     * field: lists and unions are wrapped in [], maps and structs in {}.
+     */
+    private void traverseTuple(Object obj, ObjectInspector objInspector,
+                               List<OneField> record, boolean toFlatten)
+            throws IOException, BadRecordException {
+        ObjectInspector.Category kind = objInspector.getCategory();
+        boolean isPrimitive = (kind == ObjectInspector.Category.PRIMITIVE);
+        if (obj == null && !isPrimitive) {
+            throw new BadRecordException("NULL Hive composite object");
+        }
+
+        if (isPrimitive) {
+            resolvePrimitive(obj, (PrimitiveObjectInspector) objInspector,
+                    record, toFlatten);
+            return;
+        }
+
+        List<OneField> nested;
+        String wrapper;
+        switch (kind) {
+            case LIST:
+                nested = traverseList(obj, (ListObjectInspector) objInspector);
+                wrapper = "[%s]";
+                break;
+            case MAP:
+                nested = traverseMap(obj, (MapObjectInspector) objInspector);
+                wrapper = "{%s}";
+                break;
+            case STRUCT:
+                nested = traverseStruct(obj, (StructObjectInspector) objInspector, true);
+                wrapper = "{%s}";
+                break;
+            case UNION:
+                nested = traverseUnion(obj, (UnionObjectInspector) objInspector);
+                wrapper = "[%s]";
+                break;
+            default:
+                throw new UnsupportedTypeException("Unknown category type: "
+                        + objInspector.getCategory());
+        }
+        String flattened = HdfsUtilities.toString(nested, collectionDelim);
+        addOneFieldToRecord(record, TEXT, String.format(wrapper, flattened));
+    }
+
+    /*
+     * Resolves a Hive union: the field currently held by the union is resolved
+     * with the object inspector selected by the union's tag.
+     * Throws BadRecordException when the union has no object inspectors.
+     */
+    private List<OneField> traverseUnion(Object obj, UnionObjectInspector uoi)
+            throws BadRecordException, IOException {
+        List<OneField> resolved = new LinkedList<>();
+        List<? extends ObjectInspector> inspectors = uoi.getObjectInspectors();
+        if (inspectors == null) {
+            throw new BadRecordException(
+                    "Illegal value NULL for Hive data type Union");
+        }
+        Object member = uoi.getField(obj);
+        ObjectInspector memberInspector = inspectors.get(uoi.getTag(obj));
+        traverseTuple(member, memberInspector, resolved, true);
+        return resolved;
+    }
+
+    /*
+     * Resolves every element of a Hive list, in order, into a list of fields.
+     * Throws BadRecordException when the list itself is null.
+     */
+    private List<OneField> traverseList(Object obj, ListObjectInspector loi)
+            throws BadRecordException, IOException {
+        List<OneField> resolved = new LinkedList<>();
+        List<?> elements = loi.getList(obj);
+        ObjectInspector elementInspector = loi.getListElementObjectInspector();
+        if (elements == null) {
+            throw new BadRecordException(
+                    "Illegal value NULL for Hive data type List");
+        }
+        for (Object element : elements) {
+            traverseTuple(element, elementInspector, resolved, true);
+        }
+        return resolved;
+    }
+
+    /*
+     * Resolves the fields of a Hive struct.
+     * When toFlatten is false, the resolved field values are returned one per
+     * struct field. When toFlatten is true, each struct field becomes one TEXT
+     * entry made of the quoted field name and the resolved value, joined with
+     * the map-key delimiter.
+     * Throws BadRecordException when the struct data is null.
+     */
+    private List<OneField> traverseStruct(Object struct,
+                                          StructObjectInspector soi,
+                                          boolean toFlatten)
+            throws BadRecordException, IOException {
+        List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+        List<Object> values = soi.getStructFieldsDataAsList(struct);
+        if (values == null) {
+            throw new BadRecordException(
+                    "Illegal value NULL for Hive data type Struct");
+        }
+
+        List<OneField> flattened = new LinkedList<>();
+        List<OneField> pending = new LinkedList<>();
+        int index = 0;
+        while (index < values.size()) {
+            if (toFlatten) {
+                String quotedName = String.format("\"%s\"", fieldRefs.get(index).getFieldName());
+                pending.add(new OneField(TEXT.getOID(), quotedName));
+            }
+            traverseTuple(values.get(index),
+                    fieldRefs.get(index).getFieldObjectInspector(), pending,
+                    toFlatten);
+            if (toFlatten) {
+                // collapse "name" + value into one entry and start over for the next field
+                addOneFieldToRecord(flattened, TEXT,
+                        HdfsUtilities.toString(pending, mapkeyDelim));
+                pending.clear();
+            }
+            index++;
+        }
+
+        if (toFlatten) {
+            return flattened;
+        }
+        return pending;
+    }
+
+    /*
+     * Resolves a Hive map into one TEXT entry per map entry, each made of the
+     * resolved key and value joined with the map-key delimiter.
+     * An empty map yields a single entry resolved from a null key and a null value.
+     * Throws BadRecordException when the map itself is null.
+     */
+    private List<OneField> traverseMap(Object obj, MapObjectInspector moi)
+            throws BadRecordException, IOException {
+        List<OneField> pair = new LinkedList<>();
+        List<OneField> entries = new LinkedList<>();
+        ObjectInspector keyInspector = moi.getMapKeyObjectInspector();
+        ObjectInspector valueInspector = moi.getMapValueObjectInspector();
+        Map<?, ?> map = moi.getMap(obj);
+        if (map == null) {
+            throw new BadRecordException(
+                    "Illegal value NULL for Hive data type Map");
+        }
+
+        if (map.isEmpty()) {
+            traverseTuple(null, keyInspector, pair, true);
+            traverseTuple(null, valueInspector, pair, true);
+            addOneFieldToRecord(entries, TEXT,
+                    HdfsUtilities.toString(pair, mapkeyDelim));
+            return entries;
+        }
+
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            traverseTuple(entry.getKey(), keyInspector, pair, true);
+            traverseTuple(entry.getValue(), valueInspector, pair, true);
+            addOneFieldToRecord(entries, TEXT,
+                    HdfsUtilities.toString(pair, mapkeyDelim));
+            // reuse the scratch list for the next entry
+            pair.clear();
+        }
+        return entries;
+    }
+
+    private void resolvePrimitive(Object o, PrimitiveObjectInspector oi,
+                                  List<OneField> record, boolean toFlatten)
+            throws IOException {
+        Object val;
+        switch (oi.getPrimitiveCategory()) {
+            case BOOLEAN: {
+                val = (o != null) ? ((BooleanObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, BOOLEAN, val);
+                break;
+            }
+            case SHORT: {
+                val = (o != null) ? ((ShortObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, SMALLINT, val);
+                break;
+            }
+            case INT: {
+                val = (o != null) ? ((IntObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, INTEGER, val);
+                break;
+            }
+            case LONG: {
+                val = (o != null) ? ((LongObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, BIGINT, val);
+                break;
+            }
+            case FLOAT: {
+                val = (o != null) ? ((FloatObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, REAL, val);
+                break;
+            }
+            case DOUBLE: {
+                val = (o != null) ? ((DoubleObjectInspector) oi).get(o) : null;
+                addOneFieldToRecord(record, FLOAT8, val);
+                break;
+            }
+            case DECIMAL: {
+                String sVal = null;
+                if (o != null) {
+                    HiveDecimal hd = ((HiveDecimalObjectInspector) oi).getPrimitiveJavaObject(o);
+                    if (hd != null) {
+                        BigDecimal bd = hd.bigDecimalValue();
+                        sVal = bd.toString();
+                    }
+                }
+                addOneFieldToRecord(record, NUMERIC, sVal);
+                break;
+            }
+            case STRING: {
+                val = (o != null) ? ((StringObjectInspector) oi).getPrimitiveJavaObject(o)
+                        : null;
+                addOneFieldToRecord(record, TEXT,
+                        toFlatten ? String.format("\"%s\"", val) : val);
+                break;
+            }
+            case VARCHAR:
+                val = (o != null) ? ((HiveVarcharObjectInspector) oi).getPrimitiveJavaObject(o)
+                        : null;
+                addOneFieldToRecord(record, VARCHAR,
+                        toFlatten ? String.format("\"%s\"", val) : val);
+                break;
+            case CHAR:
+                val = (o != null) ? ((HiveCharObjectInspector) oi).getPrimitiveJavaObject(o)
+                        : null;
+                addOneFieldToRecord(record, BPCHAR,
+                        toFlatten ? String.format("\"%s\"", val) : val);
+                break;
+            case BINARY: {
+                byte[] toEncode = null;
+                if (o != null) {
+                    BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(o);
+                    toEncode = new byte[bw.getLength()];
+                    System.arraycopy(bw.getBytes(), 0, toEncode, 0,
+                            bw.getLength());
+                }
+                addOneFieldToRecord(record, BYTEA, toEncode);
+                break;
+            }
+            case TIMESTAMP: {
+                val = (o != null) ? ((TimestampObjectInspector) oi).getPrimitiveJavaObject(o)
+                        : null;
+                addOneFieldToRecord(record, TIMESTAMP, val);
+                break;
+            }
+            case DATE:
+                val = (o != null) ? ((DateObjectInspector) oi).getPrimitiveJavaObject(o)
+                        : null;
+                addOneFieldToRecord(record, DATE, val);
+                break;
+            case BYTE: { /* TINYINT */
+                val = (o != null) ? new Short(((ByteObjectInspector) oi).get(o))
+                        : null;
+                addOneFieldToRecord(record, SMALLINT, val);
+                break;
+            }
+            default: {
+                throw new UnsupportedTypeException(oi.getTypeName()
+                        + " conversion is not supported by "
+                        + getClass().getSimpleName());
+            }
+        }
+    }
+
+    private void addOneFieldToRecord(List<OneField> record,
+                                     DataType gpdbWritableType, Object val) {
+        record.add(new OneField(gpdbWritableType.getOID(), val));
+    }
+
+    /*
+     * Gets the delimiter character from the URL, verify and store it. Must be a
+     * single ascii character (same restriction as Hawq's). If a hex
+     * representation was passed, convert it to its char.
+     */
+    void parseDelimiterChar(InputData input) {
+
+        String userDelim = input.getUserProperty("DELIMITER");
+
+        if (userDelim == null) {
+            throw new IllegalArgumentException("DELIMITER is a required option");
+        }
+
+        final int VALID_LENGTH = 1;
+        final int VALID_LENGTH_HEX = 4;
+
+        if (userDelim.startsWith("\\x")) { // hexadecimal sequence
+
+            if (userDelim.length() != VALID_LENGTH_HEX) {
+                throw new IllegalArgumentException(
+                        "Invalid hexdecimal value for delimiter (got"
+                                + userDelim + ")");
+            }
+
+            delimiter = (char) Integer.parseInt(
+                    userDelim.substring(2, VALID_LENGTH_HEX), 16);
+
+            if (!CharUtils.isAscii(delimiter)) {
+                throw new IllegalArgumentException(
+                        "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+                                + delimiter + ")");
+            }
+
+            return;
+        }
+
+        if (userDelim.length() != VALID_LENGTH) {
+            throw new IllegalArgumentException(
+                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got "
+                            + userDelim + ")");
+        }
+
+        if (!CharUtils.isAscii(userDelim.charAt(0))) {
+            throw new IllegalArgumentException(
+                    "Invalid delimiter value. Must be a single ASCII character, or a hexadecimal sequence (got non ASCII "
+                            + userDelim + ")");
+        }
+
+        delimiter = userDelim.charAt(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
index 60bb31e..633e78c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/RestResource.java
@@ -24,6 +24,7 @@ import javax.ws.rs.core.MultivaluedMap;
 import org.apache.commons.codec.CharEncoding;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.StringUtils;
 
 import java.io.UnsupportedEncodingException;
 import java.util.List;
@@ -56,7 +57,12 @@ public abstract class RestResource {
             String key = entry.getKey();
             List<String> values = entry.getValue();
             if (values != null) {
-                String value = values.get(0);
+                String value;
+                if(values.size() > 1) {
+                    value = StringUtils.join(values, ",");
+                } else {
+                    value = values.get(0);
+                }
                 if (value != null) {
                     // converting to value UTF-8 encoding
                     value = new String(value.getBytes(CharEncoding.ISO_8859_1),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
index 0337937..f492378 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java
@@ -383,16 +383,40 @@ public class ProtocolData extends InputData {
 
     /*
      * Sets the tuple description for the record
+     * Attribute Projection information is optional
      */
     void parseTupleDescription() {
+
+        /* Process column projection info */
+        String columnProjStr = getOptionalProperty("ATTRS-PROJ");
+        List<Integer> columnProjList = new ArrayList<Integer>();
+        if(columnProjStr != null) {
+            int columnProj = Integer.parseInt(columnProjStr);
+            if(columnProj > 0) {
+                String columnProjIndexStr = getProperty("ATTRS-PROJ-IDX");
+                String columnProjIdx[] = columnProjIndexStr.split(",");
+                for(int i = 0; i < columnProj; i++) {
+                    columnProjList.add(Integer.valueOf(columnProjIdx[i]));
+                }
+            } else {
+                /* This is a special case to handle aggregate queries not related to any specific column
+                * eg: count(*) queries. */
+                columnProjList.add(0);
+            }
+        }
+
         int columns = getIntProperty("ATTRS");
         for (int i = 0; i < columns; ++i) {
             String columnName = getProperty("ATTR-NAME" + i);
             int columnTypeCode = getIntProperty("ATTR-TYPECODE" + i);
             String columnTypeName = getProperty("ATTR-TYPENAME" + i);
-
-            ColumnDescriptor column = new ColumnDescriptor(columnName,
-                    columnTypeCode, i, columnTypeName);
+            ColumnDescriptor column;
+            if(columnProjStr != null) {
+                column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName, columnProjList.contains(i));
+            } else {
+                /* For data formats that don't support column projection */
+                column = new ColumnDescriptor(columnName, columnTypeCode, i, columnTypeName);
+            }
             tupleDescription.add(column);
 
             if (columnName.equalsIgnoreCase(ColumnDescriptor.RECORD_KEY_NAME)) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/98a302da/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index d72df94..9025cc1 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -80,6 +80,20 @@ under the License.
         </plugins>
     </profile>
     <profile>
+        <name>HiveORC</name>
+        <description>This profile is suitable only for Hive tables stored in ORC files
+            and serialized with either the ColumnarSerDe or the LazyBinaryColumnarSerDe.
+            It is much faster than the general purpose Hive profile.
+            DELIMITER parameter is mandatory.
+        </description>
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
+            <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+        </plugins>
+    </profile>
+    <profile>
         <name>HdfsTextSimple</name>
         <description>This profile is suitable for using when reading delimited single line records from plain text files
             on HDFS



Mime
View raw message