hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject incubator-hawq git commit: HAWQ-1446: Introduce vectorized profile for ORC.
Date Thu, 22 Jun 2017 02:58:14 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/master 339806f3a -> 29a160839


HAWQ-1446: Introduce vectorized profile for ORC.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/29a16083
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/29a16083
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/29a16083

Branch: refs/heads/master
Commit: 29a160839949a1a08244962c5255933f714af46c
Parents: 339806f
Author: Oleksandr Diachenko <odiachenko@pivotal.io>
Authored: Wed Jun 21 19:58:08 2017 -0700
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Wed Jun 21 19:58:08 2017 -0700

----------------------------------------------------------------------
 .../hawq/pxf/api/ReadVectorizedResolver.java    |  39 ++
 .../org/apache/hawq/pxf/api/StatsAccessor.java  |   2 +-
 .../hawq/pxf/api/utilities/Utilities.java       |  22 +-
 .../hawq/pxf/api/utilities/UtilitiesTest.java   |  29 ++
 .../pxf/plugins/hive/HiveDataFragmenter.java    |   5 +-
 .../plugins/hive/HiveORCVectorizedAccessor.java | 106 ++++++
 .../plugins/hive/HiveORCVectorizedResolver.java | 367 +++++++++++++++++++
 .../plugins/hive/utilities/ProfileFactory.java  |  15 +-
 .../hawq/pxf/service/BridgeOutputBuilder.java   |  13 +
 .../org/apache/hawq/pxf/service/ReadBridge.java |   2 +-
 .../hawq/pxf/service/ReadVectorizedBridge.java  | 102 ++++++
 .../hawq/pxf/service/rest/BridgeResource.java   |   3 +
 .../src/main/resources/pxf-profiles-default.xml |  11 +
 13 files changed, 707 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
new file mode 100644
index 0000000..55f8df5
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/ReadVectorizedResolver.java
@@ -0,0 +1,39 @@
+package org.apache.hawq.pxf.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+/**
+ * Interface for resolvers that deserialize a whole batch of records at once,
+ * instead of one record per call.
+ */
+public interface ReadVectorizedResolver {
+
+    /**
+     * Resolves every record of the given batch into a tuple.
+     *
+     * @param batch the raw, not yet resolved batch of records
+     * @return list of tuples, one list of fields per record of the batch
+     */
+    List<List<OneField>> getFieldsForBatch(OneRow batch);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
index d256e77..ec65bd8 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java
@@ -29,7 +29,7 @@ public interface StatsAccessor extends ReadAccessor {
 
     /**
      * Method which reads needed statistics for current split
-     * @throws Exception if retrieving the stats failed
+     * @throws Exception when unable to retrieve statistics
      */
     public void retrieveStats() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
index ed8ad28..175a6e1 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
 import org.apache.hawq.pxf.api.StatsAccessor;
 
 import java.io.ByteArrayInputStream;
@@ -164,7 +165,7 @@ public class Utilities {
      * @param inputData input data which has protocol information
      * @return fragment metadata
      * @throws IllegalArgumentException if fragment metadata information wasn't found in
input data
-     * @throws Exception if unable to parse the fragment
+     * @throws Exception when error occurred during metadata parsing
      */
     public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception
{
         byte[] serializedLocation = inputData.getFragmentMetadata();
@@ -197,8 +198,8 @@ public class Utilities {
 
     /**
      * Based on accessor information determines whether to use AggBridge
-     * 
-     * @param inputData input data
+     *
+     * @param inputData input protocol data
      * @return true if AggBridge is applicable for current context
      */
     public static boolean useAggBridge(InputData inputData) {
@@ -234,4 +235,19 @@ public class Utilities {
             return false;
         }
     }
+
+    /**
+     * Determines whether use vectorization
+     * @param inputData input protocol data
+     * @return true if vectorization is applicable in a current context
+     */
+    public static boolean useVectorization(InputData inputData) {
+        boolean isVectorizedResolver = false;
+        try {
+            isVectorizedResolver = ArrayUtils.contains(Class.forName(inputData.getResolver()).getInterfaces(),
ReadVectorizedResolver.class);
+        } catch (ClassNotFoundException e) {
+            LOG.error("Unable to load resolver class: " + e.getMessage());
+        }
+        return isVectorizedResolver;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
index 01c09bf..5caca6d 100644
--- a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/utilities/UtilitiesTest.java
@@ -30,10 +30,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
+import java.util.List;
 
 import org.apache.hawq.pxf.api.Metadata;
+import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
 import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
 import org.apache.hawq.pxf.api.StatsAccessor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
@@ -89,6 +93,22 @@ public class UtilitiesTest {
         }
     }
 
+    /** Stub resolver implementing the vectorized interface; the tests only use its class name. */
+    static class ReadVectorizedResolverImpl implements ReadVectorizedResolver {
+
+        @Override
+        public List<List<OneField>> getFieldsForBatch(OneRow batch) {
+            return null;
+        }
+    }
+
+    /** Stub resolver implementing only the row-by-row interface; the tests only use its class name. */
+    static class ReadResolverImpl implements ReadResolver {
+
+        @Override
+        public List<OneField> getFields(OneRow row) throws Exception {
+            return null;
+        }
+    }
+
     @Test
     public void byteArrayToOctalStringNull() throws Exception {
         StringBuilder sb = null;
@@ -222,4 +242,13 @@ public class UtilitiesTest {
         when(metaData.getNumAttrsProjected()).thenReturn(1);
         assertFalse(Utilities.useStats(accessor, metaData));
     }
+
+    @Test
+    public void useVectorization() {
+        InputData metaData = mock(InputData.class);
+
+        // resolver implementing ReadVectorizedResolver -> vectorized path
+        when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadVectorizedResolverImpl");
+        assertTrue(Utilities.useVectorization(metaData));
+
+        // row-by-row resolver -> regular path
+        when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$ReadResolverImpl");
+        assertFalse(Utilities.useVectorization(metaData));
+
+        // resolver class that cannot be loaded -> regular path, no exception propagated
+        when(metaData.getResolver()).thenReturn("org.apache.hawq.pxf.api.utilities.UtilitiesTest$NoSuchResolver");
+        assertFalse(Utilities.useVectorization(metaData));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 9cf8f27..6e193c2 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -286,10 +286,11 @@ public class HiveDataFragmenter extends Fragmenter {
         InputFormat<?, ?> fformat = makeInputFormat(
                 tablePartition.storageDesc.getInputFormat(), jobConf);
         String profile = null;
-        if (inputData.getProfile() != null) {
+        String userProfile = inputData.getProfile();
+        if (userProfile != null) {
             // evaluate optimal profile based on file format if profile was explicitly specified
in url
             // if user passed accessor+fragmenter+resolver - use them
-            profile = ProfileFactory.get(fformat, hasComplexTypes);
+            profile = ProfileFactory.get(fformat, hasComplexTypes, userProfile);
         }
         String fragmenterForProfile = null;
         if (profile != null) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
new file mode 100644
index 0000000..3de1500
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedAccessor.java
@@ -0,0 +1,106 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.*;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.Reader.Options;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Accessor that reads an ORC file split batch by batch instead of row by row.
+ * Every batch holds a block of rows (1024 per the original design; the exact
+ * size is decided by the ORC reader) of all projected columns.
+ */
+public class HiveORCVectorizedAccessor extends HiveORCAccessor {
+
+    private RecordReader vrr;
+    private int batchIndex;
+    private VectorizedRowBatch batch;
+
+    public HiveORCVectorizedAccessor(InputData input) throws Exception {
+        super(input);
+    }
+
+    /**
+     * Opens an ORC record reader restricted to the projected columns and to
+     * the byte range of the current split.
+     *
+     * @return true if the reader has at least one more row to deliver
+     * @throws Exception when the ORC reader cannot be created
+     */
+    @Override
+    public boolean openForRead() throws Exception {
+        Options readerOptions = new Options();
+        addColumns(readerOptions);
+        addFragments(readerOptions);
+        orcReader = HiveUtilities.getOrcReader(inputData);
+        vrr = orcReader.rowsOptions(readerOptions);
+        return vrr.hasNext();
+    }
+
+    /**
+     * Limits the reader to the byte range of the current split, since one
+     * file may be divided into several splits.
+     *
+     * @param options reader options to modify
+     */
+    private void addFragments(Options options) {
+        FileSplit split = HdfsUtilities.parseFileSplit(inputData);
+        options.range(split.getStart(), split.getLength());
+    }
+
+    /**
+     * Reads the next batch of the current split.
+     *
+     * @return the next batch wrapped in a OneRow (key: running batch number,
+     *         data: the batch), or null once every batch has been read
+     * @throws IOException when reading from the ORC file fails
+     */
+    @Override
+    public OneRow readNextObject() throws IOException {
+        if (!vrr.hasNext()) {
+            // every batch of this split has been consumed
+            return null;
+        }
+        batch = vrr.nextBatch(batch);
+        batchIndex++;
+        return new OneRow(new LongWritable(batchIndex), batch);
+    }
+
+    /**
+     * Marks only the projected columns as included in the reader options.
+     *
+     * @param options reader options to modify
+     * @throws Exception when the tuple description cannot be read
+     */
+    private void addColumns(Options options) throws Exception {
+        // Slot 0 stays false; column i is flagged in slot i + 1
+        // (presumably slot 0 is the ORC root struct - confirm against the ORC schema).
+        boolean[] included = new boolean[inputData.getColumns() + 1];
+        for (ColumnDescriptor column : inputData.getTupleDescription()) {
+            if (column.isProjected()) {
+                included[column.columnIndex() + 1] = true;
+            }
+        }
+        options.include(included);
+    }
+
+    /**
+     * Closes the ORC record reader if it was opened.
+     *
+     * @throws Exception when closing the reader fails
+     */
+    @Override
+    public void closeForRead() throws Exception {
+        if (vrr == null) {
+            return;
+        }
+        vrr.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
new file mode 100644
index 0000000..5d03d7a
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCVectorizedResolver.java
@@ -0,0 +1,367 @@
+package org.apache.hawq.pxf.plugins.hive;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.hawq.pxf.api.io.DataType.BIGINT;
+import static org.apache.hawq.pxf.api.io.DataType.BOOLEAN;
+import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
+import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
+import static org.apache.hawq.pxf.api.io.DataType.DATE;
+import static org.apache.hawq.pxf.api.io.DataType.FLOAT8;
+import static org.apache.hawq.pxf.api.io.DataType.INTEGER;
+import static org.apache.hawq.pxf.api.io.DataType.NUMERIC;
+import static org.apache.hawq.pxf.api.io.DataType.REAL;
+import static org.apache.hawq.pxf.api.io.DataType.SMALLINT;
+import static org.apache.hawq.pxf.api.io.DataType.TEXT;
+import static org.apache.hawq.pxf.api.io.DataType.TIMESTAMP;
+import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.sql.Timestamp;
+import java.sql.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
+import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hadoop.hive.serde2.*;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.ql.exec.vector.*;
+
+/**
+ * Class which implements resolving a batch of records at once
+ */
+public class HiveORCVectorizedResolver extends HiveResolver implements ReadVectorizedResolver
{
+
+    private static final Log LOG = LogFactory.getLog(HiveORCVectorizedResolver.class);
+
+    private List<List<OneField>> resolvedBatch;
+    private StructObjectInspector soi;
+
+    public HiveORCVectorizedResolver(InputData input) throws Exception {
+        super(input);
+        try {
+            soi = (StructObjectInspector) HiveUtilities.getOrcReader(input).getObjectInspector();
+        } catch (Exception e) {
+            LOG.error("Unable to create an object inspector.");
+            throw e;
+        }
+    }
+
+    @Override
+    public List<List<OneField>> getFieldsForBatch(OneRow batch) {
+
+        Writable writableObject = null;
+        Object fieldValue = null;
+        VectorizedRowBatch vectorizedBatch = (VectorizedRowBatch) batch.getData();
+
+        /* Allocate empty result set */
+        int columnsNumber = inputData.getColumns();
+        resolvedBatch = new ArrayList<List<OneField>>(vectorizedBatch.size);
+
+        /* Create empty template row */
+        ArrayList<OneField> templateRow = new ArrayList<OneField>(columnsNumber);
+        ArrayList<OneField> currentRow = null;
+        for (int j = 0; j < inputData.getColumns(); j++) {
+            templateRow.add(null);
+        }
+        /* Replicate template row*/
+        for (int i = 0; i < vectorizedBatch.size; i++) {
+            currentRow = new ArrayList<OneField>(templateRow);
+            resolvedBatch.add(currentRow);
+        }
+
+        /* process all columns*/
+        List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+        for (int columnIndex = 0; columnIndex < vectorizedBatch.numCols; columnIndex++)
{
+            ObjectInspector oi = allStructFieldRefs.get(columnIndex).getFieldObjectInspector();
+            if (oi.getCategory() == Category.PRIMITIVE) {
+                PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
+                resolvePrimitiveColumn(columnIndex, oi, vectorizedBatch);
+            } else {
+                throw new UnsupportedTypeException("Unable to resolve column index:" + columnIndex
+                        + ". Only primitive types are supported.");
+            }
+        }
+
+        return resolvedBatch;
+    }
+
+    /**
+     * Resolves a column of a primitive type out of given batch
+     *
+     * @param columnIndex     index of the column
+     * @param oi              object inspector
+     * @param vectorizedBatch input batch or records
+     */
+    private void resolvePrimitiveColumn(int columnIndex, ObjectInspector oi, VectorizedRowBatch
vectorizedBatch) {
+
+        OneField field = null;
+        Writable writableObject = null;
+        PrimitiveCategory poc = ((PrimitiveObjectInspector) oi).getPrimitiveCategory();
+        populatePrimitiveColumn(poc, oi, vectorizedBatch, columnIndex);
+    }
+
+    private void addValueToColumn(int columnIndex, int rowIndex, OneField field) {
+        List<OneField> row = this.resolvedBatch.get(rowIndex);
+        row.set(columnIndex, field);
+    }
+
+    private void populatePrimitiveColumn(PrimitiveCategory primitiveCategory, ObjectInspector
oi, VectorizedRowBatch vectorizedBatch, int columnIndex) {
+        ColumnVector columnVector = vectorizedBatch.cols[columnIndex];
+        Object fieldValue = null;
+        DataType fieldType = null;
+
+        switch (primitiveCategory) {
+            case BOOLEAN: {
+                fieldType = BOOLEAN;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = lcv.vector[rowId] == 1;
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case SHORT: {
+                fieldType = SMALLINT;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = (short) lcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case INT: {
+                fieldType = INTEGER;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = (int) lcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case LONG: {
+                fieldType = BIGINT;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = lcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case FLOAT: {
+                fieldType = REAL;
+                DoubleColumnVector dcv = (DoubleColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (dcv != null) {
+                        int rowId = dcv.isRepeating ? 0 : rowIndex;
+                        if (!dcv.isNull[rowId]) {
+                            fieldValue = (float) dcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case DOUBLE: {
+                fieldType = FLOAT8;
+                DoubleColumnVector dcv = (DoubleColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (dcv != null) {
+                        int rowId = dcv.isRepeating ? 0 : rowIndex;
+                        if (!dcv.isNull[rowId]) {
+                            fieldValue = dcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case DECIMAL: {
+                fieldType = NUMERIC;
+                DecimalColumnVector dcv = (DecimalColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (dcv != null) {
+                        int rowId = dcv.isRepeating ? 0 : rowIndex;
+                        if (!dcv.isNull[rowId]) {
+                            fieldValue = dcv.vector[rowId];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case VARCHAR: {
+                fieldType = VARCHAR;
+                BytesColumnVector bcv = (BytesColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (columnVector != null) {
+                        int rowId = bcv.isRepeating ? 0 : rowIndex;
+                        if (!bcv.isNull[rowId]) {
+                            Text textValue = new Text();
+                            textValue.set(bcv.vector[rowIndex], bcv.start[rowIndex], bcv.length[rowIndex]);
+                            fieldValue = textValue;
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case CHAR: {
+                fieldType = BPCHAR;
+                BytesColumnVector bcv = (BytesColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (columnVector != null) {
+                        int rowId = bcv.isRepeating ? 0 : rowIndex;
+                        if (!bcv.isNull[rowId]) {
+                            Text textValue = new Text();
+                            textValue.set(bcv.vector[rowIndex], bcv.start[rowIndex], bcv.length[rowIndex]);
+                            fieldValue = textValue;
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case STRING: {
+                fieldType = TEXT;
+                BytesColumnVector bcv = (BytesColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (columnVector != null) {
+                        int rowId = bcv.isRepeating ? 0 : rowIndex;
+                        if (!bcv.isNull[rowId]) {
+                            Text textValue = new Text();
+                            textValue.set(bcv.vector[rowIndex], bcv.start[rowIndex], bcv.length[rowIndex]);
+                            fieldValue = textValue;
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case BINARY: {
+                fieldType = BYTEA;
+                BytesColumnVector bcv = (BytesColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (columnVector != null) {
+                        int rowId = bcv.isRepeating ? 0 : rowIndex;
+                        if (!bcv.isNull[rowId]) {
+                            fieldValue = new byte[bcv.length[rowId]];
+                            System.arraycopy(bcv.vector[rowId], bcv.start[rowId], fieldValue,
0, bcv.length[rowId]);
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case DATE: {
+                fieldType = DATE;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = new Date(DateWritable.daysToMillis((int) lcv.vector[rowIndex]));
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            case BYTE: {
+                fieldType = SMALLINT;
+                LongColumnVector lcv = (LongColumnVector) columnVector;
+                for (int rowIndex = 0; rowIndex < vectorizedBatch.size; rowIndex++) {
+                    fieldValue = null;
+                    if (lcv != null) {
+                        int rowId = lcv.isRepeating ? 0 : rowIndex;
+                        if (!lcv.isNull[rowId]) {
+                            fieldValue = (short) lcv.vector[rowIndex];
+                        }
+                    }
+                    addValueToColumn(columnIndex, rowIndex, new OneField(fieldType.getOID(),
fieldValue));
+                }
+                break;
+            }
+            default: {
+                throw new UnsupportedTypeException(oi.getTypeName()
+                        + " conversion is not supported by "
+                        + getClass().getSimpleName());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
index f36f074..7294a02 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/ProfileFactory.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hawq.pxf.api.Metadata;
 
 /**
  * Factory class which returns optimal profile for given input format
@@ -35,16 +34,20 @@ public class ProfileFactory {
     private static final String HIVE_RC_PROFILE = "HiveRC";
     private static final String HIVE_ORC_PROFILE = "HiveORC";
     private static final String HIVE_PROFILE = "Hive";
+    private static final String HIVE_ORC_VECTORIZED_PROFILE = "HiveVectorizedORC";
 
     /**
      * The method which returns optimal profile
      *
      * @param inputFormat input format of table/partition
      * @param hasComplexTypes whether record has complex types, see @EnumHiveToHawqType
+     * @param userProfileName profile name provided by user
      * @return name of optimal profile
      */
-    public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+    public static String get(InputFormat inputFormat, boolean hasComplexTypes, String userProfileName)
{
         String profileName = null;
+        if (HIVE_ORC_VECTORIZED_PROFILE.equals(userProfileName))
+            return userProfileName;
         if (inputFormat instanceof TextInputFormat && !hasComplexTypes) {
             profileName = HIVE_TEXT_PROFILE;
         } else if (inputFormat instanceof RCFileInputFormat) {
@@ -58,4 +61,12 @@ public class ProfileFactory {
         return profileName;
     }
 
+    /**
+     * Returns the optimal profile for the given input format without
+     * considering any profile name supplied by the user.
+     *
+     * @param inputFormat input format of table/partition
+     * @param hasComplexTypes whether record has complex types, see @EnumHiveToHawqType
+     * @return name of optimal profile
+     * @see ProfileFactory#get(InputFormat, boolean, String)
+     */
+    public static String get(InputFormat inputFormat, boolean hasComplexTypes) {
+        // Delegate with no user profile name, so only the input format and
+        // complex-type flag drive the choice.
+        return get(inputFormat, hasComplexTypes, null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
index 1c199d3..f9dbd72 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -137,6 +137,19 @@ public class BridgeOutputBuilder {
         return outputList;
     }
 
+    public LinkedList<Writable> makeVectorizedOutput(List<List<OneField>>
recordsBatch) throws BadRecordException {
+        outputList.clear();
+        if (recordsBatch != null) {
+            for (List<OneField> record : recordsBatch) {
+                if (inputData.outputFormat() == OutputFormat.GPDBWritable) {
+                    makeGPDBWritableOutput();
+                }
+                fillOutputRecord(record);
+            }
+        }
+        return outputList;
+    }
+
     /**
      * Returns whether or not this is a partial line.
      *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index edd0a99..dd095dd 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -166,7 +166,7 @@ public class ReadBridge implements Bridge {
      * analyzing the exception type, and when we discover that the actual
      * problem was a data problem, we return the errorOutput GPDBWritable.
      */
-    private boolean isDataException(IOException ex) {
+    protected boolean isDataException(IOException ex) {
         return (ex instanceof EOFException
                 || ex instanceof CharacterCodingException
                 || ex instanceof CharConversionException

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
new file mode 100644
index 0000000..ca222f1
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadVectorizedBridge.java
@@ -0,0 +1,102 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadVectorizedResolver;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+
+/**
+ * Read bridge for vectorized profiles. Each object returned by the accessor is
+ * a whole batch. The batch is resolved in one call through a
+ * {@link ReadVectorizedResolver}, and the resulting records are handed out one
+ * {@link Writable} per {@link #getNext()} call.
+ */
+public class ReadVectorizedBridge extends ReadBridge {
+
+    private static final Log LOG = LogFactory.getLog(ReadVectorizedBridge.class);
+
+    /**
+     * @param protData request protocol data, passed unchanged to {@link ReadBridge}
+     * @throws Exception if the parent bridge fails to initialize
+     */
+    public ReadVectorizedBridge(ProtocolData protData) throws Exception {
+        super(protData);
+    }
+
+    /**
+     * Returns the next output record.
+     * <p>
+     * Records still queued from the previously resolved batch are returned
+     * first. Otherwise batches are read and resolved until one produces
+     * output. When the accessor has no more batches, the output builder's
+     * partial line is returned if there is one, else {@code null}.
+     * Data-related {@link IOException}s and {@link BadRecordException}s are
+     * turned into an error record instead of being thrown.
+     *
+     * @return the next record, or {@code null} when there is no more data
+     * @throws Exception on non-data I/O errors or any other failure
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
+        OneRow batch = null;
+        try {
+            // Exits only via return: keep reading until a batch yields output
+            // or the accessor is exhausted.
+            while (true) {
+                batch = fileAccessor.readNextObject();
+                if (batch == null) {
+                    // End of data: return a trailing partial line if one
+                    // exists, otherwise null.
+                    Writable partial = outputBuilder.getPartialLine();
+                    if (partial != null) {
+                        LOG.warn("A partial record in the end of the fragment");
+                    }
+                    return partial;
+                }
+
+                // The queue is known to be empty here, so it can be replaced
+                // with the output of the newly resolved batch.
+                List<List<OneField>> resolvedBatch =
+                        ((ReadVectorizedResolver) fieldsResolver).getFieldsForBatch(batch);
+                outputQueue = outputBuilder.makeVectorizedOutput(resolvedBatch);
+                if (!outputQueue.isEmpty()) {
+                    return outputQueue.pop();
+                }
+            }
+        } catch (IOException ex) {
+            if (!isDataException(ex)) {
+                throw ex;
+            }
+            return outputBuilder.getErrorOutput(ex);
+        } catch (BadRecordException ex) {
+            // batch.toString() may render an entire batch, so only build the
+            // message when debug logging is actually enabled.
+            if (LOG.isDebugEnabled()) {
+                String rowInfo = String.valueOf(batch);
+                Throwable cause = ex.getCause();
+                if (cause != null) {
+                    LOG.debug("BadRecordException " + cause + ": " + rowInfo);
+                } else {
+                    LOG.debug(ex + ": " + rowInfo);
+                }
+            }
+            return outputBuilder.getErrorOutput(ex);
+        }
+    }
+
+    /**
+     * Finishes reading by closing the accessor.
+     *
+     * @throws Exception if closing the accessor fails
+     */
+    @Override
+    public void endIteration() throws Exception {
+        fileAccessor.closeForRead();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 4294e09..027663b 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -44,6 +44,7 @@ import org.apache.hawq.pxf.service.AggBridge;
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
 import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.ReadVectorizedBridge;
 import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
@@ -101,6 +102,8 @@ public class BridgeResource extends RestResource {
             bridge = new ReadSamplingBridge(protData);
         } else if (Utilities.useAggBridge(protData)) {
             bridge = new AggBridge(protData);
+        } else if (Utilities.useVectorization(protData)) {
+            bridge = new ReadVectorizedBridge(protData);
         } else {
             bridge = new ReadBridge(protData);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/29a16083/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index f076ead..a8666eb 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -101,6 +101,17 @@ under the License.
             <outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat>
         </plugins>
     </profile>
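+    <profile>
+        <name>HiveVectorizedORC</name>
+        <description>This profile is suitable only for Hive tables stored in ORC files. It reads and resolves rows in vectorized batches instead of one row at a time.</description>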
+        <plugins>
+            <fragmenter>org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter</fragmenter>
+            <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedAccessor</accessor>
+            <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCVectorizedResolver</resolver>
+            <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+            <outputFormat>org.apache.hawq.pxf.service.io.GPDBWritable</outputFormat>
+        </plugins>
+    </profile>
     <profile>
         <name>HdfsTextSimple</name>
         <description>This profile is suitable for using when reading delimited single
line records from plain text files



Mime
View raw message