hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject [2/3] incubator-hawq git commit: HAWQ-703. Serialize HCatalog Complex Types to plain text (as Hive profile).
Date Thu, 28 Apr 2016 23:42:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
new file mode 100644
index 0000000..025797b
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/MetadataResponseFormatter.java
@@ -0,0 +1,94 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import org.apache.hawq.pxf.api.Metadata;
+
+/**
+ * Utility class for converting {@link Metadata} into a JSON format.
+ */
+public class MetadataResponseFormatter {
+
+    private static final Log LOG = LogFactory.getLog(MetadataResponseFormatter.class);
+
+    /**
+     * Converts list of {@link Metadata} to JSON String format.
+     *
+     * @param metadataList list of metadata objects to convert
+     * @return JSON formatted response
+     * @throws IOException if converting the data to JSON fails
+     */
+    public static MetadataResponse formatResponse(List<Metadata> metadataList, String path) throws IOException {
+        /* print the fragment list to log when in debug level */
+        if (LOG.isDebugEnabled()) {
+            MetadataResponseFormatter.printMetadata(metadataList, path);
+        }
+
+        return new MetadataResponse(metadataList);
+    }
+
+    /**
+     * Converts metadata list to a readable string.
+     * Intended for debugging purposes only.
+     */
+    private static void printMetadata(List<Metadata> metadataList, String path) {
+        LOG.debug("Metadata List for path " + path + ": ");
+
+        if (null == metadataList || metadataList.isEmpty()) {
+            LOG.debug("No metadata");
+            return;
+        }
+
+        for(Metadata metadata: metadataList) {
+            StringBuilder result = new StringBuilder();
+
+            if (metadata == null) {
+                result.append("None");
+                LOG.debug(result);
+                continue;
+            }
+
+            result.append("Metadata for item \"").append(metadata.getItem()).append("\": ");
+
+            if ((metadata.getFields() == null) || metadata.getFields().isEmpty()) {
+                result.append("None");
+            } else {
+                int i = 0;
+                for (Metadata.Field field : metadata.getFields()) {
+                    result.append("Field #").append(++i).append(": [")
+                            .append("Name: ").append(field.getName())
+                            .append(", Type: ").append(field.getType().getTypeName())
+                            .append(", Source type: ").append(field.getSourceType()).append("] ");
+                }
+            }
+            LOG.debug(result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
new file mode 100644
index 0000000..01a95ab
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadBridge.java
@@ -0,0 +1,179 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hawq.pxf.api.BadRecordException;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.nio.charset.CharacterCodingException;
+import java.util.LinkedList;
+import java.util.zip.ZipException;
+
/**
 * ReadBridge class creates appropriate accessor and resolver. It will then
 * create the correct output conversion class (e.g. Text or GPDBWritable) and
 * get records from accessor, let resolver deserialize them and reserialize them
 * using the output conversion class. <br>
 * The class handles BadRecordException and other exception types and marks the
 * record as invalid for HAWQ.
 */
public class ReadBridge implements Bridge {
    // Reads raw records (OneRow) from the underlying data source.
    ReadAccessor fileAccessor = null;
    // Deserializes a OneRow into a list of typed fields.
    ReadResolver fieldsResolver = null;
    // Serializes resolved fields into output records for the HAWQ backend.
    BridgeOutputBuilder outputBuilder = null;
    // Records already built by outputBuilder but not yet returned by getNext().
    LinkedList<Writable> outputQueue = null;

    private static final Log LOG = LogFactory.getLog(ReadBridge.class);

    /**
     * Constructor - sets the implementation of the bridge by instantiating
     * the accessor and resolver plugins named in the request.
     *
     * @param protData input containing accessor and resolver names
     * @throws Exception if accessor or resolver can't be instantiated
     */
    public ReadBridge(ProtocolData protData) throws Exception {
        outputBuilder = new BridgeOutputBuilder(protData);
        outputQueue = new LinkedList<Writable>();
        fileAccessor = getFileAccessor(protData);
        fieldsResolver = getFieldsResolver(protData);
    }

    /**
     * Accesses the underlying HDFS file.
     *
     * @return whether the accessor opened successfully
     * @throws Exception if opening the accessor fails
     */
    @Override
    public boolean beginIteration() throws Exception {
        return fileAccessor.openForRead();
    }

    /**
     * Fetches next object from file and turns it into a record that the HAWQ
     * backend can process.
     * <p>
     * Records already queued from a previous call are drained first. On a
     * data-level error (see {@link #isDataException}) or a BadRecordException
     * an error record is returned instead of propagating the exception; on
     * any other error the accessor is closed and the exception rethrown.
     *
     * @return the next output record, or null when the fragment is exhausted
     * @throws Exception on non-data-related failures
     */
    @Override
    public Writable getNext() throws Exception {
        Writable output = null;
        OneRow onerow = null;

        // Drain records left over from the previous resolved row, if any.
        if (!outputQueue.isEmpty()) {
            return outputQueue.pop();
        }

        try {
            while (outputQueue.isEmpty()) {
                onerow = fileAccessor.readNextObject();
                if (onerow == null) {
                    // End of fragment: close the accessor and flush any
                    // partial line the output builder may still hold.
                    fileAccessor.closeForRead();
                    output = outputBuilder.getPartialLine();
                    if (output != null) {
                        LOG.warn("A partial record in the end of the fragment");
                    }
                    // if there is a partial line, return it now, otherwise it
                    // will return null
                    return output;
                }

                // we checked before that outputQueue is empty, so we can
                // override it.
                outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
                if (!outputQueue.isEmpty()) {
                    output = outputQueue.pop();
                    break;
                }
            }
        } catch (IOException ex) {
            // Only connection/IO-level problems abort the read; data-level
            // IOExceptions are converted to an error record.
            if (!isDataException(ex)) {
                fileAccessor.closeForRead();
                throw ex;
            }
            output = outputBuilder.getErrorOutput(ex);
        } catch (BadRecordException ex) {
            // A malformed record: log it (with the offending row when known)
            // and return an error record so HAWQ can mark it invalid.
            String row_info = "null";
            if (onerow != null) {
                row_info = onerow.toString();
            }
            if (ex.getCause() != null) {
                LOG.debug("BadRecordException " + ex.getCause().toString()
                        + ": " + row_info);
            } else {
                LOG.debug(ex.toString() + ": " + row_info);
            }
            output = outputBuilder.getErrorOutput(ex);
        } catch (Exception ex) {
            // Unexpected failure: release the accessor before propagating.
            fileAccessor.closeForRead();
            throw ex;
        }

        return output;
    }

    /**
     * Instantiates the accessor plugin named in the request.
     *
     * @param inputData input containing the accessor class name
     * @return the instantiated accessor
     * @throws Exception if the accessor can't be instantiated
     */
    public static ReadAccessor getFileAccessor(InputData inputData)
            throws Exception {
        return (ReadAccessor) Utilities.createAnyInstance(InputData.class,
                inputData.getAccessor(), inputData);
    }

    /**
     * Instantiates the resolver plugin named in the request.
     *
     * @param inputData input containing the resolver class name
     * @return the instantiated resolver
     * @throws Exception if the resolver can't be instantiated
     */
    public static ReadResolver getFieldsResolver(InputData inputData)
            throws Exception {
        return (ReadResolver) Utilities.createAnyInstance(InputData.class,
                inputData.getResolver(), inputData);
    }

    /*
     * There are many exceptions that inherit IOException. Some of them like
     * EOFException are generated due to a data problem, and not because of an
     * IO/connection problem as the father IOException might lead us to believe.
     * For example, an EOFException will be thrown while fetching a record from
     * a sequence file, if there is a formatting problem in the record. Fetching
     * record from the sequence-file is the responsibility of the accessor so
     * the exception will be thrown from the accessor. We identify this cases by
     * analyzing the exception type, and when we discover that the actual
     * problem was a data problem, we return the errorOutput GPDBWritable.
     */
    private boolean isDataException(IOException ex) {
        return (ex instanceof EOFException
                || ex instanceof CharacterCodingException
                || ex instanceof CharConversionException
                || ex instanceof UTFDataFormatException || ex instanceof ZipException);
    }

    /**
     * Not supported — ReadBridge is read-only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean setNext(DataInputStream inputStream) {
        throw new UnsupportedOperationException("setNext is not implemented");
    }

    /**
     * The bridge is thread safe only if both its accessor and resolver
     * plugins are thread safe.
     */
    @Override
    public boolean isThreadSafe() {
        boolean result = ((Plugin) fileAccessor).isThreadSafe()
                && ((Plugin) fieldsResolver).isThreadSafe();
        LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
        return result;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
new file mode 100644
index 0000000..d5ae66a
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -0,0 +1,131 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInputStream;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+/**
+ * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output
+ * records, based on a ratio sample. The sample to pass or discard a record is
+ * done after all of the processing is completed (
+ * {@code accessor -> resolver -> output builder}) to make sure there are no
+ * chunks of data instead of single records. <br>
+ * The goal is to get as uniform as possible sampling. This is achieved by
+ * creating a bit map matching the precision of the sampleRatio, so that for a
+ * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be
+ * set. This map is matched against each read record, discarding ones with a 0
+ * bit and continuing until a 1 bit record is read.
+ */
+public class ReadSamplingBridge implements Bridge {
+
+    ReadBridge bridge;
+
+    float sampleRatio;
+    BitSet sampleBitSet;
+    int bitSetSize;
+    int sampleSize;
+    int curIndex;
+
+    private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class);
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing sampling ratio
+     * @throws Exception if the sampling ratio is wrong
+     */
+    public ReadSamplingBridge(ProtocolData protData) throws Exception {
+        bridge = new ReadBridge(protData);
+
+        this.sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio < 0.0001 || sampleRatio > 1.0) {
+            throw new IllegalArgumentException(
+                    "sampling ratio must be a value between 0.0001 and 1.0. "
+                            + "(value = " + sampleRatio + ")");
+        }
+
+        calculateBitSetSize();
+
+        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
+                sampleSize);
+        this.curIndex = 0;
+    }
+
+    private void calculateBitSetSize() {
+
+        sampleSize = (int) (sampleRatio * 10000);
+        bitSetSize = 10000;
+
+        while ((bitSetSize > 100) && (sampleSize % 10 == 0)) {
+            bitSetSize /= 10;
+            sampleSize /= 10;
+        }
+        LOG.debug("bit set size = " + bitSetSize + " sample size = "
+                + sampleSize);
+    }
+
+    /**
+     * Fetches next sample, according to the sampling ratio.
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = bridge.getNext();
+
+        // sample - if bit is false, advance to the next object
+        while (!sampleBitSet.get(curIndex)) {
+
+            if (output == null) {
+                break;
+            }
+            incIndex();
+            output = bridge.getNext();
+        }
+
+        incIndex();
+        return output;
+    }
+
+    private void incIndex() {
+        curIndex = (++curIndex) % bitSetSize;
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        return bridge.beginIteration();
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+        return bridge.setNext(inputStream);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return bridge.isThreadSafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
new file mode 100644
index 0000000..c3ee731
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/WriteBridge.java
@@ -0,0 +1,117 @@
+package org.apache.hawq.pxf.service;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.*;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.apache.hawq.pxf.api.utilities.Utilities;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.util.List;
+
+/*
+ * WriteBridge class creates appropriate accessor and resolver.
+ * It reads data from inputStream by the resolver,
+ * and writes it to the Hadoop storage with the accessor.
+ */
+public class WriteBridge implements Bridge {
+    private static final Log LOG = LogFactory.getLog(WriteBridge.class);
+    WriteAccessor fileAccessor = null;
+    WriteResolver fieldsResolver = null;
+    BridgeInputBuilder inputBuilder;
+
+    /*
+     * C'tor - set the implementation of the bridge
+     */
+    public WriteBridge(ProtocolData protocolData) throws Exception {
+
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
+        fileAccessor = getFileAccessor(protocolData);
+        fieldsResolver = getFieldsResolver(protocolData);
+
+    }
+
+    /*
+     * Accesses the underlying HDFS file
+     */
+    @Override
+    public boolean beginIteration() throws Exception {
+        return fileAccessor.openForWrite();
+    }
+
+    /*
+     * Read data from stream, convert it using WriteResolver into OneRow object, and
+     * pass to WriteAccessor to write into file.
+     */
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+        if (record == null) {
+            close();
+            return false;
+        }
+
+        OneRow onerow = fieldsResolver.setFields(record);
+        if (onerow == null) {
+            close();
+            return false;
+        }
+        if (!fileAccessor.writeNextObject(onerow)) {
+            close();
+            throw new BadRecordException();
+        }
+        return true;
+    }
+
+    private void close() throws Exception {
+        try {
+            fileAccessor.closeForWrite();
+        } catch (Exception e) {
+            LOG.error("Failed to close bridge resources: " + e.getMessage());
+            throw e;
+        }
+    }
+
+    private static WriteAccessor getFileAccessor(InputData inputData) throws Exception {
+        return (WriteAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    }
+
+    private static WriteResolver getFieldsResolver(InputData inputData) throws Exception {
+        return (WriteResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    }
+
+    @Override
+    public Writable getNext() {
+        throw new UnsupportedOperationException("getNext is not implemented");
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java
new file mode 100644
index 0000000..6b911f2
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/BufferWritable.java
@@ -0,0 +1,98 @@
+package org.apache.hawq.pxf.service.io;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+
+/**
+ * A serializable object for transporting a byte array through the Bridge
+ * framework
+ */
+public class BufferWritable implements Writable {
+
+    byte[] buf = null;
+
+    /**
+     * Constructs a BufferWritable. Copies the buffer reference and not the
+     * actual bytes. This class is used when we intend to transport a buffer
+     * through the Bridge framework without copying the data each time the
+     * buffer is passed between the Bridge objects.
+     *
+     * @param inBuf buffer
+     */
+    public BufferWritable(byte[] inBuf) {
+        buf = inBuf;
+    }
+
+    /**
+     * Serializes the fields of this object to <code>out</code>.
+     *
+     * @param out <code>DataOutput</code> to serialize this object into.
+     * @throws IOException if the buffer was not set
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (buf == null)
+            throw new IOException("BufferWritable was not set");
+        out.write(buf);
+    }
+
+    /**
+     * Deserializes the fields of this object from <code>in</code>.
+     * <p>
+     * For efficiency, implementations should attempt to re-use storage in the
+     * existing object where possible.
+     * </p>
+     *
+     * @param in <code>DataInput</code> to deserialize this object from
+     * @throws UnsupportedOperationException this function is not supported
+     */
+    @Override
+    public void readFields(DataInput in) {
+        throw new UnsupportedOperationException(
+                "BufferWritable.readFields() is not implemented");
+    }
+
+    /**
+     * Appends given app's buffer to existing buffer.
+     * <br>
+     * Not efficient - requires copying both this and the appended buffer.
+     *
+     * @param app buffer to append
+     */
+    public void append(byte[] app) {
+        if (buf == null) {
+            buf = app;
+            return;
+        }
+        if (app == null) {
+            return;
+        }
+
+        byte[] newbuf = new byte[buf.length + app.length];
+        System.arraycopy(buf, 0, newbuf, 0, buf.length);
+        System.arraycopy(app, 0, newbuf, buf.length, app.length);
+        buf = newbuf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java
new file mode 100644
index 0000000..5bc26f1
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/GPDBWritable.java
@@ -0,0 +1,893 @@
+package org.apache.hawq.pxf.service.io;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.*;
+import java.util.Arrays;
+
+import static org.apache.hawq.pxf.api.io.DataType.*;
+
+
/**
 * This class represents a GPDB record in the form of
 * a Java object.
 */
public class GPDBWritable implements Writable {
    /*
     * GPDBWritable is using the following serialization form:
     *
     * Total Length | Version | Error Flag | # of columns | Col type |...| Col type | Null Bit array            | Col val...
     * 4 byte       | 2 byte  | 1 byte     | 2 byte       | 1 byte   |...| 1 byte   | ceil(# of columns/8) byte | Fixed or Var length
     *
     * For fixed length type, we know the length.
     * In the col val, we align pad according to the alignment requirement of the type.
     * For var length type, the alignment is always 4 byte.
     * For var length type, col val is <4 byte length><payload val>
     */

    private static final Log LOG = LogFactory.getLog(GPDBWritable.class);
    // Sentinel: a packet length of -1 marks end of stream / an empty object.
    private static final int EOF = -1;
+
    /**
     * Enum of the supported database column types, carrying each type's
     * on-wire alignment and serialized length.
     */
    private enum DBType {
        BIGINT(8, 8),
        BOOLEAN(1, 1),
        FLOAT8(8, 8),
        INTEGER(4, 4),
        REAL(4, 4),
        SMALLINT(2, 2),
        BYTEA(4, -1),
        TEXT(4, -1);

        private final int typelength; // -1 means var length
        private final int alignment;

        // NOTE: the first constructor argument is the alignment, the second
        // is the length — mind the order when adding constants.
        DBType(int align, int len) {
            this.typelength = len;
            this.alignment = align;
        }

        /** Returns the serialized length in bytes, or -1 for var-length types. */
        public int getTypeLength() {
            return typelength;
        }

        /** Returns true for variable-length types (BYTEA, TEXT). */
        public boolean isVarLength() {
            return typelength == -1;
        }

        // return the alignment requirement of the type, in bytes
        public int getAlignment() {
            return alignment;
        }
    }
+
    /*
     * Constants
     */
    private static final int PREV_VERSION = 1;
    private static final int VERSION = 2; /* for backward compatibility */
    private static final String CHARSET = "UTF-8";

    /*
     * Local variables
     */
    // OIDs of the record's column types (values come from DataType, e.g. BIGINT.getOID()).
    protected int[] colType;
    // Column values; an entry may be null for a SQL NULL.
    protected Object[] colValue;
    // Alignment used for 8-byte types; set via initializeEightByteAlignment()
    // in the constructors (implementation not visible here).
    protected int alignmentOfEightBytes = 8;
    // Error flag byte as defined in the serialization form above.
    protected byte errorFlag = 0;
    // Total packet length read from the stream; EOF (-1) while empty.
    protected int pktlen = EOF;

    public int[] getColType() {
        return colType;
    }

    /**
     * An exception class for column type definition and
     * set/get value mismatch.
     */
    public class TypeMismatchException extends IOException {
        public TypeMismatchException(String msg) {
            super(msg);
        }
    }

    /**
     * Empty Constructor
     */
    public GPDBWritable() {
        initializeEightByteAlignment();
    }

    /**
     * Constructor to build a db record. colType defines the schema
     *
     * @param columnType the table column types
     */
    public GPDBWritable(int[] columnType) {
        initializeEightByteAlignment();
        colType = columnType;
        colValue = new Object[columnType.length];
    }

    /**
     * Constructor to build a db record from a serialized form.
     *
     * @param data a record in the serialized form
     * @throws IOException if the data is malformatted.
     */
    public GPDBWritable(byte[] data) throws IOException {
        initializeEightByteAlignment();
        ByteArrayInputStream bis = new ByteArrayInputStream(data);
        DataInputStream dis = new DataInputStream(bis);

        readFields(dis);
    }
+
    /*
     * Reads the first 4 bytes and verifies it is a valid packet length.
     * Returns EOF (and leaves pktlen as EOF) when the stream has ended,
     * either via EOFException or an explicit -1 length on the wire.
     */
    private int readPktLen(DataInput in) throws IOException {
        pktlen = EOF;

        try {
            pktlen = in.readInt();
        } catch (EOFException e) {
            LOG.debug("Reached end of stream (EOFException)");
            return EOF;
        }
        if (pktlen == EOF) {
            LOG.debug("Reached end of stream (returned -1)");
        }

        return pktlen;
    }
+
    /**
     * Deserializes one GPDBWritable record from the stream.
     *
     * Wire layout: 4-byte packet length, 2-byte version, 1-byte error flag
     * (current VERSION only), 2-byte column count, one type byte per column,
     * a null bitmap (one bit per column, MSB first), then the column values
     * with per-type alignment padding, padded to an 8-byte boundary at the
     * end of the record.
     *
     * @param in source stream positioned at a record boundary
     * @throws IOException if the stream is malformed, has an unsupported
     *                     version, an unknown type byte, or carries a
     *                     non-zero error flag
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        /*
         * extract pkt len.
         *
         * GPSQL-1107:
         * The DataInput might already be empty (EOF), but we can't check it beforehand.
         * If that's the case, pktlen is updated to -1, to mark that the object is still empty.
         * (can be checked with isEmpty()).
         */
        pktlen = readPktLen(in);
        if (isEmpty()) {
            return;
        }

        /* extract the version and col cnt; curOffset tracks bytes consumed so far */
        int version = in.readShort();
        int curOffset = 4 + 2;
        int colCnt;

        /* only the current and the immediately previous format versions are accepted */
        if (version != GPDBWritable.VERSION && version != GPDBWritable.PREV_VERSION) {
            throw new IOException("Current GPDBWritable version(" +
                    GPDBWritable.VERSION + ") does not match input version(" +
                    version + ")");
        }

        /* the error byte only exists in the current version of the format */
        if (version == GPDBWritable.VERSION) {
            errorFlag = in.readByte();
            curOffset += 1;
        }

        colCnt = in.readShort();
        curOffset += 2;

        /* Extract Column Type: one DBType ordinal byte per column */
        colType = new int[colCnt];
        DBType[] coldbtype = new DBType[colCnt];
        for (int i = 0; i < colCnt; i++) {
            int enumType = (in.readByte());
            curOffset += 1;
            if (enumType == DBType.BIGINT.ordinal()) {
                colType[i] = BIGINT.getOID();
                coldbtype[i] = DBType.BIGINT;
            } else if (enumType == DBType.BOOLEAN.ordinal()) {
                colType[i] = BOOLEAN.getOID();
                coldbtype[i] = DBType.BOOLEAN;
            } else if (enumType == DBType.FLOAT8.ordinal()) {
                colType[i] = FLOAT8.getOID();
                coldbtype[i] = DBType.FLOAT8;
            } else if (enumType == DBType.INTEGER.ordinal()) {
                colType[i] = INTEGER.getOID();
                coldbtype[i] = DBType.INTEGER;
            } else if (enumType == DBType.REAL.ordinal()) {
                colType[i] = REAL.getOID();
                coldbtype[i] = DBType.REAL;
            } else if (enumType == DBType.SMALLINT.ordinal()) {
                colType[i] = SMALLINT.getOID();
                coldbtype[i] = DBType.SMALLINT;
            } else if (enumType == DBType.BYTEA.ordinal()) {
                colType[i] = BYTEA.getOID();
                coldbtype[i] = DBType.BYTEA;
            } else if (enumType == DBType.TEXT.ordinal()) {
                colType[i] = TEXT.getOID();
                coldbtype[i] = DBType.TEXT;
            } else {
                throw new IOException("Unknown GPDBWritable.DBType ordinal value");
            }
        }

        /* Extract null bit array: one bit per column, packed MSB first */
        byte[] nullbytes = new byte[getNullByteArraySize(colCnt)];
        in.readFully(nullbytes);
        curOffset += nullbytes.length;
        boolean[] colIsNull = byteArrayToBooleanArray(nullbytes, colCnt);

        /* extract column value; null columns occupy no bytes on the wire */
        colValue = new Object[colCnt];
        for (int i = 0; i < colCnt; i++) {
            if (!colIsNull[i]) {
                /* Skip the alignment padding inserted by the writer for this type */
                int skipbytes = roundUpAlignment(curOffset, coldbtype[i].getAlignment()) - curOffset;
                for (int j = 0; j < skipbytes; j++) {
                    in.readByte();
                }
                curOffset += skipbytes;

                /* For fixed length type, increment the offset according to type type length here.
                 * For var length type (BYTEA, TEXT), we'll read 4 byte length header and the
                 * actual payload.
                 */
                int varcollen = -1;
                if (coldbtype[i].isVarLength()) {
                    varcollen = in.readInt();
                    curOffset += 4 + varcollen;
                } else {
                    curOffset += coldbtype[i].getTypeLength();
                }

                switch (DataType.get(colType[i])) {
                    case BIGINT: {
                        colValue[i] = in.readLong();
                        break;
                    }
                    case BOOLEAN: {
                        colValue[i] = in.readBoolean();
                        break;
                    }
                    case FLOAT8: {
                        colValue[i] = in.readDouble();
                        break;
                    }
                    case INTEGER: {
                        colValue[i] = in.readInt();
                        break;
                    }
                    case REAL: {
                        colValue[i] = in.readFloat();
                        break;
                    }
                    case SMALLINT: {
                        colValue[i] = in.readShort();
                        break;
                    }

                    /* For BYTEA column, it has a 4 byte var length header. */
                    case BYTEA: {
                        colValue[i] = new byte[varcollen];
                        in.readFully((byte[]) colValue[i]);
                        break;
                    }
                    /* For text formatted column, it has a 4 byte var length header
                     * and it's always null terminated string.
                     * So, we can remove the last "\0" when constructing the string.
                     */
                    case TEXT: {
                        byte[] data = new byte[varcollen];
                        in.readFully(data, 0, varcollen);
                        colValue[i] = new String(data, 0, varcollen - 1, CHARSET);
                        break;
                    }

                    default:
                        throw new IOException("Unknown GPDBWritable ColType");
                }
            }
        }

        /* Skip the ending alignment padding (records are 8-byte aligned) */
        int skipbytes = roundUpAlignment(curOffset, 8) - curOffset;
        for (int j = 0; j < skipbytes; j++) {
            in.readByte();
        }
        curOffset += skipbytes;

        /* a non-zero error flag from the peer is surfaced as an IOException */
        if (errorFlag != 0) {
            throw new IOException("Received error value " + errorFlag + " from format");
        }
    }
+
    /**
     * Serializes this record in the wire format described in
     * {@link #readFields(DataInput)}: a sizing pass first computes the total
     * packet length (including per-type alignment padding), then the header,
     * type bytes, null bitmap, values, and final 8-byte-boundary padding are
     * written out.
     *
     * @param out destination stream
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        int numCol = colType.length;
        boolean[] nullBits = new boolean[numCol];
        int[] colLength = new int[numCol];
        byte[] enumType = new byte[numCol];
        int[] padLength = new int[numCol];
        byte[] padbytes = new byte[8]; // zero-filled source for padding writes

        /*
         * Compute the total payload and header length
         * header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte)
         * col type array = #col * 1 byte
         * null bit array = ceil(#col/8)
         */
        int datlen = 4 + 2 + 1 + 2;
        datlen += numCol;
        datlen += getNullByteArraySize(numCol);

        for (int i = 0; i < numCol; i++) {
            /* Get the enum type; anything not in the fixed list travels as TEXT */
            DBType coldbtype;
            switch (DataType.get(colType[i])) {
                case BIGINT:
                    coldbtype = DBType.BIGINT;
                    break;
                case BOOLEAN:
                    coldbtype = DBType.BOOLEAN;
                    break;
                case FLOAT8:
                    coldbtype = DBType.FLOAT8;
                    break;
                case INTEGER:
                    coldbtype = DBType.INTEGER;
                    break;
                case REAL:
                    coldbtype = DBType.REAL;
                    break;
                case SMALLINT:
                    coldbtype = DBType.SMALLINT;
                    break;
                case BYTEA:
                    coldbtype = DBType.BYTEA;
                    break;
                default:
                    coldbtype = DBType.TEXT;
            }
            enumType[i] = (byte) (coldbtype.ordinal());

            /* Get the actual value, and set the null bit */
            if (colValue[i] == null) {
                nullBits[i] = true;
                colLength[i] = 0;
            } else {
                nullBits[i] = false;

                /*
                 * For fixed length type, we get the fixed length.
                 * For var len binary format, the length is in the col value.
                 * For text format, we must convert encoding first.
                 */
                if (!coldbtype.isVarLength()) {
                    colLength[i] = coldbtype.getTypeLength();
                } else if (!isTextForm(colType[i])) {
                    colLength[i] = ((byte[]) colValue[i]).length;
                } else {
                    colLength[i] = ((String) colValue[i]).getBytes(CHARSET).length;
                }

                /* calculate and add the type alignment padding */
                padLength[i] = roundUpAlignment(datlen, coldbtype.getAlignment()) - datlen;
                datlen += padLength[i];

                /* for variable length type, we add a 4 byte length header */
                if (coldbtype.isVarLength()) {
                    datlen += 4;
                }
            }
            datlen += colLength[i];
        }

        /*
         * Add the final alignment padding for the next record
         */
        int endpadding = roundUpAlignment(datlen, 8) - datlen;
        datlen += endpadding;

        /* Construct the packet header */
        out.writeInt(datlen);
        out.writeShort(VERSION);
        out.writeByte(errorFlag);
        out.writeShort(numCol);

        /* Write col type (one DBType ordinal byte per column) */
        for (int i = 0; i < numCol; i++) {
            out.writeByte(enumType[i]);
        }

        /* Nullness: packed bitmap, one bit per column */
        byte[] nullBytes = boolArrayToByteArray(nullBits);
        out.write(nullBytes);

        /* Column Value */
        for (int i = 0; i < numCol; i++) {
            if (!nullBits[i]) {
                /* Pad the alignment byte first */
                if (padLength[i] > 0) {
                    out.write(padbytes, 0, padLength[i]);
                }

                /* Now, write the actual column value */
                switch (DataType.get(colType[i])) {
                    case BIGINT:
                        out.writeLong(((Long) colValue[i]));
                        break;
                    case BOOLEAN:
                        out.writeBoolean(((Boolean) colValue[i]));
                        break;
                    case FLOAT8:
                        out.writeDouble(((Double) colValue[i]));
                        break;
                    case INTEGER:
                        out.writeInt(((Integer) colValue[i]));
                        break;
                    case REAL:
                        out.writeFloat(((Float) colValue[i]));
                        break;
                    case SMALLINT:
                        out.writeShort(((Short) colValue[i]));
                        break;

                    /* For BYTEA format, add 4byte length header at the beginning  */
                    case BYTEA:
                        out.writeInt(colLength[i]);
                        out.write((byte[]) colValue[i]);
                        break;

                    /* For text format, add 4byte length header. string is already '\0' terminated
                     * (setString appends the terminator) */
                    default: {
                        out.writeInt(colLength[i]);
                        byte[] data = ((String) colValue[i]).getBytes(CHARSET);
                        out.write(data);
                        break;
                    }
                }
            }
        }

        /* End padding */
        out.write(padbytes, 0, endpadding);
    }
+
+    /**
+     * Private helper to convert boolean array to byte array
+     */
+    private static byte[] boolArrayToByteArray(boolean[] data) {
+        int len = data.length;
+        byte[] byts = new byte[getNullByteArraySize(len)];
+
+        for (int i = 0, j = 0, k = 7; i < data.length; i++) {
+            byts[j] |= (data[i] ? 1 : 0) << k--;
+            if (k < 0) {
+                j++;
+                k = 7;
+            }
+        }
+        return byts;
+    }
+
+    /**
+     * Private helper to determine the size of the null byte array
+     */
+    private static int getNullByteArraySize(int colCnt) {
+        return (colCnt / 8) + (colCnt % 8 != 0 ? 1 : 0);
+    }
+
+    /**
+     * Private helper to convert byte array to boolean array
+     */
+    private static boolean[] byteArrayToBooleanArray(byte[] data, int colCnt) {
+        boolean[] bools = new boolean[colCnt];
+        for (int i = 0, j = 0, k = 7; i < bools.length; i++) {
+            bools[i] = ((data[j] >> k--) & 0x01) == 1;
+            if (k < 0) {
+                j++;
+                k = 7;
+            }
+        }
+        return bools;
+    }
+
+    /**
+     * Private helper to round up alignment for the given length
+     */
+    private int roundUpAlignment(int len, int align) {
+        int commonAlignment = align;
+        if (commonAlignment == 8) {
+            commonAlignment = alignmentOfEightBytes;
+        }
+        return (((len) + ((commonAlignment) - 1)) & ~((commonAlignment) - 1));
+    }
+
+    /**
+     * Getter/Setter methods to get/set the column value
+     */
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setLong(int colIdx, Long val)
+            throws TypeMismatchException {
+        checkType(BIGINT, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setBoolean(int colIdx, Boolean val)
+            throws TypeMismatchException {
+        checkType(BOOLEAN, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setBytes(int colIdx, byte[] val)
+            throws TypeMismatchException {
+        checkType(BYTEA, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setString(int colIdx, String val)
+            throws TypeMismatchException {
+        checkType(TEXT, colIdx, true);
+        if (val != null) {
+            colValue[colIdx] = val + "\0";
+        } else {
+            colValue[colIdx] = val;
+        }
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setFloat(int colIdx, Float val)
+            throws TypeMismatchException {
+        checkType(REAL, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setDouble(int colIdx, Double val)
+            throws TypeMismatchException {
+        checkType(FLOAT8, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setInt(int colIdx, Integer val)
+            throws TypeMismatchException {
+        checkType(INTEGER, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Sets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @param val    the value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public void setShort(int colIdx, Short val)
+            throws TypeMismatchException {
+        checkType(SMALLINT, colIdx, true);
+        colValue[colIdx] = val;
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Long getLong(int colIdx)
+            throws TypeMismatchException {
+        checkType(BIGINT, colIdx, false);
+        return (Long) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Boolean getBoolean(int colIdx)
+            throws TypeMismatchException {
+        checkType(BOOLEAN, colIdx, false);
+        return (Boolean) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public byte[] getBytes(int colIdx)
+            throws TypeMismatchException {
+        checkType(BYTEA, colIdx, false);
+        return (byte[]) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public String getString(int colIdx)
+            throws TypeMismatchException {
+        checkType(TEXT, colIdx, false);
+        return (String) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Float getFloat(int colIdx)
+            throws TypeMismatchException {
+        checkType(REAL, colIdx, false);
+        return (Float) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Double getDouble(int colIdx)
+            throws TypeMismatchException {
+        checkType(FLOAT8, colIdx, false);
+        return (Double) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Integer getInt(int colIdx)
+            throws TypeMismatchException {
+        checkType(INTEGER, colIdx, false);
+        return (Integer) colValue[colIdx];
+    }
+
+    /**
+     * Gets the column value of the record.
+     *
+     * @param colIdx the column index
+     * @return column value
+     * @throws TypeMismatchException the column type does not match
+     */
+    public Short getShort(int colIdx)
+            throws TypeMismatchException {
+        checkType(SMALLINT, colIdx, false);
+        return (Short) colValue[colIdx];
+    }
+
+    /**
+     * Sets the error field.
+     *
+     * @param errorVal the error value
+     */
+    public void setError(boolean errorVal) {
+        errorFlag = errorVal ? (byte) 1 : (byte) 0;
+    }
+
+    /**
+     * Returns a string representation of the object.
+     */
+    @Override
+    public String toString() {
+        if (colType == null) {
+            return null;
+        }
+        StringBuilder result = new StringBuilder();
+        for (int i = 0; i < colType.length; i++) {
+            result.append("Column ").append(i).append(":");
+            if (colValue[i] != null) {
+                result.append(colType[i] == BYTEA.getOID()
+                        ? byteArrayInString((byte[]) colValue[i])
+                        : colValue[i]);
+            }
+            result.append("\n");
+        }
+        return result.toString();
+    }
+
+    /**
+     * Helper printing function
+     */
+    private static String byteArrayInString(byte[] data) {
+        StringBuilder result = new StringBuilder();
+        for (Byte b : data) {
+            result.append(b.intValue()).append(" ");
+        }
+        return result.toString();
+    }
+
+    /**
+     * Private Helper to check the type mismatch
+     * If the expected type is stored as string, then it must be set
+     * via setString.
+     * Otherwise, the type must match.
+     */
+    private void checkType(DataType inTyp, int idx, boolean isSet)
+            throws TypeMismatchException {
+        if (idx < 0 || idx >= colType.length) {
+            throw new TypeMismatchException("Column index is out of range");
+        }
+
+        int exTyp = colType[idx];
+
+        if (isTextForm(exTyp)) {
+            if (inTyp != TEXT) {
+                throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), TEXT.getOID(), isSet));
+            }
+        } else if (inTyp != DataType.get(exTyp)) {
+            throw new TypeMismatchException(formErrorMsg(inTyp.getOID(), exTyp, isSet));
+        }
+    }
+
+    private String formErrorMsg(int inTyp, int colTyp, boolean isSet) {
+        return isSet
+                ? "Cannot set " + getTypeName(inTyp) + " to a " + getTypeName(colTyp) + " column"
+                : "Cannot get " + getTypeName(inTyp) + " from a " + getTypeName(colTyp) + " column";
+    }
+
+    /**
+     * Private Helper routine to tell whether a type is Text form or not
+     *
+     * @param type the type OID that we want to check
+     */
+    private boolean isTextForm(int type) {
+        return !Arrays.asList(BIGINT, BOOLEAN, BYTEA, FLOAT8, INTEGER, REAL, SMALLINT).contains(DataType.get(type));
+    }
+
+    /**
+     * Helper to get the type name.
+     * If a given oid is not in the commonly used list, we
+     * would expect a TEXT for it (for the error message).
+     *
+     * @param oid type OID
+     * @return type name
+     */
+    public static String getTypeName(int oid) {
+        switch (DataType.get(oid)) {
+            case BOOLEAN:
+                return "BOOLEAN";
+            case BYTEA:
+                return "BYTEA";
+            case CHAR:
+                return "CHAR";
+            case BIGINT:
+                return "BIGINT";
+            case SMALLINT:
+                return "SMALLINT";
+            case INTEGER:
+                return "INTEGER";
+            case TEXT:
+                return "TEXT";
+            case REAL:
+                return "REAL";
+            case FLOAT8:
+                return "FLOAT8";
+            case BPCHAR:
+                return "BPCHAR";
+            case VARCHAR:
+                return "VARCHAR";
+            case DATE:
+                return "DATE";
+            case TIME:
+                return "TIME";
+            case TIMESTAMP:
+                return "TIMESTAMP";
+            case NUMERIC:
+                return "NUMERIC";
+            default:
+                return "TEXT";
+        }
+    }
+
+    /*
+     * Get alignment from command line to match to the alignment
+     * the C code uses (see gphdfs/src/protocol_formatter/common.c).
+     */
+    private void initializeEightByteAlignment() {
+        String alignment = System.getProperty("greenplum.alignment");
+        if (alignment == null) {
+            return;
+        }
+        alignmentOfEightBytes = Integer.parseInt(alignment);
+    }
+
+    /**
+     * Returns if the writable object is empty,
+     * based on the pkt len as read from stream.
+     * -1 means nothing was read (eof).
+     *
+     * @return whether the writable object is empty
+     */
+    public boolean isEmpty() {
+        return pktlen == EOF;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java
new file mode 100644
index 0000000..253b525
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Text.java
@@ -0,0 +1,399 @@
+package org.apache.hawq.pxf.service.io;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.*;
+import java.util.Arrays;
+
+/**
+ * This class stores text using standard UTF8 encoding. It provides methods to
+ * serialize, deserialize. The type of length is integer and is serialized using
+ * zero-compressed format.
+ */
+public class Text implements Writable {
+
+    // for write
+    private byte[] buf;
+    private static final Log LOG = LogFactory.getLog(Text.class);
+    int curLoc;
+    private static final char LINE_DELIMITER = '\n';
+    private static final int BUF_SIZE = 1024;
+    private static final int EOF = -1;
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
+        @Override
+        protected CharsetEncoder initialValue() {
+            return Charset.forName("UTF-8").newEncoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+    private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new ThreadLocal<CharsetDecoder>() {
+        @Override
+        protected CharsetDecoder initialValue() {
+            return Charset.forName("UTF-8").newDecoder().onMalformedInput(
+                    CodingErrorAction.REPORT).onUnmappableCharacter(
+                    CodingErrorAction.REPORT);
+        }
+    };
+    private byte[] bytes;
+    private int length;
+
+    public Text() {
+        bytes = EMPTY_BYTES;
+        buf = new byte[BUF_SIZE];
+    }
+
+    /**
+     * Construct from a string.
+     *
+     * @param string input string
+     */
+    public Text(String string) {
+        set(string);
+    }
+
+    /**
+     * Construct from another text.
+     *
+     * @param utf8 text to copy
+     */
+    public Text(Text utf8) {
+        set(utf8);
+    }
+
+    /**
+     * Construct from a byte array.
+     *
+     * @param utf8 input byte array
+     */
+    public Text(byte[] utf8) {
+        set(utf8);
+    }
+
+    public static boolean isNegativeVInt(byte value) {
+        return value < -120 || (value >= -112 && value < 0);
+    }
+
+    public static long readVLong(DataInput stream) throws IOException {
+        byte firstByte = stream.readByte();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            byte b = stream.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+    }
+
+    public static int decodeVIntSize(byte value) {
+        if (value >= -112) {
+            return 1;
+        } else if (value < -120) {
+            return -119 - value;
+        }
+        return -111 - value;
+    }
+
+    public static String decode(byte[] utf8, int start, int length)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), true);
+    }
+
+    /**
+     * Converts the provided byte array to a String using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param utf8 UTF-8 encoded byte array
+     * @param start start point
+     * @param length length of array
+     * @param replace whether to replace malformed input with substitution
+     *            character
+     * @return decoded string
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static String decode(byte[] utf8, int start, int length,
+                                boolean replace)
+            throws CharacterCodingException {
+        return decode(ByteBuffer.wrap(utf8, start, length), replace);
+    }
+
+    private static String decode(ByteBuffer utf8, boolean replace)
+            throws CharacterCodingException {
+        CharsetDecoder decoder = DECODER_FACTORY.get();
+        if (replace) {
+            decoder.onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE);
+            decoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        }
+        String str = decoder.decode(utf8).toString();
+        // set decoder back to its default value: REPORT
+        if (replace) {
+            decoder.onMalformedInput(CodingErrorAction.REPORT);
+            decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+        }
+        return str;
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If the
+     * input is malformed, invalid chars are replaced by a default value.
+     *
+     * @param string string to encode
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws CharacterCodingException if conversion failed
+     */
+    public static ByteBuffer encode(String string)
+            throws CharacterCodingException {
+        return encode(string, true);
+    }
+
+    /**
+     * Converts the provided String to bytes using the UTF-8 encoding. If
+     * <code>replace</code> is true, then malformed input is replaced with the
+     * substitution character, which is U+FFFD. Otherwise the method throws a
+     * MalformedInputException.
+     *
+     * @param string string to encode
+     * @param replace whether to replace malformed input with substitution
+     *            character
+     * @return ByteBuffer: bytes stores at ByteBuffer.array() and length is
+     *         ByteBuffer.limit()
+     * @throws MalformedInputException if a malformed input is used
+     * @throws CharacterCodingException if the conversion failed
+     */
+    public static ByteBuffer encode(String string, boolean replace)
+            throws CharacterCodingException {
+        CharsetEncoder encoder = ENCODER_FACTORY.get();
+        if (replace) {
+            encoder.onMalformedInput(CodingErrorAction.REPLACE);
+            encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
+        }
+        ByteBuffer bytes = encoder.encode(CharBuffer.wrap(string.toCharArray()));
+        if (replace) {
+            encoder.onMalformedInput(CodingErrorAction.REPORT);
+            encoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+        }
+        return bytes;
+    }
+
+    /**
+     * Returns the raw bytes; however, only data up to {@link #getLength()} is
+     * valid.
+     *
+     * @return raw bytes of byte array
+     */
+    public byte[] getBytes() {
+        return bytes;
+    }
+
+    /**
+     * Returns the number of bytes in the byte array
+     *
+     * @return number of bytes in byte array
+     */
+    public int getLength() {
+        return length;
+    }
+
+    /**
+     * Sets to contain the contents of a string.
+     *
+     * @param string input string
+     */
+    public void set(String string) {
+        try {
+            ByteBuffer bb = encode(string, true);
+            bytes = bb.array();
+            length = bb.limit();
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString());
+        }
+    }
+
+    /**
+     * Sets to a UTF-8 byte array.
+     *
+     * @param utf8 input UTF-8 byte array
+     */
+    public void set(byte[] utf8) {
+        set(utf8, 0, utf8.length);
+    }
+
+    /**
+     * Copies a text.
+     *
+     * @param other text object to copy.
+     */
+    public void set(Text other) {
+        set(other.getBytes(), 0, other.getLength());
+    }
+
+    /**
+     * Sets the Text to range of bytes.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position of the new string
+     * @param len the number of bytes of the new string
+     */
+    public void set(byte[] utf8, int start, int len) {
+        setCapacity(len, false);
+        System.arraycopy(utf8, start, bytes, 0, len);
+        this.length = len;
+    }
+
+    /**
+     * Appends a range of bytes to the end of the given text.
+     *
+     * @param utf8 the data to copy from
+     * @param start the first position to append from utf8
+     * @param len the number of bytes to append
+     */
+    public void append(byte[] utf8, int start, int len) {
+        setCapacity(length + len, true);
+        System.arraycopy(utf8, start, bytes, length, len);
+        length += len;
+    }
+
+    /**
+     * Clears the string to empty.
+     */
+    public void clear() {
+        length = 0;
+    }
+
+    /*
+     * Sets the capacity of this Text object to <em>at least</em>
+     * <code>len</code> bytes. If the current buffer is longer, then the
+     * capacity and existing content of the buffer are unchanged. If
+     * <code>len</code> is larger than the current capacity, the Text object's
+     * capacity is increased to match.
+     *
+     * @param len the number of bytes we need
+     *
+     * @param keepData should the old data be kept
+     */
+    private void setCapacity(int len, boolean keepData) {
+        if (bytes == null || bytes.length < len) {
+            byte[] newBytes = new byte[len];
+            if (bytes != null && keepData) {
+                System.arraycopy(bytes, 0, newBytes, 0, length);
+            }
+            bytes = newBytes;
+        }
+    }
+
+    /**
+     * Convert text back to string
+     *
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        try {
+            return decode(bytes, 0, length);
+        } catch (CharacterCodingException e) {
+            throw new RuntimeException("Should not have happened "
+                    + e.toString());
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        byte[] bytes = getBytes();
+        out.write(bytes, 0, getLength());
+    }
+
+    /**
+     * deserialize
+     */
+    @Override
+    public void readFields(DataInput inputStream) throws IOException {
+
+        byte c;
+        curLoc = 0;
+        clear();
+        while ((c = (byte) ((DataInputStream) inputStream).read()) != EOF) {
+            buf[curLoc] = c;
+            curLoc++;
+
+            if (c == LINE_DELIMITER) {
+                LOG.trace("read one line, size " + curLoc);
+                break;
+            }
+
+            if (isBufferFull()) {
+                flushBuffer();
+            }
+        }
+
+        if (!isBufferEmpty()) {
+            // the buffer doesn't end with a line break.
+            if (c == EOF) {
+                LOG.warn("Stream ended without line break");
+            }
+            flushBuffer();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return (curLoc == 0);
+    }
+
+    private boolean isBufferFull() {
+        return (curLoc == BUF_SIZE);
+    }
+
+    private void flushBuffer() {
+        append(buf, 0, curLoc);
+        curLoc = 0;
+    }
+
+    /**
+     * Returns true iff <code>o</code> is a Text with the same contents.
+     */
+    @Override
+    public boolean equals(Object o) {
+        return (o instanceof Text && Arrays.equals(bytes, ((Text) o).bytes));
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
new file mode 100644
index 0000000..038da9c
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/io/Writable.java
@@ -0,0 +1,50 @@
+package org.apache.hawq.pxf.service.io;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
/**
 * A serializable object which implements a simple, efficient, serialization
 * protocol, based on {@link DataInput} and {@link DataOutput}.
 *
 * <p>Any record type streamed between PXF and the database implements this
 * interface so that it can be written to and read from a byte stream.</p>
 */
public interface Writable {

    /**
     * Serialize the fields of this object to <code>out</code>.
     *
     * @param out <code>DataOutput</code> to serialize this object into.
     * @throws IOException if I/O error occurs
     */
    void write(DataOutput out) throws IOException;

    /**
     * Deserialize the fields of this object from <code>in</code>, replacing
     * any existing contents of this object.
     * <p>For efficiency, implementations should attempt to re-use storage in the
     * existing object where possible.</p>
     *
     * @param in <code>DataInput</code> to deserialize this object from.
     * @throws IOException if I/O error occurs
     */
    void readFields(DataInput in) throws IOException;
}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
new file mode 100644
index 0000000..3a062c3
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -0,0 +1,189 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.catalina.connector.ClientAbortException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.Bridge;
+import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
+
+/*
+ * This class handles the subpath /<version>/Bridge/ of this
+ * REST component
+ */
+@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Bridge/")
+public class BridgeResource extends RestResource {
+
+    private static final Log LOG = LogFactory.getLog(BridgeResource.class);
+    /**
+     * Lock is needed here in the case of a non-thread-safe plugin. Using
+     * synchronized methods is not enough because the bridge work is called by
+     * jetty ({@link StreamingOutput}), after we are getting out of this class's
+     * context.
+     * <p/>
+     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on
+     * the isThreadSafe parameter that is determined by the bridge.
+     */
+    private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock();
+
+    public BridgeResource() {
+    }
+
+    /**
+     * Used to be HDFSReader. Creates a bridge instance and iterates over its
+     * records, printing it out to outgoing stream. Outputs GPDBWritable or
+     * Text.
+     *
+     * Parameters come through HTTP header.
+     *
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
+     * @param headers Holds HTTP headers from request
+     * @return response object containing stream that will output records
+     * @throws Exception in case of wrong request parameters, or failure to
+     *             initialize bridge
+     */
+    @GET
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    public Response read(@Context final ServletContext servletContext,
+                         @Context HttpHeaders headers) throws Exception {
+        // Convert headers into a regular map
+        Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders());
+
+        LOG.debug("started with parameters: " + params);
+
+        ProtocolData protData = new ProtocolData(params);
+        SecuredHDFS.verifyToken(protData, servletContext);
+        Bridge bridge;
+        float sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio > 0) {
+            bridge = new ReadSamplingBridge(protData);
+        } else {
+            bridge = new ReadBridge(protData);
+        }
+        String dataDir = protData.getDataSource();
+        // THREAD-SAFE parameter has precedence
+        boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
+        LOG.debug("Request for " + dataDir + " will be handled "
+                + (isThreadSafe ? "without" : "with") + " synchronization");
+
+        return readResponse(bridge, protData, isThreadSafe);
+    }
+
+    Response readResponse(final Bridge bridge, ProtocolData protData,
+                          final boolean threadSafe) {
+        final int fragment = protData.getDataFragment();
+        final String dataDir = protData.getDataSource();
+
+        // Creating an internal streaming class
+        // which will iterate the records and put them on the
+        // output stream
+        final StreamingOutput streaming = new StreamingOutput() {
+            @Override
+            public void write(final OutputStream out) throws IOException,
+                    WebApplicationException {
+                long recordCount = 0;
+
+                if (!threadSafe) {
+                    lock(dataDir);
+                }
+                try {
+
+                    if (!bridge.beginIteration()) {
+                        return;
+                    }
+
+                    Writable record;
+                    DataOutputStream dos = new DataOutputStream(out);
+                    LOG.debug("Starting streaming fragment " + fragment
+                            + " of resource " + dataDir);
+                    while ((record = bridge.getNext()) != null) {
+                        record.write(dos);
+                        ++recordCount;
+                    }
+                    LOG.debug("Finished streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
+                } catch (ClientAbortException e) {
+                    // Occurs whenever client (HAWQ) decides the end the
+                    // connection
+                    LOG.error("Remote connection closed by HAWQ", e);
+                } catch (Exception e) {
+                    LOG.error("Exception thrown when streaming", e);
+                    throw new IOException(e.getMessage());
+                } finally {
+                    LOG.debug("Stopped streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
+                    if (!threadSafe) {
+                        unlock(dataDir);
+                    }
+                }
+            }
+        };
+
+        return Response.ok(streaming, MediaType.APPLICATION_OCTET_STREAM).build();
+    }
+
+    /**
+     * Locks BRIDGE_LOCK
+     *
+     * @param path path for the request, used for logging.
+     */
+    private void lock(String path) {
+        LOG.trace("Locking BridgeResource for " + path);
+        BRIDGE_LOCK.lock();
+        LOG.trace("Locked BridgeResource for " + path);
+    }
+
+    /**
+     * Unlocks BRIDGE_LOCK
+     *
+     * @param path path for the request, used for logging.
+     */
+    private void unlock(String path) {
+        LOG.trace("Unlocking BridgeResource for " + path);
+        BRIDGE_LOCK.unlock();
+        LOG.trace("Unlocked BridgeResource for " + path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
new file mode 100644
index 0000000..1280c09
--- /dev/null
+++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ClusterNodesResource.java
@@ -0,0 +1,148 @@
+package org.apache.hawq.pxf.service.rest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.catalina.connector.ClientAbortException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Class enhances the API of the HBASE rest server.
+ * Example for querying API getClusterNodesInfo from a web client
+ * <code>curl "http://localhost:51200/pxf/{version}/HadoopCluster/getNodesInfo"</code>
+ * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
+ */
+@Path("/" + Version.PXF_PROTOCOL_VERSION + "/HadoopCluster/")
+public class ClusterNodesResource {
+    private static final Log LOG = LogFactory.getLog(ClusterNodesResource.class);
+
+    public ClusterNodesResource() {
+    }
+
+    /**
+     * Function queries the Hadoop namenode with the getDataNodeStats API It
+     * gets the host's IP and REST port of every HDFS data node in the cluster.
+     * Then, it packs the results in JSON format and writes to the HTTP response
+     * stream. Response Examples:<br>
+     * <ol>
+     * <li>When there are no datanodes - getDataNodeStats returns an empty array
+     * <code>{"regions":[]}</code></li>
+     * <li>When there are datanodes
+     * <code>{"regions":[{"host":"1.2.3.1","port":50075},{"host":"1.2.3.2","port"
+     * :50075}]}</code></li>
+     * </ol>
+     *
+     * @return JSON response with nodes info
+     * @throws Exception if failed to retrieve info
+     */
+    @GET
+    @Path("getNodesInfo")
+    @Produces("application/json")
+    public Response read() throws Exception {
+        LOG.debug("getNodesInfo started");
+        StringBuilder jsonOutput = new StringBuilder("{\"regions\":[");
+        try {
+            /*
+             * 1. Initialize the HADOOP client side API for a distributed file
+             * system
+             */
+            Configuration conf = new Configuration();
+            FileSystem fs = FileSystem.get(conf);
+            DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+            /*
+             * 2. Query the namenode for the datanodes info. Only live nodes are
+             * returned - in accordance with the results returned by
+             * org.apache.hadoop.hdfs.tools.DFSAdmin#report().
+             */
+            DatanodeInfo[] liveNodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+
+            /*
+             * 3. Pack the datanodes info in a JSON text format and write it to
+             * the HTTP output stream.
+             */
+            String prefix = "";
+            for (DatanodeInfo node : liveNodes) {
+                verifyNode(node);
+                // write one node to the HTTP stream
+                jsonOutput.append(prefix).append(writeNode(node));
+                prefix = ",";
+            }
+            jsonOutput.append("]}");
+            LOG.debug("getNodesCluster output: " + jsonOutput);
+        } catch (NodeDataException e) {
+            LOG.error("Nodes verification failed", e);
+            throw e;
+        } catch (ClientAbortException e) {
+            LOG.error("Remote connection closed by HAWQ", e);
+            throw e;
+        } catch (java.io.IOException e) {
+            LOG.error("Unhandled exception thrown", e);
+            throw e;
+        }
+
+        return Response.ok(jsonOutput.toString(),
+                MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    private class NodeDataException extends java.io.IOException {
+
+        /**
+         *
+         */
+        private static final long serialVersionUID = 1L;
+
+        public NodeDataException(String paramString) {
+            super(paramString);
+        }
+    }
+
+    private void verifyNode(DatanodeInfo node) throws NodeDataException {
+        int port = node.getInfoPort();
+        String ip = node.getIpAddr();
+
+        if (StringUtils.isEmpty(ip)) {
+            throw new NodeDataException("Invalid IP: " + ip + " (Node " + node
+                    + ")");
+        }
+
+        if (port <= 0) {
+            throw new NodeDataException("Invalid port: " + port + " (Node "
+                    + node + ")");
+        }
+    }
+
+    String writeNode(DatanodeInfo node) throws java.io.IOException {
+        return "{\"host\":\"" + node.getIpAddr() + "\",\"port\":"
+                + node.getInfoPort() + "}";
+    }
+}


Mime
View raw message