hawq-commits mailing list archives

From odiache...@apache.org
Subject [1/4] incubator-hawq git commit: HAWQ-1228. Initial commit.
Date Tue, 24 Jan 2017 02:32:39 GMT
Repository: incubator-hawq
Updated Branches:
  refs/heads/HAWQ-1228 [created] 607184c36


HAWQ-1228. Initial commit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0574e75f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0574e75f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0574e75f

Branch: refs/heads/HAWQ-1228
Commit: 0574e75fa972f6ccddd1f55a98972223b3860759
Parents: 25c87ec
Author: Oleksandr Diachenko <odiachenko@pivotal.io>
Authored: Wed Dec 28 14:03:50 2016 -0800
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Fri Jan 6 11:55:19 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/hawq/pxf/api/Metadata.java  |  12 ++
 .../org/apache/hawq/pxf/api/OutputFormat.java   |   2 +-
 .../hawq/pxf/plugins/hive/HiveAccessor.java     |  14 +--
 .../plugins/hive/HiveColumnarSerdeResolver.java |  62 +++++-----
 .../pxf/plugins/hive/HiveDataFragmenter.java    |   1 -
 .../plugins/hive/HiveInputFormatFragmenter.java |  40 -------
 .../pxf/plugins/hive/HiveLineBreakAccessor.java |  10 +-
 .../pxf/plugins/hive/HiveMetadataFetcher.java   |  41 ++++++-
 .../hawq/pxf/plugins/hive/HiveORCAccessor.java  |  11 +-
 .../pxf/plugins/hive/HiveORCSerdeResolver.java  |  17 ++-
 .../pxf/plugins/hive/HiveRCFileAccessor.java    |  10 +-
 .../hawq/pxf/plugins/hive/HiveResolver.java     |  38 +++----
 .../plugins/hive/HiveStringPassResolver.java    |  42 +++++--
 .../hawq/pxf/plugins/hive/HiveUserData.java     |  71 ++++++++++++
 .../plugins/hive/utilities/HiveUtilities.java   | 113 +++++++++++--------
 .../pxf/plugins/hive/HiveORCAccessorTest.java   |   9 +-
 .../apache/hawq/pxf/service/ProfileFactory.java |  20 ++--
 .../src/main/resources/pxf-profiles-default.xml |   4 +
 src/backend/access/external/fileam.c            |   4 +
 src/backend/catalog/external/externalmd.c       |  57 ++++++++--
 src/bin/gpfusion/gpbridgeapi.c                  |  43 ++++++-
 src/include/access/extprotocol.h                |   1 +
 src/include/access/fileam.h                     |   1 +
 src/include/catalog/external/itemmd.h           |   3 +
 src/include/catalog/pg_exttable.h               |  10 +-
 25 files changed, 428 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
index 9e1c137..7e3b92e 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Metadata.java
@@ -22,6 +22,7 @@ package org.apache.hawq.pxf.api;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
 import org.apache.commons.lang.StringUtils;
@@ -124,6 +125,17 @@ public class Metadata {
      */
     private List<Metadata.Field> fields;
 
+
+    private Set<OutputFormat> outputFormats;
+
+    public Set<OutputFormat> getOutputFormats() {
+        return outputFormats;
+    }
+
+    public void setFormats(Set<OutputFormat> outputFormats) {
+        this.outputFormats = outputFormats;
+    }
+
     /**
      * Constructs an item's Metadata.
      *

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
index 230f9ff..82a747f 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/OutputFormat.java
@@ -23,4 +23,4 @@ package org.apache.hawq.pxf.api;
 /**
  * PXF supported output formats: {@link #TEXT} and {@link #BINARY}
  */
-public enum OutputFormat {TEXT, BINARY}
+public enum OutputFormat {TEXT, BINARY, UNKNOWN}
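
The new UNKNOWN constant gives callers a defensive fallback when a profile's output format cannot be resolved; HiveMetadataFetcher.getOutputFormat() further down relies on it. A minimal sketch of that pattern (the resolve() helper and the sample strings are illustrative, not part of the commit):

    import org.apache.hawq.pxf.api.OutputFormat;

    class OutputFormatFallbackDemo {
        // Map a format name to the enum, falling back to UNKNOWN
        // instead of letting valueOf() throw.
        static OutputFormat resolve(String name) {
            try {
                return OutputFormat.valueOf(name);
            } catch (IllegalArgumentException | NullPointerException e) {
                return OutputFormat.UNKNOWN;
            }
        }

        public static void main(String[] args) {
            System.out.println(resolve("TEXT"));  // TEXT
            System.out.println(resolve("AVRO"));  // UNKNOWN
        }
    }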

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
index ef9f76e..ea3accb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveAccessor.java
@@ -28,6 +28,7 @@ import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.InputFormat;
@@ -42,10 +43,6 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.hawq.pxf.api.io.DataType.*;
-import static org.apache.hawq.pxf.api.io.DataType.BPCHAR;
-import static org.apache.hawq.pxf.api.io.DataType.BYTEA;
-
 /**
  * Accessor for Hive tables. The accessor will open and read a split belonging
  * to a Hive table. Opening a split means creating the corresponding InputFormat
@@ -138,12 +135,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
      */
     private InputFormat<?, ?> createInputFormat(InputData input)
             throws Exception {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
-        initPartitionFields(toks[3]);
-        filterInFragmenter = new Boolean(toks[4]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
         return HiveDataFragmenter.makeInputFormat(
-                toks[0]/* inputFormat name */, jobConf);
+                hiveUserData.getInputFormatName()/* inputFormat name */, jobConf);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
index 362ac0d..2bf39ff 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveColumnarSerdeResolver.java
@@ -22,12 +22,15 @@ package org.apache.hawq.pxf.plugins.hive;
 import org.apache.hawq.pxf.api.BadRecordException;
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -57,11 +59,11 @@ import static org.apache.hawq.pxf.api.io.DataType.VARCHAR;
  */
 public class HiveColumnarSerdeResolver extends HiveResolver {
     private static final Log LOG = LogFactory.getLog(HiveColumnarSerdeResolver.class);
-    private ColumnarSerDeBase deserializer;
+    //private ColumnarSerDeBase deserializer;
     private boolean firstColumn;
     private StringBuilder builder;
     private StringBuilder parts;
-    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private HiveUtilities.PXF_HIVE_SERDES serdeType;
 
     public HiveColumnarSerdeResolver(InputData input) throws Exception {
         super(input);
@@ -70,24 +72,22 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
     /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
     @Override
     void parseUserData(InputData input) throws Exception {
-        String[] toks = HiveInputFormatFragmenter.parseToks(input);
-        String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
-        if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE;
-        } else if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE;
-        }
-        else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
-        }
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE, HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+        String serdeClassName = hiveUserData.getSerdeClassName();
+
+        serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(serdeClassName);
         parts = new StringBuilder();
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        partitionKeys = hiveUserData.getPartitionKeys();
         parseDelimiterChar(input);
     }
 
     @Override
     void initPartitionFields() {
-        initPartitionFields(parts);
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            initTextPartitionFields(parts);
+        } else {
+            super.initPartitionFields();
+        }
     }
 
     /**
@@ -97,15 +97,19 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
      */
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
-        firstColumn = true;
-        builder = new StringBuilder();
-        Object tuple = deserializer.deserialize((Writable) onerow.getData());
-        ObjectInspector oi = deserializer.getObjectInspector();
-
-        traverseTuple(tuple, oi);
-        /* We follow Hive convention. Partition fields are always added at the end of the record */
-        builder.append(parts);
-        return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            firstColumn = true;
+            builder = new StringBuilder();
+            Object tuple = deserializer.deserialize((Writable) onerow.getData());
+            ObjectInspector oi = deserializer.getObjectInspector();
+    
+            traverseTuple(tuple, oi);
+            /* We follow Hive convention. Partition fields are always added at the end of the record */
+            builder.append(parts);
+            return Collections.singletonList(new OneField(VARCHAR.getOID(), builder.toString()));
+        } else {
+            return super.getFields(onerow);
+        }
     }
 
     /*
@@ -138,9 +142,10 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
         serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
         serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
 
-        if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
+        //TODO: Move this logic to utilities
+        if (serdeType == HiveUtilities.PXF_HIVE_SERDES.COLUMNAR_SERDE) {
             deserializer = new ColumnarSerDe();
-        } else if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
+        } else if (serdeType == HiveUtilities.PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE) {
             deserializer = new LazyBinaryColumnarSerDe();
         } else {
             throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
@@ -233,4 +238,9 @@ public class HiveColumnarSerdeResolver extends HiveResolver {
         }
         firstColumn = false;
     }
+
+    @Override
+    void parseDelimiterChar(InputData input) {
+        delimiter = 1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 2d2b53e..97f278d 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -78,7 +78,6 @@ public class HiveDataFragmenter extends Fragmenter {
     private static final Log LOG = LogFactory.getLog(HiveDataFragmenter.class);
     private static final short ALL_PARTS = -1;
 
-    public static final String HIVE_UD_DELIM = "!HUDD!";
     public static final String HIVE_1_PART_DELIM = "!H1PD!";
     public static final String HIVE_PARTITIONS_DELIM = "!HPAD!";
     public static final String HIVE_NO_PART_TBL = "!HNPT!";

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index 051a246..4449d8f 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -55,10 +55,6 @@ import java.util.List;
  */
 public class HiveInputFormatFragmenter extends HiveDataFragmenter {
     private static final Log LOG = LogFactory.getLog(HiveInputFormatFragmenter.class);
-    private static final int EXPECTED_NUM_OF_TOKS = 3;
-    public static final int TOK_SERDE = 0;
-    public static final int TOK_KEYS = 1;
-    public static final int TOK_FILTER_DONE = 2;
 
     /** Defines the Hive input formats currently supported in pxf */
     public enum PXF_HIVE_INPUT_FORMATS {
@@ -67,14 +63,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
         ORC_FILE_INPUT_FORMAT
     }
 
-    /** Defines the Hive serializers (serde classes) currently supported in pxf */
-    public enum PXF_HIVE_SERDES {
-        COLUMNAR_SERDE,
-        LAZY_BINARY_COLUMNAR_SERDE,
-        LAZY_SIMPLE_SERDE,
-        ORC_SERDE
-    }
-
     /**
      * Constructs a HiveInputFormatFragmenter.
      *
@@ -84,34 +72,6 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
         super(inputData, HiveInputFormatFragmenter.class);
     }
 
-    /**
-     * Extracts the user data:
-     * serde, partition keys and whether filter was included in fragmenter
-     *
-     * @param input input data from client
-     * @param supportedSerdes supported serde names
-     * @return parsed tokens
-     * @throws UserDataException if user data contains unsupported serde
-     *                           or wrong number of tokens
-     */
-    static public String[] parseToks(InputData input, String... supportedSerdes)
-            throws UserDataException {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HIVE_UD_DELIM);
-        if (supportedSerdes.length > 0
-                && !Arrays.asList(supportedSerdes).contains(toks[TOK_SERDE])) {
-            throw new UserDataException(toks[TOK_SERDE]
-                    + " serializer isn't supported by " + input.getAccessor());
-        }
-
-        if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
-            throw new UserDataException("HiveInputFormatFragmenter expected "
-                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
-        }
-
-        return toks;
-    }
-
     /*
      * Checks that hive fields and partitions match the HAWQ schema. Throws an
      * exception if: - the number of fields (+ partitions) do not match the HAWQ

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
index ed4f908..66680bb 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveLineBreakAccessor.java
@@ -21,12 +21,12 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.hadoop.mapred.*;
 
 import java.io.IOException;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table stored as Text files.
@@ -43,9 +43,9 @@ public class HiveLineBreakAccessor extends HiveAccessor {
     public HiveLineBreakAccessor(InputData input) throws Exception {
         super(input, new TextInputFormat());
         ((TextInputFormat) inputFormat).configure(jobConf);
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
index 91f91e7..3d04bc1 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveMetadataFetcher.java
@@ -21,19 +21,27 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.MetadataFetcher;
+import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.ProfilesConf;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.ProfileFactory;
 
 /**
  * Class for connecting to Hive's MetaStore and getting schema of Hive tables.
@@ -42,12 +50,14 @@ public class HiveMetadataFetcher extends MetadataFetcher {
 
     private static final Log LOG = LogFactory.getLog(HiveMetadataFetcher.class);
     private HiveMetaStoreClient client;
+    private JobConf jobConf;
 
     public HiveMetadataFetcher(InputData md) {
         super(md);
 
         // init hive metastore client connection.
         client = HiveUtilities.initHiveClient();
+        jobConf = new JobConf(new Configuration());
     }
 
     /**
@@ -84,6 +94,21 @@ public class HiveMetadataFetcher extends MetadataFetcher {
                 Table tbl = HiveUtilities.getHiveTable(client, tblDesc);
                 getSchema(tbl, metadata);
                 metadataList.add(metadata);
+                List<Partition> tablePartitions = client.listPartitionsByFilter(tblDesc.getPath(), tblDesc.getName(), "", (short) -1);
+                Set<OutputFormat> formats = new HashSet<OutputFormat>();
+                //If table has partitions - find out all formats
+                for (Partition tablePartition : tablePartitions) {
+                    String inputFormat = tablePartition.getSd().getInputFormat();
+                    OutputFormat outputFormat = getOutputFormat(inputFormat);
+                    formats.add(outputFormat);
+                }
+                //If table has no partitions - get single format of table
+                if (tablePartitions.size() == 0 ) {
+                    String inputFormat = tbl.getSd().getInputFormat();
+                    OutputFormat outputFormat = getOutputFormat(inputFormat);
+                    formats.add(outputFormat);
+                }
+                metadata.setFormats(formats);
             } catch (UnsupportedTypeException | UnsupportedOperationException e) {
                 if(ignoreErrors) {
                     LOG.warn("Metadata fetch for " + tblDesc.toString() + " failed. " + e.getMessage());
@@ -135,4 +160,18 @@ public class HiveMetadataFetcher extends MetadataFetcher {
             throw new UnsupportedTypeException(errorMsg);
         }
     }
+
+    private OutputFormat getOutputFormat(String inputFormat) {
+        OutputFormat outputFormat = OutputFormat.UNKNOWN;
+        try {
+            InputFormat<?, ?> fformat = HiveDataFragmenter.makeInputFormat(inputFormat, jobConf);
+            String profile = ProfileFactory.get(fformat);
+            String outputFormatString = ProfilesConf.getProfilePluginsMap(profile).get("X-GP-OUTPUTFORMAT");
+            outputFormat = OutputFormat.valueOf(outputFormatString);
+        } catch (Exception e) {
+            LOG.warn("Unable to get output format for input format: " + inputFormat);
+        }
+        return outputFormat;
+    }
+
 }
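
Because a partitioned Hive table may mix storage formats, the fetcher above collects one OutputFormat per partition into a Set, falling back to the table-level format when there are no partitions. A minimal sketch of that aggregation, with formatOf() as a hypothetical stand-in for getOutputFormat():

    import java.util.Arrays;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hawq.pxf.api.OutputFormat;

    class FormatAggregationDemo {
        // Hypothetical stand-in for HiveMetadataFetcher.getOutputFormat().
        static OutputFormat formatOf(String inputFormat) {
            return inputFormat.endsWith("TextInputFormat") ? OutputFormat.TEXT : OutputFormat.BINARY;
        }

        static Set<OutputFormat> collect(List<String> partitionFormats, String tableFormat) {
            Set<OutputFormat> formats = EnumSet.noneOf(OutputFormat.class);
            for (String f : partitionFormats) {
                formats.add(formatOf(f));            // one entry per partition's format
            }
            if (partitionFormats.isEmpty()) {
                formats.add(formatOf(tableFormat));  // unpartitioned table: single format
            }
            return formats;
        }

        public static void main(String[] args) {
            System.out.println(collect(Arrays.asList(
                    "org.apache.hadoop.mapred.TextInputFormat",
                    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), null));  // [TEXT, BINARY]
        }
    }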

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
index dc195f4..be29eec 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java
@@ -30,6 +30,9 @@ import org.apache.hawq.pxf.api.BasicFilter;
 import org.apache.hawq.pxf.api.LogicalFilter;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+//import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 import org.apache.commons.lang.StringUtils;
 
 import java.sql.Date;
@@ -37,7 +40,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table that stores only ORC files.
@@ -61,9 +64,9 @@ public class HiveORCAccessor extends HiveAccessor {
      */
     public HiveORCAccessor(InputData input) throws Exception {
         super(input, new OrcInputFormat());
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.ORC_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
index 7673713..381c407 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCSerdeResolver.java
@@ -34,6 +34,7 @@ import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 import java.util.*;
 
@@ -44,7 +45,7 @@ import java.util.*;
 public class HiveORCSerdeResolver extends HiveResolver {
     private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
     private OrcSerde deserializer;
-    private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
+    private HiveUtilities.PXF_HIVE_SERDES serdeType;
 
     public HiveORCSerdeResolver(InputData input) throws Exception {
         super(input);
@@ -53,14 +54,9 @@ public class HiveORCSerdeResolver extends HiveResolver {
     /* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
     @Override
     void parseUserData(InputData input) throws Exception {
-        String[] toks = HiveInputFormatFragmenter.parseToks(input);
-        String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
-        if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
-            serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
-        } else {
-            throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
-        }
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE);
+        serdeType = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName());
+        partitionKeys = hiveUserData.getPartitionKeys();
         collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
                 : input.getUserProperty("COLLECTION_DELIM");
         mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
@@ -72,6 +68,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
      * OneField item contains two fields: an integer representing the VARCHAR type and a Java
      * Object representing the field value.
      */
+    //TODO: It's the same as in parent class
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
 
@@ -117,7 +114,7 @@ public class HiveORCSerdeResolver extends HiveResolver {
         serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
         serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
 
-        if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
+        if (serdeType == HiveUtilities.PXF_HIVE_SERDES.ORC_SERDE) {
             deserializer = new OrcSerde();
         } else {
             throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
index 2686851..7132d7b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveRCFileAccessor.java
@@ -21,7 +21,7 @@ package org.apache.hawq.pxf.plugins.hive;
 
 
 import org.apache.hawq.pxf.api.utilities.InputData;
-
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
 import org.apache.hadoop.mapred.FileSplit;
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 
-import static org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Specialization of HiveAccessor for a Hive table that stores only RC files.
@@ -47,9 +47,9 @@ public class HiveRCFileAccessor extends HiveAccessor {
      */
     public HiveRCFileAccessor(InputData input) throws Exception {
         super(input, new RCFileInputFormat());
-        String[] toks = HiveInputFormatFragmenter.parseToks(input, PXF_HIVE_SERDES.COLUMNAR_SERDE.name(), PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name());
-        initPartitionFields(toks[HiveInputFormatFragmenter.TOK_KEYS]);
-        filterInFragmenter = new Boolean(toks[HiveInputFormatFragmenter.TOK_FILTER_DONE]);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.COLUMNAR_SERDE, PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE);
+        initPartitionFields(hiveUserData.getPartitionKeys());
+        filterInFragmenter = hiveUserData.isFilterInFragmenter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
index 3837f78..55d7205 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveResolver.java
@@ -27,6 +27,7 @@ import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.utilities.Plugin;
 import org.apache.hawq.pxf.api.utilities.Utilities;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
 import org.apache.commons.lang.CharUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,10 +75,13 @@ public class HiveResolver extends Plugin implements ReadResolver {
     protected static final String COLLECTION_DELIM = ",";
     protected String collectionDelim;
     protected String mapkeyDelim;
-    private SerDe deserializer;
+    //private SerDe deserializer;
+    protected SerDe deserializer;
     private List<OneField> partitionFields;
-    private String serdeName;
-    private String propsString;
+    //private String serdeClassName;
+    protected String serdeClassName;
+    //private String propsString;
+    protected String propsString;
     String partitionKeys;
     protected char delimiter;
     String nullChar = "\\N";
@@ -135,17 +139,11 @@ public class HiveResolver extends Plugin implements ReadResolver {
     void parseUserData(InputData input) throws Exception {
         final int EXPECTED_NUM_OF_TOKS = 5;
 
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
 
-        if (toks.length != EXPECTED_NUM_OF_TOKS) {
-            throw new UserDataException("HiveResolver expected "
-                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
-        }
-
-        serdeName = toks[1];
-        propsString = toks[2];
-        partitionKeys = toks[3];
+        serdeClassName = hiveUserData.getSerdeClassName();
+        propsString = hiveUserData.getPropertiesString();
+        partitionKeys = hiveUserData.getPartitionKeys();
 
         collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
                 : input.getUserProperty("COLLECTION_DELIM");
@@ -160,14 +158,14 @@ public class HiveResolver extends Plugin implements ReadResolver {
     void initSerde(InputData inputData) throws Exception {
         Properties serdeProperties;
 
-        Class<?> c = Class.forName(serdeName, true, JavaUtils.getClassLoader());
+        Class<?> c = Class.forName(serdeClassName, true, JavaUtils.getClassLoader());
         deserializer = (SerDe) c.newInstance();
         serdeProperties = new Properties();
-        ByteArrayInputStream inStream = new ByteArrayInputStream(
-                propsString.getBytes());
-        serdeProperties.load(inStream);
-        deserializer.initialize(new JobConf(conf, HiveResolver.class),
-                serdeProperties);
+        if (propsString != null ) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(propsString.getBytes());
+            serdeProperties.load(inStream);
+        }
+        deserializer.initialize(new JobConf(conf, HiveResolver.class), serdeProperties);
     }
 
     /*
@@ -271,7 +269,7 @@ public class HiveResolver extends Plugin implements ReadResolver {
      * The partition fields are initialized one time based on userData provided
      * by the fragmenter.
      */
-    void initPartitionFields(StringBuilder parts) {
+    void initTextPartitionFields(StringBuilder parts) {
         if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
index fdc5f69..c7cfb36 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveStringPassResolver.java
@@ -22,7 +22,10 @@ package org.apache.hawq.pxf.plugins.hive;
 
 import org.apache.hawq.pxf.api.OneField;
 import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
 import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
 
 import java.util.Collections;
 import java.util.List;
@@ -42,21 +45,34 @@ public class HiveStringPassResolver extends HiveResolver {
 
     @Override
     void parseUserData(InputData input) throws Exception {
-        String userData = new String(input.getFragmentUserData());
-        String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
+        HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input);
         parseDelimiterChar(input);
         parts = new StringBuilder();
-        partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
+        partitionKeys = hiveUserData.getPartitionKeys();
+        serdeClassName = hiveUserData.getSerdeClassName();
+
+        /* Needed only for BINARY format*/
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.BINARY) {
+            propsString = hiveUserData.getPropertiesString();
+        }
     }
 
     @Override
-    void initSerde(InputData input) {
-        /* nothing to do here */
+    void initSerde(InputData input) throws Exception {
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            /* nothing to do here */
+        } else {
+            super.initSerde(input);
+        }
     }
 
     @Override
     void initPartitionFields() {
-        initPartitionFields(parts);
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            initTextPartitionFields(parts);
+        } else {
+            super.initPartitionFields();
+        }
     }
 
     /**
@@ -66,9 +82,17 @@ public class HiveStringPassResolver extends HiveResolver {
      */
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
-        String line = (onerow.getData()).toString();
+        if (((ProtocolData) inputData).outputFormat() == OutputFormat.TEXT) {
+            String line = (onerow.getData()).toString();
+
+            /* We follow Hive convention. Partition fields are always added at the end of the record */
+            return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+        } else {
+            return super.getFields(onerow);
+        }
+    }
 
-        /* We follow Hive convention. Partition fields are always added at the end of the record */
-        return Collections.singletonList(new OneField(VARCHAR.getOID(), line + parts));
+    void parseDelimiterChar(InputData input) {
+        this.delimiter = 1;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
new file mode 100644
index 0000000..710700a
--- /dev/null
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveUserData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hawq.pxf.plugins.hive;
+
+public class HiveUserData {
+
+    public static final String HIVE_UD_DELIM = "!HUDD!";
+
+    public HiveUserData(String inputFormatName, String serdeClassName,
+            String propertiesString, String partitionKeys,
+            boolean filterInFragmenter) {
+        this.inputFormatName = inputFormatName;
+        this.serdeClassName = serdeClassName;
+        this.propertiesString = propertiesString;
+        this.partitionKeys = partitionKeys;
+        this.filterInFragmenter = filterInFragmenter;
+    }
+
+    public String getInputFormatName() {
+        return inputFormatName;
+    }
+
+    public String getSerdeClassName() {
+        return serdeClassName;
+    }
+
+    public String getPropertiesString() {
+        return propertiesString;
+    }
+
+    public String getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public boolean isFilterInFragmenter() {
+        return filterInFragmenter;
+    }
+
+    private String inputFormatName;
+    private String serdeClassName;
+    private String propertiesString;
+    private String partitionKeys;
+    private boolean filterInFragmenter;
+
+    @Override
+    public String toString() {
+        return inputFormatName + HiveUserData.HIVE_UD_DELIM 
+                + serdeClassName + HiveUserData.HIVE_UD_DELIM
+                + propertiesString + HiveUserData.HIVE_UD_DELIM
+                + partitionKeys + HiveUserData.HIVE_UD_DELIM 
+                + filterInFragmenter;
+    }
+
+}
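
For reference, the round trip this new class encodes: toString() joins the five fields with the !HUDD! delimiter, and HiveUtilities.parseHiveUserData() (further down) splits them back apart. A minimal sketch using only the class added above; the serde properties string and other field values are illustrative:

    import org.apache.hawq.pxf.plugins.hive.HiveUserData;

    class HiveUserDataRoundTrip {
        public static void main(String[] args) {
            HiveUserData ud = new HiveUserData(
                    "org.apache.hadoop.mapred.TextInputFormat",            // inputFormatName
                    "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",  // serdeClassName
                    "serialization.format=1",                              // propertiesString (illustrative)
                    "!HNPT!",                                              // partitionKeys: no-partition marker
                    true);                                                 // filterInFragmenter

            // Wire form shipped from the fragmenter to accessors/resolvers:
            // inputFormat!HUDD!serde!HUDD!props!HUDD!keys!HUDD!filterFlag
            String wire = ud.toString();

            // parseHiveUserData() recovers the fields by splitting on the delimiter.
            String[] toks = wire.split(HiveUserData.HIVE_UD_DELIM);
            System.out.println(toks.length);  // 5
        }
    }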

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
index ffd66b8..b78c379 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java
@@ -39,13 +39,16 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hawq.pxf.api.Fragmenter;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
+import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.utilities.EnumHawqType;
+import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.api.io.DataType;
 import org.apache.hawq.pxf.plugins.hive.HiveDataFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter;
 import org.apache.hawq.pxf.plugins.hive.HiveTablePartition;
 import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS;
-import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDES;
+import org.apache.hawq.pxf.plugins.hive.HiveUserData;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 
 /**
  * Class containing helper functions connecting
@@ -53,6 +56,38 @@ import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_SERDE
  */
 public class HiveUtilities {
 
+    /** Defines the Hive serializers (serde classes) currently supported in pxf */
+    public enum PXF_HIVE_SERDES {
+        COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"),
+        LAZY_BINARY_COLUMNAR_SERDE("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"),
+        LAZY_SIMPLE_SERDE("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+        ORC_SERDE("org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+        
+        private String serdeClassName;
+        
+        PXF_HIVE_SERDES(String serdeClassName) {
+            this.serdeClassName = serdeClassName;
+        }
+
+        public static PXF_HIVE_SERDES getPxfHiveSerde(String serdeClassName, PXF_HIVE_SERDES... allowedSerdes) {
+            for (PXF_HIVE_SERDES s : values()) {
+                if (s.getSerdeClassName().equals(serdeClassName)) {
+
+                    if (allowedSerdes.length > 0
+                            && !Arrays.asList(allowedSerdes).contains(s)) {
+                        throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeClassName);
+                    }
+                    return s;
+                }
+            }
+            throw new UnsupportedTypeException("Unable to find serde for class name: "+ serdeClassName);
+        }
+
+        public String getSerdeClassName() {
+            return serdeClassName;
+        }
+    }
+
     private static final Log LOG = LogFactory.getLog(HiveUtilities.class);
     private static final String WILDCARD = "*";
 
@@ -64,10 +99,7 @@ public class HiveUtilities {
     static final String STR_RC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
     static final String STR_TEXT_FILE_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat";
     static final String STR_ORC_FILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    static final String STR_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
-    static final String STR_LAZY_BINARY_COLUMNAR_SERDE = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
-    static final String STR_LAZY_SIMPLE_SERDE = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
-    static final String STR_ORC_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+    private static final int EXPECTED_NUM_OF_TOKS = 5;
 
     /**
      * Initializes the HiveMetaStoreClient
@@ -376,31 +408,6 @@ public class HiveUtilities {
         }
     }
 
-    /*
-     * Validates that partition serde corresponds to PXF supported serdes and
-     * transforms the class name to an enumeration for writing it to the
-     * resolvers on other PXF instances.
-     */
-    private static String assertSerde(String className, HiveTablePartition partData)
-            throws Exception {
-        switch (className) {
-            case STR_COLUMNAR_SERDE:
-                return PXF_HIVE_SERDES.COLUMNAR_SERDE.name();
-            case STR_LAZY_BINARY_COLUMNAR_SERDE:
-                return PXF_HIVE_SERDES.LAZY_BINARY_COLUMNAR_SERDE.name();
-            case STR_LAZY_SIMPLE_SERDE:
-                return PXF_HIVE_SERDES.LAZY_SIMPLE_SERDE.name();
-            case STR_ORC_SERDE:
-                return PXF_HIVE_SERDES.ORC_SERDE.name();
-            default:
-                throw new UnsupportedTypeException(
-                        "HiveInputFormatFragmenter does not yet support  "
-                                + className + " for " + partData
-                                + ". Supported serializers are: "
-                                + Arrays.toString(PXF_HIVE_SERDES.values()));
-        }
-    }
-
 
     /* Turns the partition keys into a string */
     public static String serializePartitionKeys(HiveTablePartition partData) throws Exception {
@@ -432,7 +439,7 @@ public class HiveUtilities {
     @SuppressWarnings("unchecked")
     public static byte[] makeUserData(String fragmenterClassName, HiveTablePartition partData, boolean filterInFragmenter) throws Exception {
 
-        String userData = null;
+        HiveUserData hiveUserData = null;
 
         if (fragmenterClassName == null) {
             throw new IllegalArgumentException("No fragmenter provided.");
@@ -440,24 +447,36 @@ public class HiveUtilities {
 
         Class fragmenterClass = Class.forName(fragmenterClassName);
 
+        String inputFormatName = partData.storageDesc.getInputFormat();
+        String serdeClassName = partData.storageDesc.getSerdeInfo().getSerializationLib();
+        String propertiesString = serializeProperties(partData.properties);
+        String partitionKeys = serializePartitionKeys(partData);
+
         if (HiveInputFormatFragmenter.class.isAssignableFrom(fragmenterClass)) {
-            String inputFormatName = partData.storageDesc.getInputFormat();
-            String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
-            String partitionKeys = serializePartitionKeys(partData);
             assertFileType(inputFormatName, partData);
-            userData = assertSerde(serdeName, partData) + HiveDataFragmenter.HIVE_UD_DELIM
-                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
-        } else if (HiveDataFragmenter.class.isAssignableFrom(fragmenterClass)){
-            String inputFormatName = partData.storageDesc.getInputFormat();
-            String serdeName = partData.storageDesc.getSerdeInfo().getSerializationLib();
-            String propertiesString = serializeProperties(partData.properties);
-            String partitionKeys = serializePartitionKeys(partData);
-            userData = inputFormatName + HiveDataFragmenter.HIVE_UD_DELIM + serdeName
-                    + HiveDataFragmenter.HIVE_UD_DELIM + propertiesString + HiveDataFragmenter.HIVE_UD_DELIM
-                    + partitionKeys + HiveDataFragmenter.HIVE_UD_DELIM + filterInFragmenter;
-        } else {
-            throw new IllegalArgumentException("HiveUtilities#makeUserData is not implemented for " + fragmenterClassName);
         }
-        return userData.getBytes();
+
+        hiveUserData = new HiveUserData(inputFormatName, serdeClassName, propertiesString, partitionKeys, filterInFragmenter);
+
+        return hiveUserData.toString().getBytes();
+    }
+
+    public static HiveUserData parseHiveUserData(InputData input, PXF_HIVE_SERDES... supportedSerdes) throws UserDataException{
+        String userData = new String(input.getFragmentUserData());
+        String[] toks = userData.split(HiveUserData.HIVE_UD_DELIM);
+
+        if (toks.length != (EXPECTED_NUM_OF_TOKS)) {
+            throw new UserDataException("HiveInputFormatFragmenter expected "
+                    + EXPECTED_NUM_OF_TOKS + " tokens, but got " + toks.length);
+        }
+
+        HiveUserData hiveUserData = new HiveUserData(toks[0], toks[1], toks[2], toks[3], Boolean.valueOf(toks[4]));
+
+            if (supportedSerdes.length > 0) {
+                /* Make sure this serde is supported */
+                PXF_HIVE_SERDES pxfHiveSerde = PXF_HIVE_SERDES.getPxfHiveSerde(hiveUserData.getSerdeClassName(), supportedSerdes);
+            }
+
+        return hiveUserData;
     }
 }
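
The PXF_HIVE_SERDES.getPxfHiveSerde() lookup replaces the removed assertSerde(): it maps a serde class name back to its enum constant and, when an allowed set is supplied, rejects anything outside it. A minimal sketch of both paths, assuming the pxf-hive jar is on the classpath:

    import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;

    class SerdeLookupDemo {
        public static void main(String[] args) {
            // Plain lookup: class name -> enum constant.
            PXF_HIVE_SERDES serde = PXF_HIVE_SERDES.getPxfHiveSerde(
                    "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
            System.out.println(serde);  // ORC_SERDE

            // Restricted lookup: a known serde outside the allowed set throws,
            // which is how the ORC and RC accessors validate their user data.
            PXF_HIVE_SERDES.getPxfHiveSerde(
                    "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
                    PXF_HIVE_SERDES.ORC_SERDE);  // throws UnsupportedTypeException
        }
    }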

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
index 7bbe811..1d90d01 100644
--- a/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
+++ b/pxf/pxf-hive/src/test/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessorTest.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities;
+import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,7 +45,7 @@ import static org.mockito.Mockito.when;
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({HiveORCAccessor.class, HiveInputFormatFragmenter.class, HdfsUtilities.class, HiveDataFragmenter.class})
+@PrepareForTest({HiveORCAccessor.class, HiveUtilities.class, HdfsUtilities.class, HiveDataFragmenter.class})
 @SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf",
         "org.apache.hadoop.hive.metastore.api.MetaException",
         "org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities"}) // Prevents static inits
@@ -61,8 +63,9 @@ public class HiveORCAccessorTest {
         jobConf = new JobConf();
         PowerMockito.whenNew(JobConf.class).withAnyArguments().thenReturn(jobConf);
 
-        PowerMockito.mockStatic(HiveInputFormatFragmenter.class);
-        PowerMockito.when(HiveInputFormatFragmenter.parseToks(any(InputData.class), any(String[].class))).thenReturn(new String[]{"", HiveDataFragmenter.HIVE_NO_PART_TBL, "true"});
+        PowerMockito.mockStatic(HiveUtilities.class);
+        PowerMockito.when(HiveUtilities.parseHiveUserData(any(InputData.class), any(PXF_HIVE_SERDES[].class))).thenReturn(new HiveUserData("", "", null, HiveDataFragmenter.HIVE_NO_PART_TBL, true));
+
         PowerMockito.mockStatic(HdfsUtilities.class);
 
         PowerMockito.mockStatic(HiveDataFragmenter.class);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
index fc5ed0f..d053760 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ProfileFactory.java
@@ -19,26 +19,30 @@ package org.apache.hawq.pxf.service;
  * under the License.
  */
 
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 public class ProfileFactory {
 
     private static final String HIVE_TEXT_PROFILE = "HiveText";
     private static final String HIVE_RC_PROFILE = "HiveRC";
     private static final String HIVE_ORC_PROFILE = "HiveORC";
+    private static final String HIVE_PROFILE = "Hive";
 
-    public static String get(InputFormat inputFormat) throws Exception {
+    public static String get(InputFormat inputFormat) {
         String profileName = null;
-        // TODO: Uncomment in process of HAWQ-1228 implementation
-        //if (inputFormat instanceof TextInputFormat) {
-        //    profileName = HIVE_TEXT_PROFILE;
-        //} else if (inputFormat instanceof RCFileInputFormat) {
-        //    profileName = HIVE_RC_PROFILE;
-        /*} else */if (inputFormat instanceof OrcInputFormat) {
+        if (inputFormat instanceof TextInputFormat) {
+            profileName = HIVE_TEXT_PROFILE;
+        } else if (inputFormat instanceof RCFileInputFormat) {
+            profileName = HIVE_RC_PROFILE;
+        } else if (inputFormat instanceof OrcInputFormat) {
             profileName = HIVE_ORC_PROFILE;
+        } else {
+            //Default case
+            profileName = HIVE_PROFILE;
         }
-
         return profileName;
     }
 

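With the previously commented-out branches restored, the factory now maps each of the three specialized input formats to its profile and falls back to the generic Hive profile for everything else. A quick illustration of the mapping, assuming the Hadoop and Hive jars are on the classpath (raw types kept for brevity):

    import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hawq.pxf.service.ProfileFactory;

    class ProfileFactoryDemo {
        public static void main(String[] args) {
            System.out.println(ProfileFactory.get(new TextInputFormat()));          // HiveText
            System.out.println(ProfileFactory.get(new RCFileInputFormat()));        // HiveRC
            System.out.println(ProfileFactory.get(new OrcInputFormat()));           // HiveORC
            System.out.println(ProfileFactory.get(new SequenceFileInputFormat()));  // Hive (default)
        }
    }
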
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
index 1edb6d5..a3f21f6 100644
--- a/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
+++ b/pxf/pxf-service/src/main/resources/pxf-profiles-default.xml
@@ -49,6 +49,7 @@ under the License.
             <accessor>org.apache.hawq.pxf.plugins.hive.HiveAccessor</accessor>
             <resolver>org.apache.hawq.pxf.plugins.hive.HiveResolver</resolver>
             <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+            <outputFormat>BINARY</outputFormat>
         </plugins>
     </profile>
     <profile>
@@ -63,6 +64,7 @@ under the License.
             <accessor>org.apache.hawq.pxf.plugins.hive.HiveRCFileAccessor</accessor>
             <resolver>org.apache.hawq.pxf.plugins.hive.HiveColumnarSerdeResolver</resolver>
             <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+            <outputFormat>TEXT</outputFormat>
         </plugins>
     </profile>
     <profile>
@@ -76,6 +78,7 @@ under the License.
             <accessor>org.apache.hawq.pxf.plugins.hive.HiveLineBreakAccessor</accessor>
             <resolver>org.apache.hawq.pxf.plugins.hive.HiveStringPassResolver</resolver>
             <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+            <outputFormat>TEXT</outputFormat>
         </plugins>
     </profile>
     <profile>
@@ -89,6 +92,7 @@ under the License.
             <accessor>org.apache.hawq.pxf.plugins.hive.HiveORCAccessor</accessor>
             <resolver>org.apache.hawq.pxf.plugins.hive.HiveORCSerdeResolver</resolver>
             <metadata>org.apache.hawq.pxf.plugins.hive.HiveMetadataFetcher</metadata>
+            <outputFormat>BINARY</outputFormat>
         </plugins>
     </profile>
     <profile>
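
The new outputFormat element records the wire format each profile emits: BINARY profiles stream pxfwritable_import records, TEXT profiles stream plain delimited text; the backend reads this when it writes pg_exttable entries (see externalmd.c below). A throwaway sketch for listing the declarations with stock JAXP (illustrative only; the real lookup lives in PXF's profile-configuration code):

    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.NodeList;

    public class ListOutputFormats {
        public static void main(String[] args) throws Exception {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse("pxf-profiles-default.xml");
            NodeList profiles = doc.getElementsByTagName("profile");
            for (int i = 0; i < profiles.getLength(); i++) {
                Element p = (Element) profiles.item(i);
                String name = p.getElementsByTagName("name").item(0).getTextContent();
                NodeList fmt = p.getElementsByTagName("outputFormat");
                // Profiles without the new element have no declared format yet.
                System.out.println(name + " -> "
                        + (fmt.getLength() > 0 ? fmt.item(0).getTextContent() : "(none)"));
            }
        }
    }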

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 70a115a..20f662d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -461,8 +461,12 @@ external_stopscan(FileScanDesc scan)
 ExternalSelectDesc
 external_getnext_init(PlanState *state) {
 	ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
+
 	if (state != NULL)
+	{
 		desc->projInfo = state->ps_ProjInfo;
+		desc->fmttype = &((ExternalScan *) state->plan)->fmtType;
+	}
 	return desc;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/backend/catalog/external/externalmd.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c
index 0e39d25..ccdbdd6 100644
--- a/src/backend/catalog/external/externalmd.c
+++ b/src/backend/catalog/external/externalmd.c
@@ -57,6 +57,8 @@ static void LoadDistributionPolicy(Oid relid, PxfItem *pxfItem);
 static void LoadExtTable(Oid relid, PxfItem *pxfItem);
 static void LoadColumns(Oid relid, List *columns);
 static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod);
+static Datum GetFormatTypeForProfile(const List *outputFormats);
+static Datum GetFormatOptionsForProfile(const List *outputFormats);
 
 const int maxNumTypeModifiers = 2;
 /*
@@ -128,7 +130,21 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile)
 	pxfItem->profile = profile;
 	pxfItem->path = pstrdup(json_object_get_string(itemPath));
 	pxfItem->name = pstrdup(json_object_get_string(itemName));
-	
+
+	/* parse output formats */
+	struct json_object *jsonOutputFormats = json_object_object_get(pxfMD, "outputFormats");
+
+	if (NULL != jsonOutputFormats)
+	{
+		const int numOutputFormats = json_object_array_length(jsonOutputFormats);
+		for (int i = 0; i < numOutputFormats; i++)
+		{
+			struct json_object *jsonOutputFormat = json_object_array_get_idx(jsonOutputFormats, i);
+			char *outputFormat = pstrdup(json_object_get_string(jsonOutputFormat));
+			pxfItem->outputFormats = lappend(pxfItem->outputFormats, outputFormat);
+		}
+	}
+
 	elog(DEBUG1, "Parsed item %s, namespace %s", itemName, itemPath);
 		
 	/* parse columns */
@@ -464,17 +481,10 @@ static void LoadExtTable(Oid relid, PxfItem *pxfItem)
 	Assert(NULL != astate);
 	Datum location = makeArrayResult(astate, CurrentMemoryContext);
 
-	/* format options - should be "formatter 'pxfwritable_import'" */
-	StringInfoData formatStr;
-	initStringInfo(&formatStr);
-	appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
-	Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data));
-	pfree(formatStr.data);
-
 	values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid);
 	values[Anum_pg_exttable_location - 1] = location;
-	values[Anum_pg_exttable_fmttype - 1] = CharGetDatum('b' /* binary */);
-	values[Anum_pg_exttable_fmtopts - 1] = format_opts;
+	values[Anum_pg_exttable_fmttype - 1] = GetFormatTypeForProfile(pxfItem->outputFormats);
+	values[Anum_pg_exttable_fmtopts - 1] = GetFormatOptionsForProfile(pxfItem->outputFormats);
 	nulls[Anum_pg_exttable_command - 1] = true;
 	nulls[Anum_pg_exttable_rejectlimit - 1] = true;
 	nulls[Anum_pg_exttable_rejectlimittype - 1] = true;
@@ -631,3 +641,33 @@ static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nT
 	return VARHDRSZ + result;
 }
 
+static Datum GetFormatTypeForProfile(const List *outputFormats)
+{
+	if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)), "TEXT") == 0)
+	{
+		return CharGetDatum(TextFormatType);
+	}
+	else
+	{
+		return CharGetDatum(CustomFormatType);
+	}
+}
+
+static Datum GetFormatOptionsForProfile(const List *outputFormats)
+{
+	StringInfoData formatStr;
+	initStringInfo(&formatStr);
+	if (list_length(outputFormats) == 1 && strcmp(lfirst(list_head(outputFormats)), "TEXT") == 0)
+	{
+		/* backslashes are doubled so the stored options read: null '\N' escape '\' */
+		appendStringInfo(&formatStr, "delimiter '\x01' null '\\N' escape '\\'");
+	}
+	else
+	{
+		appendStringInfo(&formatStr, "formatter 'pxfwritable_import'");
+	}
+	Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data));
+	pfree(formatStr.data);
+	return format_opts;
+}
+
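
Note the doubled backslashes in the TEXT branch above: the C literal has to escape them so the options stored in pg_exttable read delimiter '<0x01>' null '\N' escape '\'. A minimal sketch of the two possible fmtopts values (FmtOptsDemo is hypothetical; Java string escaping mirrors the C literal here):

    public class FmtOptsDemo {
        public static void main(String[] args) {
            // Single declared TEXT output format: plain-text options, where the
            // delimiter is the 0x01 control byte, NULL is \N and escape is \ .
            String textOpts = "delimiter '\u0001' null '\\N' escape '\\'";
            // Every other case keeps the custom PXF import formatter.
            String binaryOpts = "formatter 'pxfwritable_import'";
            System.out.println(textOpts.charAt(11) == '\u0001'); // true: one control byte
            System.out.println(binaryOpts);                      // formatter 'pxfwritable_import'
        }
    }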

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index b524df8..c5c217c 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -49,7 +49,7 @@ gphadoop_context*	create_context(PG_FUNCTION_ARGS);
 void	add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS);
 void	append_churl_header_if_exists(gphadoop_context* context,
 									  const char* key, const char* value);
-void    set_current_fragment_headers(gphadoop_context* context);
+void    set_current_fragment_headers(gphadoop_context* context, char *fmttype);
 void	gpbridge_import_start(PG_FUNCTION_ARGS);
 void	gpbridge_export_start(PG_FUNCTION_ARGS);
 PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel);
@@ -62,6 +62,7 @@ void 	build_uri_for_write(gphadoop_context* context, PxfServer* rest_server);
 size_t	fill_buffer(gphadoop_context* context, char* start, size_t size);
 void	add_delegation_token(PxfInputData *inputData);
 void	free_token_resources(PxfInputData *inputData);
+static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat);
 
 /* Custom protocol entry point for read
  */
@@ -207,7 +208,7 @@ void append_churl_header_if_exists(gphadoop_context* context, const char* key, c
  * 2. X-GP-FRAGMENT-USER-DATA header is changed to the current fragment's user data.
  * If the fragment doesn't have user data, the header will be removed.
  */
-void set_current_fragment_headers(gphadoop_context* context)
+void set_current_fragment_headers(gphadoop_context* context, char *fmttype)
 {
 	FragmentData* frag_data = (FragmentData*)lfirst(context->current_fragment);
 	elog(DEBUG2, "pxf: set_current_fragment_source_name: source_name %s, index %s, has user data: %s ",
@@ -229,11 +230,21 @@ void set_current_fragment_headers(gphadoop_context* context)
 	/* if current fragment has optimal profile set it*/
 	if (frag_data->profile)
 	{
-		churl_headers_override(context->churl_headers, "X-GP-PROFILE", frag_data->profile);
+		char *supportedProfile = NULL;
+		char *supportedFormat = NULL;
+		assign_optimal_supported_profile(frag_data->profile, fmttype, &supportedProfile, &supportedFormat);
+
+		churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile);
+		churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat);
 	} else if (context->gphd_uri->profile)
 	{
 		/* if current fragment doesn't have any optimal profile, set to use profile from url */
-		churl_headers_override(context->churl_headers, "X-GP-PROFILE", context->gphd_uri->profile);
+		char *supportedProfile = NULL;
+		char *supportedFormat = NULL;
+		assign_optimal_supported_profile(context->gphd_uri->profile, fmttype, &supportedProfile, &supportedFormat);
+
+		churl_headers_override(context->churl_headers, "X-GP-PROFILE", supportedProfile);
+		churl_headers_override(context->churl_headers, "X-GP-FORMAT", supportedFormat);
 	}
 	/* if there is no profile passed in url, we expect to have accessor+fragmenter+resolver so no action needed by this point */
 
@@ -249,7 +260,8 @@ void gpbridge_import_start(PG_FUNCTION_ARGS)
 	context->churl_headers = churl_headers_init();
 	add_querydata_to_http_header(context, fcinfo);
 
-	set_current_fragment_headers(context);
+	char *fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo);
+	set_current_fragment_headers(context, fmttype);
 
 	context->churl_handle = churl_init_download(context->uri.data,
 												context->churl_headers);
@@ -399,6 +411,7 @@ PxfServer* get_pxf_server(GPHDUri* gphd_uri, const Relation rel)
 size_t gpbridge_read(PG_FUNCTION_ARGS)
 {
 	char* databuf;
+	char* fmttype;
 	size_t datalen;
 	size_t n = 0;
 	gphadoop_context* context;
@@ -406,6 +419,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS)
 	context = EXTPROTOCOL_GET_USER_CTX(fcinfo);
 	databuf = EXTPROTOCOL_GET_DATABUF(fcinfo);
 	datalen = EXTPROTOCOL_GET_DATALEN(fcinfo);
+	fmttype = EXTPROTOCOL_GET_FMTTYPE(fcinfo);
 
 	while ((n = fill_buffer(context, databuf, datalen)) == 0)
 	{
@@ -419,7 +433,7 @@ size_t gpbridge_read(PG_FUNCTION_ARGS)
 		if (context->current_fragment == NULL)
 			return 0;
 
-		set_current_fragment_headers(context);
+		set_current_fragment_headers(context, fmttype);
 		churl_download_restart(context->churl_handle, context->uri.data, context->churl_headers);
 
 		/* read some bytes to make sure the connection is established */
@@ -547,3 +561,23 @@
 
 	pfree(inputData->token);
 }
+
+static void assign_optimal_supported_profile(char *profile, char *fmttype, char **supportedProfile, char **supportedFormat)
+{
+	if (fmttype_is_text(*fmttype) && (strcmp(profile, "HiveText") == 0 || strcmp(profile, "HiveRC") == 0))
+	{
+		*supportedFormat = "TEXT";
+		*supportedProfile = profile;
+	}
+	else if (fmttype_is_custom(*fmttype))
+	{
+		*supportedFormat = "GPDBWritable";
+		*supportedProfile = profile;
+	}
+	else
+	{
+		/* no optimal profile for this table format, fall back to the generic Hive profile */
+		*supportedFormat = "GPDBWritable";
+		*supportedProfile = "Hive";
+	}
+}
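
The negotiation reduces to a small decision table: a text-format table keeps an optimal HiveText/HiveRC profile on the text fast path, a custom-format table keeps whatever optimal profile the fragmenter picked, and every other combination falls back to the generic Hive profile over GPDBWritable. A standalone sketch of that table (ProfileNegotiationDemo is hypothetical and mirrors assign_optimal_supported_profile; 't', 'b', 'c' are the fmttype characters from pg_exttable.h):

    public class ProfileNegotiationDemo {
        /** Returns {X-GP-PROFILE, X-GP-FORMAT} for a fragment profile and table fmttype. */
        static String[] negotiate(String profile, char fmttype) {
            if (fmttype == 't' && (profile.equals("HiveText") || profile.equals("HiveRC"))) {
                return new String[] { profile, "TEXT" };         // text fast path
            } else if (fmttype == 'b') {
                return new String[] { profile, "GPDBWritable" }; // custom/binary table
            } else {
                // no optimal match: fall back to the generic Hive profile
                return new String[] { "Hive", "GPDBWritable" };
            }
        }

        public static void main(String[] args) {
            System.out.println(String.join(", ", negotiate("HiveText", 't'))); // HiveText, TEXT
            System.out.println(String.join(", ", negotiate("HiveORC", 'b')));  // HiveORC, GPDBWritable
            System.out.println(String.join(", ", negotiate("HiveText", 'c'))); // Hive, GPDBWritable
        }
    }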

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index 4b69bb7..4c37ac6 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -66,6 +66,7 @@ typedef ExtProtocolData *ExtProtocol;
 #define EXTPROTOCOL_GET_USER_CTX(fcinfo)   (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
 #define EXTPROTOCOL_GET_SELECTDESC(fcinfo)   (((ExtProtocolData*) fcinfo->context)->desc)
 #define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo)
+#define EXTPROTOCOL_GET_FMTTYPE(fcinfo)   (((ExtProtocolData*) fcinfo->context)->desc->fmttype)
 #define EXTPROTOCOL_IS_LAST_CALL(fcinfo)   (((ExtProtocolData*) fcinfo->context)->prot_last_call)
 
 #define EXTPROTOCOL_SET_LAST_CALL(fcinfo)  (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 1e926d5..df8c284 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -70,6 +70,7 @@ typedef ExternalInsertDescData *ExternalInsertDesc;
 typedef struct ExternalSelectDescData
 {
 	ProjectionInfo *projInfo;
+	char	*fmttype;
 } ExternalSelectDescData;
 
 typedef enum DataLineStatus

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/external/itemmd.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h
index e6dad63..5717a53 100644
--- a/src/include/catalog/external/itemmd.h
+++ b/src/include/catalog/external/itemmd.h
@@ -67,6 +67,9 @@ typedef struct PxfItem
 	
 	/* fields */
 	List *fields;
+
+	/* output formats */
+	List *outputFormats;
 } PxfItem;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0574e75f/src/include/catalog/pg_exttable.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h
index 3a0fadd..ae2fb00 100644
--- a/src/include/catalog/pg_exttable.h
+++ b/src/include/catalog/pg_exttable.h
@@ -164,8 +164,12 @@ GetExtTableEntry(Oid relid);
 extern void
 RemoveExtTableEntry(Oid relid);
 
-#define fmttype_is_custom(c) (c == 'b')
-#define fmttype_is_text(c)   (c == 't')
-#define fmttype_is_csv(c)    (c == 'c')
+#define CustomFormatType 'b'
+#define TextFormatType 't'
+#define CsvFormatType 'c'
+
+#define fmttype_is_custom(c) (c == CustomFormatType)
+#define fmttype_is_text(c)   (c == TextFormatType)
+#define fmttype_is_csv(c)    (c == CsvFormatType)
 
 #endif /* PG_EXTTABLE_H */

