hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nh...@apache.org
Subject [3/3] incubator-hawq git commit: HAWQ-44. Advanced statistics for PXF tables.
Date Fri, 20 Nov 2015 21:50:41 GMT
HAWQ-44. Advanced statistics for PXF tables.

PXF sample rows are collected into a temporary table, from which statistics are derived in the same way ANALYZE works for HAWQ tables.
Statistics are gathered in 3 stages:
1. Getting general statistics - number of fragments, size of the data source, and size of the first fragment.
2. Counting the tuples in the first fragment.
HAWQ uses these numbers to determine how many tuples are needed, and translates them into a sampling ratio and a number of fragments to sample.
3. Sampling the PXF table based on the sampling ratio and the number of fragments to be sampled. The returned tuples are saved in a temporary table.

On the PXF side, a function has been added to the Fragmenter API to allow gathering the first-stage statistics. In addition, a mechanism to sample rows on the fly was added to the Bridge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/81385f09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/81385f09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/81385f09

Branch: refs/heads/master
Commit: 81385f09fbbc59b2912b2f8ff9a3edbee6427e9b
Parents: 4dbb347
Author: Noa Horn <nhorn@pivotal.io>
Authored: Fri Nov 20 13:28:21 2015 -0800
Committer: Noa Horn <nhorn@pivotal.io>
Committed: Fri Nov 20 13:28:21 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hawq/pxf/api/AnalyzerStats.java  |  40 +-
 .../org/apache/hawq/pxf/api/Fragmenter.java     |  29 +-
 .../org/apache/hawq/pxf/api/FragmentsStats.java | 233 ++++++
 .../apache/hawq/pxf/api/FragmentsStatsTest.java |  90 +++
 .../pxf/plugins/hbase/HBaseDataFragmenter.java  |  15 +-
 .../hawq/pxf/plugins/hdfs/HdfsAnalyzer.java     |  73 +-
 .../pxf/plugins/hdfs/HdfsDataFragmenter.java    |  52 +-
 .../pxf/plugins/hdfs/StringPassResolver.java    |   2 +-
 .../pxf/plugins/hive/HiveDataFragmenter.java    |  10 +-
 .../plugins/hive/HiveInputFormatFragmenter.java |   9 +
 .../hawq/pxf/service/AnalyzerFactory.java       |   4 +-
 .../org/apache/hawq/pxf/service/Bridge.java     |   9 +-
 .../hawq/pxf/service/BridgeOutputBuilder.java   | 117 ++-
 .../hawq/pxf/service/FragmentsResponse.java     |   2 +-
 .../pxf/service/FragmentsResponseFormatter.java |   1 -
 .../org/apache/hawq/pxf/service/ReadBridge.java | 112 ++-
 .../hawq/pxf/service/ReadSamplingBridge.java    | 112 +++
 .../apache/hawq/pxf/service/WriteBridge.java    |   9 +-
 .../hawq/pxf/service/io/BufferWritable.java     |  21 +
 .../hawq/pxf/service/rest/BridgeResource.java   |  65 +-
 .../pxf/service/rest/FragmenterResource.java    | 105 ++-
 .../pxf/service/rest/InvalidPathResource.java   |   6 +-
 .../pxf/service/utilities/AnalyzeUtils.java     | 128 ++++
 .../pxf/service/utilities/ProtocolData.java     |  75 +-
 .../pxf/service/BridgeOutputBuilderTest.java    | 357 +++++++--
 .../pxf/service/ReadSamplingBridgeTest.java     | 225 ++++++
 .../hawq/pxf/service/io/BufferWritableTest.java |  22 +
 .../pxf/service/utilities/AnalyzeUtilsTest.java | 117 +++
 .../pxf/service/utilities/ProtocolDataTest.java | 129 +++-
 src/backend/access/external/Makefile            |   2 +-
 src/backend/access/external/hd_work_mgr.c       |  32 +-
 src/backend/access/external/pxfanalyze.c        | 740 +++++++++++++++++++
 src/backend/access/external/pxfmasterapi.c      | 122 +--
 src/backend/access/external/test/Makefile       |   8 +-
 src/backend/access/external/test/README.txt     |   9 +-
 ...ork_mgr_do_segment_clustering_by_host_test.c |  15 +-
 .../access/external/test/pxfanalyze_test.c      | 171 +++++
 .../access/external/test/pxfmasterapi_test.c    |  21 +-
 src/backend/commands/analyze.c                  | 233 +++---
 src/backend/commands/tablecmds.c                |   4 +-
 src/backend/optimizer/util/plancat.c            |  39 +-
 src/backend/utils/misc/guc.c                    |  11 +
 src/include/access/hd_work_mgr.h                |  14 +-
 src/include/access/pxfanalyze.h                 |  37 +
 src/include/access/pxfmasterapi.h               |   2 +-
 src/include/access/pxfuriparser.h               |   2 +-
 src/include/cdb/cdbanalyze.h                    |   6 -
 src/include/commands/analyzeutils.h             |  35 +
 src/include/utils/guc.h                         |   1 +
 49 files changed, 3126 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
index a06b4f1..3d2c665 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/AnalyzerStats.java
@@ -13,8 +13,8 @@ public class AnalyzerStats {
     private static final long DEFAULT_NUMBER_OF_BLOCKS = 1L;
     private static final long DEFAULT_NUMBER_OF_TUPLES = 1000000L;
 
-    private long blockSize;        // block size (in bytes)
-    private long numberOfBlocks;    // number of blocks
+    private long blockSize; // block size (in bytes)
+    private long numberOfBlocks; // number of blocks
     private long numberOfTuples; // number of tuples
 
     /**
@@ -24,8 +24,7 @@ public class AnalyzerStats {
      * @param numberOfBlocks number of blocks
      * @param numberOfTuples number of tuples
      */
-    public AnalyzerStats(long blockSize,
-                         long numberOfBlocks,
+    public AnalyzerStats(long blockSize, long numberOfBlocks,
                          long numberOfTuples) {
         this.setBlockSize(blockSize);
         this.setNumberOfBlocks(numberOfBlocks);
@@ -34,38 +33,41 @@ public class AnalyzerStats {
 
     /** Constructs an AnalyzerStats with the default values */
     public AnalyzerStats() {
-        this(DEFAULT_BLOCK_SIZE, DEFAULT_NUMBER_OF_BLOCKS, DEFAULT_NUMBER_OF_TUPLES);
+        this(DEFAULT_BLOCK_SIZE, DEFAULT_NUMBER_OF_BLOCKS,
+                DEFAULT_NUMBER_OF_TUPLES);
     }
 
     /**
-     * Given an AnalyzerStats, serialize it in JSON to be used as
-     * the result string for HAWQ. An example result is as follows:
-     * {"PXFDataSourceStats":{"blockSize":67108864,"numberOfBlocks":1,"numberOfTuples":5}}
+     * Given an AnalyzerStats, serialize it in JSON to be used as the result
+     * string for HAWQ. An example result is as follows:
+     * {"PXFDataSourceStats":{"blockSize":67108864,"numberOfBlocks":1,
+     * "numberOfTuples":5}}
      *
      * @param stats the data to be serialized
      * @return the result in json format
      * @throws IOException if converting to JSON format failed
      */
-    public static String dataToJSON(AnalyzerStats stats) throws IOException  {
+    public static String dataToJSON(AnalyzerStats stats) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
         // mapper serializes all members of the class by default
-        return "{\"PXFDataSourceStats\":" + mapper.writeValueAsString(stats) + "}";
+        return "{\"PXFDataSourceStats\":" + mapper.writeValueAsString(stats)
+                + "}";
     }
 
     /**
-     * Given a stats structure, convert it to be readable. Intended
-     * for debugging purposes only.
+     * Given a stats structure, convert it to be readable. Intended for
+     * debugging purposes only.
      *
      * @param stats the data to be stringify
-     * @param datapath the data path part of the original URI (e.g., table name, *.csv, etc.)
-     * @return the stringify data
+     * @param datapath the data path part of the original URI (e.g., table name,
+     *            *.csv, etc.)
+     * @return the stringified data
      */
     public static String dataToString(AnalyzerStats stats, String datapath) {
-        return "Statistics information for \"" + datapath + "\" " +
-                " Block Size: " + stats.blockSize +
-                ", Number of blocks: " + stats.numberOfBlocks +
-                ", Number of tuples: " + stats.numberOfTuples;
-
+        return "Statistics information for \"" + datapath + "\" "
+                + " Block Size: " + stats.blockSize + ", Number of blocks: "
+                + stats.numberOfBlocks + ", Number of tuples: "
+                + stats.numberOfTuples;
     }
 
     public long getBlockSize() {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
index 4ae057f..cb9cda8 100644
--- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/Fragmenter.java
@@ -7,7 +7,8 @@ import java.util.LinkedList;
 import java.util.List;
 
 /**
- * Abstract class that defines the splitting of a data resource into fragments that can be processed in parallel.
+ * Abstract class that defines the splitting of a data resource into fragments
+ * that can be processed in parallel.
  */
 public abstract class Fragmenter extends Plugin {
     protected List<Fragment> fragments;
@@ -23,11 +24,33 @@ public abstract class Fragmenter extends Plugin {
     }
 
     /**
-     * Gets the fragments of a given path (source name and location of each fragment).
-     * Used to get fragments of data that could be read in parallel from the different segments.
+     * Gets the fragments of a given path (source name and location of each
+     * fragment). Used to get fragments of data that could be read in parallel
+     * from the different segments.
      *
      * @return list of data fragments
      * @throws Exception if fragment list could not be retrieved
      */
     public abstract List<Fragment> getFragments() throws Exception;
+
+    /**
+     * Default implementation of statistics for fragments. The default is:
+     * <ul>
+     * <li>number of fragments - as gathered by {@link #getFragments()}</li>
+     * <li>first fragment size - 64MB</li>
+     * <li>total size - number of fragments times first fragment size</li>
+     * </ul>
+     * Each fragmenter implementation can override this method to better match
+     * its fragments stats.
+     *
+     * @return default statistics
+     * @throws Exception if statistics cannot be gathered
+     */
+    public FragmentsStats getFragmentsStats() throws Exception {
+        List<Fragment> fragments = getFragments();
+        long fragmentsNumber = fragments.size();
+        return new FragmentsStats(fragmentsNumber,
+                FragmentsStats.DEFAULT_FRAGMENT_SIZE, fragmentsNumber
+                        * FragmentsStats.DEFAULT_FRAGMENT_SIZE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
new file mode 100644
index 0000000..004a11c
--- /dev/null
+++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/FragmentsStats.java
@@ -0,0 +1,233 @@
+package org.apache.hawq.pxf.api;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * FragmentsStats holds statistics for a given path.
+ */
+public class FragmentsStats {
+
+    /**
+     * Default fragment size. Assuming a fragment is equivalent to a block in
+     * HDFS, we guess a full fragment size is 64MB.
+     */
+    public static final long DEFAULT_FRAGMENT_SIZE = 67108864L;
+
+    private static Log Log = LogFactory.getLog(FragmentsStats.class);
+
+    // number of fragments
+    private long fragmentsNumber;
+    // first fragment size
+    private SizeAndUnit firstFragmentSize;
+    // total fragments size
+    private SizeAndUnit totalSize;
+
+    /**
+     * Enum to represent unit (Bytes/KB/MB/GB/TB)
+     */
+    public enum SizeUnit {
+        /**
+         * Byte
+         */
+        B,
+        /**
+         * KB
+         */
+        KB,
+        /**
+         * MB
+         */
+        MB,
+        /**
+         * GB
+         */
+        GB,
+        /**
+         * TB
+         */
+        TB;
+    };
+
+    /**
+     * Container for size and unit
+     */
+    public class SizeAndUnit {
+        long size;
+        SizeUnit unit;
+
+        /**
+         * Default constructor.
+         */
+        public SizeAndUnit() {
+            this.size = 0;
+            this.unit = SizeUnit.B;
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param size size
+         * @param unit unit
+         */
+        public SizeAndUnit(long size, SizeUnit unit) {
+            this.size = size;
+            this.unit = unit;
+        }
+
+        /**
+         * Returns size.
+         *
+         * @return size
+         */
+        public long getSize() {
+            return this.size;
+        }
+
+        /**
+         * Returns unit (Byte/KB/MB/etc.).
+         *
+         * @return unit
+         */
+        public SizeUnit getUnit() {
+            return this.unit;
+        }
+
+        @Override
+        public String toString() {
+            return size + "" + unit;
+        }
+    }
+
+    /**
+     * Constructs a FragmentsStats.
+     *
+     * @param fragmentsNumber number of fragments
+     * @param firstFragmentSize first fragment size (in bytes)
+     * @param totalSize total size (in bytes)
+     */
+    public FragmentsStats(long fragmentsNumber, long firstFragmentSize,
+                          long totalSize) {
+        this.setFragmentsNumber(fragmentsNumber);
+        this.setFirstFragmentSize(firstFragmentSize);
+        this.setTotalSize(totalSize);
+    }
+
+    /**
+     * Given a {@link FragmentsStats}, serialize it in JSON to be used as the
+     * result string for HAWQ. An example result is as follows:
+     * <code>{"PXFFragmentsStats":{"fragmentsNumber":3,"firstFragmentSize":
+     * {"size":67108864,"unit":"B"},"totalSize":{"size":200000000,"unit":"B"}}}</code>
+     *
+     * @param stats the data to be serialized
+     * @return the result in json format
+     * @throws IOException if converting to JSON format failed
+     */
+    public static String dataToJSON(FragmentsStats stats) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        // mapper serializes all members of the class by default
+        return "{\"PXFFragmentsStats\":" + mapper.writeValueAsString(stats)
+                + "}";
+    }
+
+    /**
+     * Given a stats structure, convert it to be readable. Intended for
+     * debugging purposes only.
+     *
+     * @param stats the data to be stringified
+     * @param datapath the data path part of the original URI (e.g., table name,
+     *            *.csv, etc.)
+     * @return the stringified data
+     */
+    public static String dataToString(FragmentsStats stats, String datapath) {
+        return "Statistics information for \"" + datapath + "\" "
+                + " Number of Fragments: " + stats.fragmentsNumber
+                + ", first Fragment size: " + stats.firstFragmentSize
+                + ", total size: " + stats.totalSize;
+    }
+
+    /**
+     * Returns number of fragments for a given data source.
+     *
+     * @return number of fragments
+     */
+    public long getFragmentsNumber() {
+        return fragmentsNumber;
+    }
+
+    private void setFragmentsNumber(long fragmentsNumber) {
+        this.fragmentsNumber = fragmentsNumber;
+    }
+
+    /**
+     * Returns the size of the first fragment together with its unit.
+     *
+     * @return first fragment size (scaled down from bytes, see {@link SizeUnit})
+     */
+    public SizeAndUnit getFirstFragmentSize() {
+        return firstFragmentSize;
+    }
+
+    private void setFirstFragmentSize(long firstFragmentSize) {
+        this.firstFragmentSize = setSizeAndUnit(firstFragmentSize);
+    }
+
+    /**
+     * Returns the total size of a given source. Usually it means the
+     * aggregation of all its fragments size.
+     *
+     * @return total size
+     */
+    public SizeAndUnit getTotalSize() {
+        return totalSize;
+    }
+
+    private void setTotalSize(long totalSize) {
+        this.totalSize = setSizeAndUnit(totalSize);
+    }
+
+    private SizeAndUnit setSizeAndUnit(long originalSize) {
+        final long THRESHOLD = Integer.MAX_VALUE / 2;
+        int orderOfMagnitude = 0;
+        SizeAndUnit sizeAndUnit = new SizeAndUnit();
+        sizeAndUnit.size = originalSize;
+
+        while (sizeAndUnit.size > THRESHOLD) {
+            sizeAndUnit.size /= 1024;
+            orderOfMagnitude++;
+        }
+
+        sizeAndUnit.unit = getSizeUnit(orderOfMagnitude);
+        return sizeAndUnit;
+    }
+
+    private SizeUnit getSizeUnit(int orderOfMagnitude) {
+        SizeUnit unit;
+        switch (orderOfMagnitude) {
+            case 0:
+                unit = SizeUnit.B;
+                break;
+            case 1:
+                unit = SizeUnit.KB;
+                break;
+            case 2:
+                unit = SizeUnit.MB;
+                break;
+            case 3:
+                unit = SizeUnit.GB;
+                break;
+            case 4:
+                unit = SizeUnit.TB;
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported order of magnitude "
+                                + orderOfMagnitude
+                                + ". Size's order of magnitue can be a value between 0(Bytes) and 4(TB)");
+        }
+        return unit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
new file mode 100644
index 0000000..4e7eca0
--- /dev/null
+++ b/pxf/pxf-api/src/test/java/org/apache/hawq/pxf/api/FragmentsStatsTest.java
@@ -0,0 +1,90 @@
+package org.apache.hawq.pxf.api;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hawq.pxf.api.FragmentsStats.SizeUnit;
+import org.junit.Test;
+
+public class FragmentsStatsTest {
+
+    @Test
+    public void ctorSizeByte() {
+        ctorSizeTest(10, 100, 100, SizeUnit.B, 1000000, 1000000, SizeUnit.B);
+    }
+
+    @Test
+    public void ctorSizeKB() {
+        ctorSizeTest(40, 50, 50, SizeUnit.B, (long) Math.pow(2, 32), (long) Math.pow(2, 22),
+                SizeUnit.KB);
+    }
+
+    @Test
+    public void ctorSizeMB() {
+        ctorSizeTest(20, 50, 50, SizeUnit.B, (long) Math.pow(2, 40), (long) Math.pow(2, 20),
+                SizeUnit.MB);
+    }
+
+    @Test
+    public void ctorSizeGB() {
+        ctorSizeTest(25, 1000000, 1000000, SizeUnit.B, (long) Math.pow(6, 20),
+                (long) Math.pow(6, 20) / (long) Math.pow(2, 30), SizeUnit.GB);
+    }
+
+    @Test
+    public void ctorSizeTB() {
+        ctorSizeTest(25, 20000000, 20000000, SizeUnit.B, (long) Math.pow(5, 30),
+                (long) Math.pow(5, 30) / (long) Math.pow(2, 40), SizeUnit.TB);
+    }
+
+    @Test
+    public void ctorSize0() {
+        ctorSizeTest(0, 0, 0, SizeUnit.B, 0, 0, SizeUnit.B);
+    }
+
+    @Test
+    public void dataToJSON() throws IOException {
+        FragmentsStats fragmentsStats = new FragmentsStats(25, 20000000, (long) Math.pow(5, 30));
+        String json = FragmentsStats.dataToJSON(fragmentsStats);
+        String expectedJson = "{\"PXFFragmentsStats\":" +
+                "{\"fragmentsNumber\":" + fragmentsStats.getFragmentsNumber() +
+                ",\"firstFragmentSize\":" +
+                "{\"size\":" + fragmentsStats.getFirstFragmentSize().getSize() +
+                ",\"unit\":\"" + fragmentsStats.getFirstFragmentSize().getUnit() + "\"}" +
+                ",\"totalSize\":" +
+                "{\"size\":" + fragmentsStats.getTotalSize().getSize() +
+                ",\"unit\":\"" + fragmentsStats.getTotalSize().getUnit() + "\"}" +
+                "}}";
+        assertEquals(expectedJson, json);
+    }
+
+    @Test
+    public void dataToString() {
+        FragmentsStats fragmentsStats = new FragmentsStats(25, 2000000000, (long) Math.pow(5, 30));
+        String path = "la la la";
+        String str = FragmentsStats.dataToString(fragmentsStats, path);
+        String expected =  "Statistics information for \"" + path + "\" "
+                + " Number of Fragments: " + 25
+                + ", first Fragment size: " + 1953125 + "KB"
+                + ", total size: " + 8388607 + "TB";
+        assertEquals(expected, str);
+    }
+
+    private void ctorSizeTest(long fragsNum, long firstFragSize,
+                              long expectedFirstFragSize,
+                              SizeUnit expectedFirstFragSizeUnit, long totalSize,
+                              long expectedTotalSize,
+                              SizeUnit expectedTotalSizeUnit) {
+        FragmentsStats fragmentsStats = new FragmentsStats(fragsNum,
+                firstFragSize, totalSize);
+        assertEquals(fragsNum, fragmentsStats.getFragmentsNumber());
+        assertEquals(expectedFirstFragSize,
+                fragmentsStats.getFirstFragmentSize().size);
+        assertEquals(expectedFirstFragSizeUnit,
+                fragmentsStats.getFirstFragmentSize().unit);
+        assertEquals(expectedTotalSize, fragmentsStats.getTotalSize().size);
+        assertEquals(expectedTotalSizeUnit,
+                fragmentsStats.getTotalSize().unit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
index 36ae60c..1f1afae 100644
--- a/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
+++ b/pxf/pxf-hbase/src/main/java/org/apache/hawq/pxf/plugins/hbase/HBaseDataFragmenter.java
@@ -2,10 +2,10 @@ package org.apache.hawq.pxf.plugins.hbase;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseLookupTable;
 import org.apache.hawq.pxf.plugins.hbase.utilities.HBaseUtilities;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
@@ -32,11 +32,24 @@ public class HBaseDataFragmenter extends Fragmenter {
     private Admin hbaseAdmin;
     private Connection connection;
 
+    /**
+     * Constructor for HBaseDataFragmenter.
+     *
+     * @param inConf input data such as which HBase table to scan
+     */
     public HBaseDataFragmenter(InputData inConf) {
         super(inConf);
     }
 
     /**
+     * Not implemented for HBase tables: always throws UnsupportedOperationException.
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for HBase plugin is not supported");
+    }
+
+    /**
      * Returns list of fragments containing all of the
      * HBase's table data.
      * Lookup table information with mapping between

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
index c80244a..49a4d39 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 /**
  * Analyzer class for HDFS data resources
  *
- * Given an HDFS data source (a file, directory, or wild card pattern)
- * return statistics about it (number of blocks, number of tuples, etc.)
+ * Given an HDFS data source (a file, directory, or wild card pattern) return
+ * statistics about it (number of blocks, number of tuples, etc.)
  */
 public class HdfsAnalyzer extends Analyzer {
     private JobConf jobConf;
@@ -49,28 +49,31 @@ public class HdfsAnalyzer extends Analyzer {
      * Collects a number of basic statistics based on an estimate. Statistics
      * are: number of records, number of hdfs blocks and hdfs block size.
      *
-     * @param datapath path is a data source URI that can appear as a file
-     *        name, a directory name or a wildcard pattern
+     * @param datapath path is a data source URI that can appear as a file name,
+     *            a directory name or a wildcard pattern
      * @return statistics in JSON format
-     * @throws Exception if path is wrong, its metadata cannot be retrieved
-     *                    from file system, or if scanning the first block
-     *                    using the accessor failed
+     * @throws Exception if path is wrong, its metadata cannot be retrieved from
+     *             file system, or if scanning the first block using the
+     *             accessor failed
      */
     @Override
     public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
         long blockSize = 0;
         long numberOfBlocks;
+        long dataSize = 0;
         Path path = new Path(HdfsUtilities.absoluteDataPath(datapath));
 
         ArrayList<InputSplit> splits = getSplits(path);
 
         for (InputSplit split : splits) {
             FileSplit fsp = (FileSplit) split;
-            Path filePath = fsp.getPath();
-            FileStatus fileStatus = fs.getFileStatus(filePath);
-            if (fileStatus.isFile()) {
-                blockSize = fileStatus.getBlockSize();
-                break;
+            dataSize += fsp.getLength();
+            if (blockSize == 0) {
+                Path filePath = fsp.getPath();
+                FileStatus fileStatus = fs.getFileStatus(filePath);
+                if (fileStatus.isFile()) {
+                    blockSize = fileStatus.getBlockSize();
+                }
             }
         }
 
@@ -80,23 +83,38 @@ public class HdfsAnalyzer extends Analyzer {
         }
         numberOfBlocks = splits.size();
 
-
+        /*
+         * The estimate of the number of tuples in the table is based on the
+         * actual number of tuples in the first block, scaled by the ratio of
+         * the whole data size to the size of that first block.
+         * The calculation:
+         * Ratio of tuples to size = number of tuples in first block / first block size.
+         * Total number of tuples = ratio * total size of all non-empty splits.
+         */
         long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits);
-        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks);
+        long numberOfTuples = 0;
+        if (!splits.isEmpty()) {
+            long blockLength = splits.get(0).getLength();
+            numberOfTuples = (long) Math.floor((((double) numberOfTuplesInBlock / blockLength) * (dataSize)));
+        }
+        // NOTE(review): a -1 tuple count (unreadable first block) yields a negative estimate
+        AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks,
+                numberOfTuples);
 
-        //print files size to log when in debug level
+        // print files size to log when in debug level
         Log.debug(AnalyzerStats.dataToString(stats, path.toString()));
 
         return stats;
     }
 
     /**
-     * Calculates the number of tuples in a split (block).
-     * Reads one block from HDFS. Exception during reading will
-     * filter upwards and handled in AnalyzerResource
+     * Calculates the number of tuples in a split (block). Reads one block from
+     * HDFS. Exceptions during reading propagate upwards and are handled in
+     * AnalyzerResource.
      */
-    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits) throws Exception {
-        long tuples = -1; /* default  - if we are not able to read data */
+    private long getNumberOfTuplesInBlock(ArrayList<InputSplit> splits)
+            throws Exception {
+        long tuples = -1; /* default - if we are not able to read data */
         ReadAccessor accessor;
 
         if (splits.isEmpty()) {
@@ -104,8 +122,8 @@ public class HdfsAnalyzer extends Analyzer {
         }
 
         /*
-         * metadata information includes: file split's
-         * start, length and hosts (locations).
+         * metadata information includes: file split's start, length and hosts
+         * (locations).
          */
         FileSplit firstSplit = (FileSplit) splits.get(0);
         byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit);
@@ -121,6 +139,7 @@ public class HdfsAnalyzer extends Analyzer {
 
             accessor.closeForRead();
         }
+        Log.debug("number of tuples in first block: " + tuples);
 
         return tuples;
     }
@@ -133,11 +152,11 @@ public class HdfsAnalyzer extends Analyzer {
 
         // remove empty splits
         if (splits != null) {
-	        for (InputSplit split : splits) {
-	        	if (split.getLength() > 0) {
-	        		result.add(split);
-	        	}
-	        }
+            for (InputSplit split : splits) {
+                if (split.getLength() > 0) {
+                    result.add(split);
+                }
+            }
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
index 5c81ef8..cccc75a 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -2,9 +2,11 @@ package org.apache.hawq.pxf.plugins.hdfs;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.utilities.InputData;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -12,6 +14,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -43,19 +46,11 @@ public class HdfsDataFragmenter extends Fragmenter {
     @Override
     public List<Fragment> getFragments() throws Exception {
         String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
-        InputSplit[] splits = getSplits(new Path(absoluteDataPath));
+        List<InputSplit> splits = getSplits(new Path(absoluteDataPath));
 
-        for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
+        for (InputSplit split : splits) {
             FileSplit fsp = (FileSplit) split;
 
-            /*
-             * HD-2547: If the file is empty, an empty split is returned: no
-             * locations and no length.
-             */
-            if (fsp.getLength() <= 0) {
-                continue;
-            }
-
             String filepath = fsp.getPath().toUri().getPath();
             String[] hosts = fsp.getLocations();
 
@@ -71,9 +66,40 @@ public class HdfsDataFragmenter extends Fragmenter {
         return fragments;
     }
 
-    private InputSplit[] getSplits(Path path) throws IOException {
-        PxfInputFormat format = new PxfInputFormat();
+    /**
+     * Computes statistics over the non-empty splits of the data source: the
+     * number of splits, the length of the first split and the sum of all
+     * split lengths. A data source without non-empty splits yields all-zero
+     * statistics.
+     *
+     * @return fragments statistics for the data source
+     * @throws Exception if the splits cannot be computed
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+        List<InputSplit> splits = getSplits(new Path(absoluteDataPath));
+
+        if (splits.isEmpty()) {
+            return new FragmentsStats(0, 0, 0);
+        }
+        long totalSize = 0;
+        for (InputSplit split : splits) {
+            totalSize += split.getLength();
+        }
+        InputSplit firstSplit = splits.get(0);
+        return new FragmentsStats(splits.size(), firstSplit.getLength(), totalSize);
+    }
+
+    /**
+     * Returns the non-empty input splits of the given path.
+     *
+     * @param path data source path
+     * @return non-empty splits; an empty list (never null) if there are none
+     * @throws IOException if the splits cannot be computed
+     */
+    private List<InputSplit> getSplits(Path path) throws IOException {
+        PxfInputFormat format = new PxfInputFormat();
         PxfInputFormat.setInputPaths(jobConf, path);
-        return format.getSplits(jobConf, 1);
+        InputSplit[] splits = format.getSplits(jobConf, 1);
+        if (splits == null) {
+            return new ArrayList<InputSplit>();
+        }
+
+        List<InputSplit> result = new ArrayList<InputSplit>(splits.length);
+        /*
+         * HD-2547: If the file is empty, an empty split is returned: no
+         * locations and no length.
+         */
+        for (InputSplit split : splits) {
+            if (split.getLength() > 0) {
+                result.add(split);
+            }
+        }
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
index efce79f..aa8fc84 100644
--- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolver.java
@@ -63,7 +63,7 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
      * Creates a OneRow object from the singleton list.
      */
     @Override
-    public OneRow setFields(List<OneField> record) throws Exception {
+    public OneRow setFields(List<OneField> record) {
         if (((byte[]) record.get(0).val).length == 0) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
index 881aeac..1408a78 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveDataFragmenter.java
@@ -24,10 +24,10 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
-
 import org.apache.hawq.pxf.api.FilterParser;
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.Metadata;
 import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
 import org.apache.hawq.pxf.api.utilities.InputData;
@@ -443,4 +443,12 @@ public class HiveDataFragmenter extends Fragmenter {
 
         return true;
     }
+
+    /**
+     * Statistics are not implemented for Hive tables.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always, since ANALYZE is not
+     *             supported by the Hive plugin
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for Hive plugin is not supported");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
index 5de36b2..0c09e9b 100644
--- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveInputFormatFragmenter.java
@@ -1,5 +1,6 @@
 package org.apache.hawq.pxf.plugins.hive;
 
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.api.UnsupportedTypeException;
 import org.apache.hawq.pxf.api.UserDataException;
 import org.apache.hawq.pxf.api.io.DataType;
@@ -259,4 +260,12 @@ public class HiveInputFormatFragmenter extends HiveDataFragmenter {
 
         return userData.getBytes();
     }
+
+    /**
+     * Statistics are not implemented for HiveRc and HiveText tables.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always, since ANALYZE is not
+     *             supported by the HiveRc and HiveText plugins
+     */
+    @Override
+    public FragmentsStats getFragmentsStats() throws Exception {
+        throw new UnsupportedOperationException("ANALYZE for HiveRc and HiveText plugins is not supported");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
index 6784916..befa07a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AnalyzerFactory.java
@@ -6,12 +6,12 @@ import org.apache.hawq.pxf.service.utilities.Utilities;
 
 /*
  * Factory class for creation of Analyzer objects. The actual Analyzer object is "hidden" behind
- * an Analyzer abstract class which is returned by the AnalyzerFactory. 
+ * an Analyzer abstract class which is returned by the AnalyzerFactory.
+ * The concrete class name comes from InputData.getAnalyzer(), and the object
+ * is built by Utilities.createAnyInstance with the InputData as argument.
  */
 public class AnalyzerFactory {
+    /**
+     * Creates the analyzer named by the given request data.
+     *
+     * @param inputData request data; its getAnalyzer() supplies the class name
+     * @return the created analyzer
+     * @throws Exception if the analyzer cannot be created
+     */
     static public Analyzer create(InputData inputData) throws Exception {
     	String analyzerName = inputData.getAnalyzer();
-    	
+
         return (Analyzer) Utilities.createAnyInstance(InputData.class, analyzerName, inputData);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
index 8743d87..2c9aa27 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
@@ -4,11 +4,10 @@ import org.apache.hawq.pxf.service.io.Writable;
 
 import java.io.DataInputStream;
 
-/*
- * Bridge interface - defines the interface of the Bridge classes.
- * Any Bridge class acts as an iterator over Hadoop stored data, and 
- * should implement getNext (for reading) or setNext (for writing) 
- * for handling accessed data.
+/**
+ * Bridge interface - defines the interface of the Bridge classes. Any Bridge
+ * class acts as an iterator over Hadoop stored data, and should implement
+ * getNext (for reading) or setNext (for writing) for handling accessed data.
  */
 public interface Bridge {
     boolean beginIteration() throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
index 99255fa..7e5900c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/BridgeOutputBuilder.java
@@ -10,10 +10,15 @@ import org.apache.hawq.pxf.service.io.GPDBWritable.TypeMismatchException;
 import org.apache.hawq.pxf.service.io.Text;
 import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.apache.hawq.pxf.api.io.DataType.TEXT;
@@ -29,9 +34,17 @@ import static org.apache.hawq.pxf.api.io.DataType.TEXT;
 public class BridgeOutputBuilder {
     private ProtocolData inputData;
     private Writable output = null;
+    private LinkedList<Writable> outputList = null;
+    private Writable partialLine = null;
     private GPDBWritable errorRecord = null;
     private int[] schema;
     private String[] colNames;
+    private boolean samplingEnabled = false;
+    private boolean isPartialLine = false;
+
+    private static final byte DELIM = 10; /* (byte)'\n'; */
+
+    private static final Log LOG = LogFactory.getLog(BridgeOutputBuilder.class);
 
     /**
      * Constructs a BridgeOutputBuilder.
@@ -41,7 +54,9 @@ public class BridgeOutputBuilder {
      */
     public BridgeOutputBuilder(ProtocolData input) {
         inputData = input;
+        // single list instance, cleared and refilled by every makeOutput() call
+        outputList = new LinkedList<Writable>();
         makeErrorRecord();
+        // a positive sample ratio makes fillText split text chunks into lines
+        samplingEnabled = (inputData.getStatsSampleRatio() > 0);
     }
 
     /**
@@ -87,18 +102,29 @@ public class BridgeOutputBuilder {
      * Translates recFields (obtained from the Resolver) into an output record.
      *
      * @param recFields record fields to be serialized
-     * @return Writable object with serialized row
+     * @return list of Writable objects with serialized row
+     *         (several lines when sampling text, and possibly none when only
+     *         a partial line was read). NOTE: this is the builder's own list;
+     *         it is cleared and refilled on every call, so callers must
+     *         consume it before calling again.
      * @throws BadRecordException if building the output record failed
      */
-    public Writable makeOutput(List<OneField> recFields)
+    public LinkedList<Writable> makeOutput(List<OneField> recFields)
             throws BadRecordException {
         if (output == null && inputData.outputFormat() == OutputFormat.BINARY) {
             makeGPDBWritableOutput();
         }
 
+        // the same list instance is reused across calls (see @return)
+        outputList.clear();
+
         fillOutputRecord(recFields);
 
-        return output;
+        return outputList;
+    }
+
+    /**
+     * Returns the pending partial line, if any, and clears it.
+     * <br>
+     * A partial line holds the bytes that followed the last line delimiter
+     * seen so far (tracked only when sampling). Clearing it on retrieval
+     * ensures that a caller asking again at the end of the data does not get
+     * the same record a second time.
+     *
+     * @return the partial line, or null if there is none
+     */
+    public Writable getPartialLine() {
+        Writable pending = partialLine;
+        partialLine = null;
+        return pending;
     }
 
     /**
@@ -167,6 +193,8 @@ public class BridgeOutputBuilder {
 
             fillOneGPDBWritableField(current, i);
         }
+
+        outputList.add(output);
     }
 
     /**
@@ -201,7 +229,8 @@ public class BridgeOutputBuilder {
      * Fills a Text object based on recFields.
      *
      * @param recFields record fields
-     * @throws BadRecordException if text formatted record has more than one field
+     * @throws BadRecordException if text formatted record has more than one
+     *             field
      */
     void fillText(List<OneField> recFields) throws BadRecordException {
         /*
@@ -216,10 +245,66 @@ public class BridgeOutputBuilder {
         int type = fld.type;
         Object val = fld.val;
         if (DataType.get(type) == DataType.BYTEA) {// from LineBreakAccessor
-            output = new BufferWritable((byte[]) val);
+            if (samplingEnabled) {
+                convertTextDataToLines((byte[]) val);
+            } else {
+                output = new BufferWritable((byte[]) val);
+                outputList.add(output); // TODO break output into lines
+            }
         } else { // from QuotedLineBreakAccessor
             String textRec = (String) val;
             output = new Text(textRec + "\n");
+            outputList.add(output);
+        }
+    }
+
+    /**
+     * Splits a chunk of raw text bytes into separate lines; used only when
+     * sampling.
+     * <br>
+     * Sampling has to operate on individual rows, whereas LineBreakAccessor
+     * hands over larger chunks of data for performance. Every complete line
+     * (including its trailing delimiter) is appended to outputList. Bytes
+     * after the last delimiter are kept in partialLine and completed with the
+     * beginning of the next chunk.
+     *
+     * @param val raw bytes to split into lines
+     */
+    void convertTextDataToLines(byte[] val) {
+        int pos = 0;
+        while (pos < val.length) {
+            int delimPos = ArrayUtils.indexOf(val, DELIM, pos);
+            isPartialLine = (delimPos == ArrayUtils.INDEX_NOT_FOUND);
+            // a line ends right after its delimiter, or at the end of the chunk
+            int next = isPartialLine ? val.length : delimPos + 1;
+            byte[] piece = Arrays.copyOfRange(val, pos, next);
+
+            BufferWritable current;
+            if (partialLine == null) {
+                current = new BufferWritable(piece);
+            } else {
+                // finish the line carried over from the previous chunk
+                current = (BufferWritable) partialLine;
+                current.append(piece);
+                partialLine = null;
+            }
+
+            if (isPartialLine) {
+                partialLine = current;
+            } else {
+                outputList.add(current);
+            }
+            pos = next;
         }
     }
 
@@ -228,32 +313,33 @@ public class BridgeOutputBuilder {
      *
      * @param oneField field
      * @param colIdx column index
-     * @throws BadRecordException if field type is not supported or doesn't match the schema
+     * @throws BadRecordException if field type is not supported or doesn't
+     *             match the schema
      */
     void fillOneGPDBWritableField(OneField oneField, int colIdx)
             throws BadRecordException {
         int type = oneField.type;
         Object val = oneField.val;
-        GPDBWritable GPDBoutput = (GPDBWritable) output;
+        GPDBWritable gpdbOutput = (GPDBWritable) output;
         try {
             switch (DataType.get(type)) {
                 case INTEGER:
-                    GPDBoutput.setInt(colIdx, (Integer) val);
+                    gpdbOutput.setInt(colIdx, (Integer) val);
                     break;
                 case FLOAT8:
-                    GPDBoutput.setDouble(colIdx, (Double) val);
+                    gpdbOutput.setDouble(colIdx, (Double) val);
                     break;
                 case REAL:
-                    GPDBoutput.setFloat(colIdx, (Float) val);
+                    gpdbOutput.setFloat(colIdx, (Float) val);
                     break;
                 case BIGINT:
-                    GPDBoutput.setLong(colIdx, (Long) val);
+                    gpdbOutput.setLong(colIdx, (Long) val);
                     break;
                 case SMALLINT:
-                    GPDBoutput.setShort(colIdx, (Short) val);
+                    gpdbOutput.setShort(colIdx, (Short) val);
                     break;
                 case BOOLEAN:
-                    GPDBoutput.setBoolean(colIdx, (Boolean) val);
+                    gpdbOutput.setBoolean(colIdx, (Boolean) val);
                     break;
                 case BYTEA:
                     byte[] bts = null;
@@ -264,7 +350,7 @@ public class BridgeOutputBuilder {
                             bts[j] = Array.getByte(val, j);
                         }
                     }
-                    GPDBoutput.setBytes(colIdx, bts);
+                    gpdbOutput.setBytes(colIdx, bts);
                     break;
                 case VARCHAR:
                 case BPCHAR:
@@ -273,7 +359,8 @@ public class BridgeOutputBuilder {
                 case NUMERIC:
                 case TIMESTAMP:
                 case DATE:
-                    GPDBoutput.setString(colIdx, ObjectUtils.toString(val, null));
+                    gpdbOutput.setString(colIdx,
+                            ObjectUtils.toString(val, null));
                     break;
                 default:
                     String valClassName = (val != null) ? val.getClass().getSimpleName()

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
index 47f883c..e724467 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponse.java
@@ -39,7 +39,7 @@ public class FragmentsResponse implements StreamingOutput {
      * Serializes a fragments list in JSON,
      * To be used as the result string for HAWQ.
      * An example result is as follows:
-     * {@code {"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":<base64 metadata for fragment>, "userData":"<data_specific_to_third_party_fragmenter>"},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":<base64 metadata for fragment>,"userData":"<data_specific_to_third_party_fragmenter>"}]}}
+     * <code>{"PXFFragments":[{"replicas":["sdw1.corp.emc.com","sdw3.corp.emc.com","sdw8.corp.emc.com"],"sourceName":"text2.csv", "index":"0", "metadata":&lt;base64 metadata for fragment&gt;, "userData":"&lt;data_specific_to_third_party_fragmenter&gt;"},{"replicas":["sdw2.corp.emc.com","sdw4.corp.emc.com","sdw5.corp.emc.com"],"sourceName":"text_data.csv","index":"0","metadata":&lt;base64 metadata for fragment&gt;,"userData":"&lt;data_specific_to_third_party_fragmenter&gt;"}]}</code>
      */
     @Override
     public void write(OutputStream output) throws IOException,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
index 0e9c47f..107a60c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/FragmentsResponseFormatter.java
@@ -4,7 +4,6 @@ import org.apache.hawq.pxf.api.Fragment;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index 9497c6c..6a93833 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -14,58 +14,81 @@ import org.apache.commons.logging.LogFactory;
 
 import java.io.*;
 import java.nio.charset.CharacterCodingException;
+import java.util.LinkedList;
 import java.util.zip.ZipException;
 
-/*
- * ReadBridge class creates appropriate accessor and resolver.
- * It will then create the correct output conversion
- * class (e.g. Text or GPDBWritable) and get records from accessor,
- * let resolver deserialize them and reserialize them using the
- * output conversion class.
- *
- * The class handles BadRecordException and other exception type
- * and marks the record as invalid for GPDB.
+/**
+ * ReadBridge class creates appropriate accessor and resolver. It will then
+ * create the correct output conversion class (e.g. Text or GPDBWritable) and
+ * get records from accessor, let resolver deserialize them and reserialize them
+ * using the output conversion class. <br>
+ * The class handles BadRecordException and other exception type and marks the
+ * record as invalid for HAWQ.
  */
 public class ReadBridge implements Bridge {
     ReadAccessor fileAccessor = null;
     ReadResolver fieldsResolver = null;
     BridgeOutputBuilder outputBuilder = null;
+    LinkedList<Writable> outputQueue = null;
 
-    private Log Log;
+    private static final Log Log = LogFactory.getLog(ReadBridge.class);
 
-    /*
-     * C'tor - set the implementation of the bridge
+    /**
+     * C'tor - creates the output builder, the accessor and the resolver for
+     * the request.
+     *
+     * @param protData input containing accessor and resolver names
+     * @throws Exception if accessor or resolver can't be instantiated
      */
     public ReadBridge(ProtocolData protData) throws Exception {
         outputBuilder = new BridgeOutputBuilder(protData);
-        Log = LogFactory.getLog(ReadBridge.class);
+        // buffers the records produced from a single accessor row until
+        // getNext() has handed all of them out
+        outputQueue = new LinkedList<Writable>();
         fileAccessor = getFileAccessor(protData);
         fieldsResolver = getFieldsResolver(protData);
     }
 
-    /*
-     * Accesses the underlying HDFS file
+    /**
+     * Accesses the underlying data source by opening the accessor for reading.
+     *
+     * @return the value returned by the accessor's openForRead()
      */
     @Override
     public boolean beginIteration() throws Exception {
         return fileAccessor.openForRead();
     }
 
-    /*
-     * Fetch next object from file and turn it into a record that the GPDB backend can process
+    /**
+     * Fetches next object from file and turn it into a record that the HAWQ
+     * backend can process.
      */
     @Override
     public Writable getNext() throws Exception {
-        Writable output;
+        Writable output = null;
         OneRow onerow = null;
+
+        if (!outputQueue.isEmpty()) {
+            return outputQueue.pop();
+        }
+
         try {
-            onerow = fileAccessor.readNextObject();
-            if (onerow == null) {
-                fileAccessor.closeForRead();
-                return null;
-            }
+            while (outputQueue.isEmpty()) {
+                onerow = fileAccessor.readNextObject();
+                if (onerow == null) {
+                    fileAccessor.closeForRead();
+                    output = outputBuilder.getPartialLine();
+                    if (output != null) {
+                        Log.warn("A partial record in the end of the fragment");
+                    }
+                    // if there is a partial line, return it now, otherwise it
+                    // will return null
+                    return output;
+                }
 
-            output = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                // we checked before that outputQueue is empty, so we can
+                // override it.
+                outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
+                if (!outputQueue.isEmpty()) {
+                    output = outputQueue.pop();
+                    break;
+                }
+            }
         } catch (IOException ex) {
             if (!isDataException(ex)) {
                 fileAccessor.closeForRead();
@@ -78,7 +101,8 @@ public class ReadBridge implements Bridge {
                 row_info = onerow.toString();
             }
             if (ex.getCause() != null) {
-                Log.debug("BadRecordException " + ex.getCause().toString() + ": " + row_info);
+                Log.debug("BadRecordException " + ex.getCause().toString()
+                        + ": " + row_info);
             } else {
                 Log.debug(ex.toString() + ": " + row_info);
             }
@@ -91,27 +115,34 @@ public class ReadBridge implements Bridge {
         return output;
     }
 
-    public static ReadAccessor getFileAccessor(InputData inputData) throws Exception {
-        return (ReadAccessor) Utilities.createAnyInstance(InputData.class, inputData.getAccessor(), inputData);
+    /**
+     * Creates the read accessor plugin whose class name is given by
+     * {@code inputData.getAccessor()}.
+     *
+     * @param inputData request data naming the accessor class
+     * @return the accessor instance
+     * @throws Exception if the accessor cannot be instantiated
+     */
+    public static ReadAccessor getFileAccessor(InputData inputData)
+            throws Exception {
+        String accessorClassName = inputData.getAccessor();
+        Object accessor = Utilities.createAnyInstance(InputData.class,
+                accessorClassName, inputData);
+        return (ReadAccessor) accessor;
     }
 
-    public static ReadResolver getFieldsResolver(InputData inputData) throws Exception {
-        return (ReadResolver) Utilities.createAnyInstance(InputData.class, inputData.getResolver(), inputData);
+    /**
+     * Creates the read resolver plugin whose class name is given by
+     * {@code inputData.getResolver()}.
+     *
+     * @param inputData request data naming the resolver class
+     * @return the resolver instance
+     * @throws Exception if the resolver cannot be instantiated
+     */
+    public static ReadResolver getFieldsResolver(InputData inputData)
+            throws Exception {
+        String resolverClassName = inputData.getResolver();
+        Object resolver = Utilities.createAnyInstance(InputData.class,
+                resolverClassName, inputData);
+        return (ReadResolver) resolver;
     }
 
     /*
-     * There are many exceptions that inherit IOException. Some of them like EOFException are generated
-     * due to a data problem, and not because of an IO/connection problem as the father IOException
-     * might lead us to believe. For example, an EOFException will be thrown while fetching a record
-     * from a sequence file, if there is a formatting problem in the record. Fetching record from
-     * the sequence-file is the responsibility of the accessor so the exception will be thrown from the
-     * accessor. We identify this cases by analyzing the exception type, and when we discover that the
-     * actual problem was a data problem, we return the errorOutput GPDBWritable.
+     * Many exceptions extend IOException. Some of them, like EOFException,
+     * signal a problem with the data rather than with IO or the connection, as
+     * the parent IOException type might suggest. For example, an EOFException
+     * is thrown while fetching a badly formatted record from a sequence file.
+     * Fetching records is the accessor's job, so such exceptions surface from
+     * the accessor. These cases are recognized by exception type, and when the
+     * actual problem was a data problem, the errorOutput GPDBWritable is
+     * returned.
      */
     private boolean isDataException(IOException ex) {
-        return (ex instanceof EOFException || ex instanceof CharacterCodingException ||
-                ex instanceof CharConversionException || ex instanceof UTFDataFormatException ||
-                ex instanceof ZipException);
+        Class<?>[] dataExceptionTypes = { EOFException.class,
+                CharacterCodingException.class, CharConversionException.class,
+                UTFDataFormatException.class, ZipException.class };
+        for (Class<?> type : dataExceptionTypes) {
+            if (type.isInstance(ex)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
@@ -121,7 +152,8 @@ public class ReadBridge implements Bridge {
 
     @Override
     public boolean isThreadSafe() {
-        boolean result = ((Plugin) fileAccessor).isThreadSafe() && ((Plugin) fieldsResolver).isThreadSafe();
+        boolean result = ((Plugin) fileAccessor).isThreadSafe()
+                && ((Plugin) fieldsResolver).isThreadSafe();
         Log.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
new file mode 100644
index 0000000..66ee053
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -0,0 +1,120 @@
+package org.apache.hawq.pxf.service;
+
+import java.io.DataInputStream;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.service.io.Writable;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+
+/**
+ * ReadSamplingBridge wraps a ReadBridge, and returns only some of the output
+ * records, based on a ratio sample. The decision to pass or discard a record
+ * is made after all of the processing is completed (
+ * {@code accessor -> resolver -> output builder}) to make sure there are no
+ * chunks of data instead of single records. <br>
+ * The goal is to get as uniform as possible sampling. This is achieved by
+ * creating a bit map matching the precision of the sampleRatio, so that for a
+ * ratio of 0.034, a bit-map of 1000 bits will be created, and 34 bits will be
+ * set. This map is matched against each read record, discarding ones with a 0
+ * bit and continuing until a 1 bit record is read.
+ */
+public class ReadSamplingBridge implements Bridge {
+
+    ReadBridge bridge;
+
+    float sampleRatio;
+    BitSet sampleBitSet;
+    int bitSetSize;
+    int sampleSize;
+    int curIndex;
+
+    private static final Log LOG = LogFactory.getLog(ReadSamplingBridge.class);
+
+    /**
+     * C'tor - set the implementation of the bridge.
+     *
+     * @param protData input containing sampling ratio
+     * @throws Exception if the sampling ratio is wrong
+     */
+    public ReadSamplingBridge(ProtocolData protData) throws Exception {
+        bridge = new ReadBridge(protData);
+
+        this.sampleRatio = protData.getStatsSampleRatio();
+        // float literals keep the 0.0001 bound itself valid (0.0001f is
+        // slightly below the double 0.0001); the negated form rejects NaN
+        if (!(sampleRatio >= 0.0001f && sampleRatio <= 1.0f)) {
+            throw new IllegalArgumentException(
+                    "sampling ratio must be a value between 0.0001 and 1.0. "
+                            + "(value = " + sampleRatio + ")");
+        }
+
+        calculateBitSetSize();
+
+        this.sampleBitSet = AnalyzeUtils.generateSamplingBitSet(bitSetSize,
+                sampleSize);
+        this.curIndex = 0;
+    }
+
+    /**
+     * Expresses sampleRatio as sampleSize set bits out of bitSetSize bits,
+     * starting from 10000 bits (4 decimal digits) and shrinking down to no
+     * fewer than 100 bits while the sample size remains divisible by 10.
+     */
+    private void calculateBitSetSize() {
+        // round instead of truncating: in float arithmetic 0.0007f * 10000
+        // is 6.9999995, which a plain (int) cast would turn into 6
+        sampleSize = Math.round(sampleRatio * 10000);
+        bitSetSize = 10000;
+
+        while ((bitSetSize > 100) && (sampleSize % 10 == 0)) {
+            bitSetSize /= 10;
+            sampleSize /= 10;
+        }
+        LOG.debug("bit set size = " + bitSetSize + " sample size = "
+                + sampleSize);
+    }
+
+    /**
+     * Fetches next sample, according to the sampling ratio.
+     *
+     * @return the next sampled record, or null when the wrapped bridge is
+     *         exhausted
+     */
+    @Override
+    public Writable getNext() throws Exception {
+        Writable output = bridge.getNext();
+
+        // sample - while the current bit is off, drop the record and advance
+        while (output != null && !sampleBitSet.get(curIndex)) {
+            incIndex();
+            output = bridge.getNext();
+        }
+
+        incIndex();
+        return output;
+    }
+
+    /** Advances the bit-set cursor, wrapping around after bitSetSize bits. */
+    private void incIndex() {
+        curIndex = (curIndex + 1) % bitSetSize;
+    }
+
+    @Override
+    public boolean beginIteration() throws Exception {
+        return bridge.beginIteration();
+    }
+
+    @Override
+    public boolean setNext(DataInputStream inputStream) throws Exception {
+        return bridge.setNext(inputStream);
+    }
+
+    @Override
+    public boolean isThreadSafe() {
+        return bridge.isThreadSafe();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
index 34ed316..c96c17a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
@@ -27,17 +27,18 @@ public class WriteBridge implements Bridge {
      * C'tor - set the implementation of the bridge
      */
     public WriteBridge(ProtocolData protocolData) throws Exception {
-    	
-        inputBuilder = new BridgeInputBuilder(protocolData);        
-        /* plugins accept InputData paramaters */
+
+        inputBuilder = new BridgeInputBuilder(protocolData);
+        /* plugins accept InputData parameters */
         fileAccessor = getFileAccessor(protocolData);
         fieldsResolver = getFieldsResolver(protocolData);
-        
+
     }
 
     /*
      * Accesses the underlying HDFS file
      */
+    @Override
     public boolean beginIteration() throws Exception {
         return fileAccessor.openForWrite();
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
index e74c88b..afe7917 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/io/BufferWritable.java
@@ -54,4 +54,25 @@ public class BufferWritable implements Writable {
                 "BufferWritable.readFields() is not implemented");
     }
 
+    /**
+     * Concatenates {@code app} onto the end of the current buffer.
+     * <br>
+     * When the current buffer is null, {@code app} itself becomes the buffer
+     * (no copy is made); a null {@code app} leaves the buffer unchanged.
+     * Otherwise both arrays are copied into a newly allocated array, so this
+     * is not efficient for repeated appends.
+     *
+     * @param app buffer to append
+     */
+    public void append(byte[] app) {
+        if (buf != null && app != null) {
+            int oldLength = buf.length;
+            byte[] merged = new byte[oldLength + app.length];
+            System.arraycopy(buf, 0, merged, 0, oldLength);
+            System.arraycopy(app, 0, merged, oldLength, app.length);
+            buf = merged;
+        } else if (buf == null) {
+            buf = app;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 588845d..cda8317 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 
 import org.apache.hawq.pxf.service.Bridge;
 import org.apache.hawq.pxf.service.ReadBridge;
+import org.apache.hawq.pxf.service.ReadSamplingBridge;
 import org.apache.hawq.pxf.service.io.Writable;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
@@ -36,28 +37,32 @@ public class BridgeResource extends RestResource {
 
     private static Log Log = LogFactory.getLog(BridgeResource.class);
     /**
-     * Lock is needed here in the case of a non-thread-safe plugin.
-     * Using synchronized methods is not enough because the bridge work
-     * is called by jetty ({@link StreamingOutput}), after we are getting
-     * out of this class's context.
+     * Lock is needed here in the case of a non-thread-safe plugin. Using
+     * synchronized methods is not enough because the bridge work is called by
+     * jetty ({@link StreamingOutput}), after we are getting out of this class's
+     * context.
      * <p/>
-     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on the
-     * isThreadSafe parameter that is determined by the bridge.
+     * BRIDGE_LOCK is accessed through lock() and unlock() functions, based on
+     * the isThreadSafe parameter that is determined by the bridge.
      */
     private static final ReentrantLock BRIDGE_LOCK = new ReentrantLock();
 
     public BridgeResource() {
     }
 
-    /*
-     * Used to be HDFSReader. Creates a bridge instance and iterates over
-     * its records, printing it out to outgoing stream.
-     * Outputs GPDBWritable.
+    /**
+     * Used to be HDFSReader. Creates a bridge instance and iterates over its
+     * records, printing it out to outgoing stream. Outputs GPDBWritable or
+     * Text.
      *
      * Parameters come through HTTP header.
      *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
      * @param headers Holds HTTP headers from request
+     * @return response object containing stream that will output records
+     * @throws Exception in case of wrong request parameters, or failure to
+     *             initialize bridge
      */
     @GET
     @Produces(MediaType.APPLICATION_OCTET_STREAM)
@@ -70,17 +75,24 @@ public class BridgeResource extends RestResource {
 
         ProtocolData protData = new ProtocolData(params);
         SecuredHDFS.verifyToken(protData, servletContext);
-        Bridge bridge = new ReadBridge(protData);
+        Bridge bridge;
+        float sampleRatio = protData.getStatsSampleRatio();
+        if (sampleRatio > 0) {
+            bridge = new ReadSamplingBridge(protData);
+        } else {
+            bridge = new ReadBridge(protData);
+        }
         String dataDir = protData.getDataSource();
         // THREAD-SAFE parameter has precedence
         boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe();
-        Log.debug("Request for " + dataDir + " will be handled " +
-                (isThreadSafe ? "without" : "with") + " synchronization");
+        Log.debug("Request for " + dataDir + " will be handled "
+                + (isThreadSafe ? "without" : "with") + " synchronization");
 
         return readResponse(bridge, protData, isThreadSafe);
     }
 
-    Response readResponse(final Bridge bridge, ProtocolData protData, final boolean threadSafe) throws Exception {
+    Response readResponse(final Bridge bridge, ProtocolData protData,
+                          final boolean threadSafe) {
         final int fragment = protData.getDataFragment();
         final String dataDir = protData.getDataSource();
 
@@ -89,7 +101,8 @@ public class BridgeResource extends RestResource {
         // output stream
         final StreamingOutput streaming = new StreamingOutput() {
             @Override
-            public void write(final OutputStream out) throws IOException, WebApplicationException {
+            public void write(final OutputStream out) throws IOException,
+                    WebApplicationException {
                 long recordCount = 0;
 
                 if (!threadSafe) {
@@ -103,20 +116,26 @@ public class BridgeResource extends RestResource {
 
                     Writable record;
                     DataOutputStream dos = new DataOutputStream(out);
-                    Log.debug("Starting streaming fragment " + fragment + " of resource " + dataDir);
+                    Log.debug("Starting streaming fragment " + fragment
+                            + " of resource " + dataDir);
                     while ((record = bridge.getNext()) != null) {
-						record.write(dos);
+                        record.write(dos);
                         ++recordCount;
                     }
-                    Log.debug("Finished streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
+                    Log.debug("Finished streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
                 } catch (ClientAbortException e) {
-                    // Occurs whenever client (HAWQ) decides the end the connection
+                    // Occurs whenever client (HAWQ) decides to end the
+                    // connection
                     Log.error("Remote connection closed by HAWQ", e);
                 } catch (Exception e) {
                     Log.error("Exception thrown when streaming", e);
                     throw new IOException(e.getMessage());
                 } finally {
-                    Log.debug("Stopped streaming fragment " + fragment + " of resource " + dataDir + ", " + recordCount + " records.");
+                    Log.debug("Stopped streaming fragment " + fragment
+                            + " of resource " + dataDir + ", " + recordCount
+                            + " records.");
                     if (!threadSafe) {
                         unlock(dataDir);
                     }
@@ -128,7 +147,7 @@ public class BridgeResource extends RestResource {
     }
 
     /**
-     * Lock BRIDGE_LOCK
+     * Locks BRIDGE_LOCK
      *
      * @param path path for the request, used for logging.
      */
@@ -139,7 +158,7 @@ public class BridgeResource extends RestResource {
     }
 
     /**
-     * Unlock BRIDGE_LOCK
+     * Unlocks BRIDGE_LOCK
      *
      * @param path path for the request, used for logging.
      */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
index 258f8c2..6f77813 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/FragmenterResource.java
@@ -2,9 +2,11 @@ package org.apache.hawq.pxf.service.rest;
 
 import org.apache.hawq.pxf.api.Fragment;
 import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.FragmentsStats;
 import org.apache.hawq.pxf.service.FragmenterFactory;
 import org.apache.hawq.pxf.service.FragmentsResponse;
 import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.AnalyzeUtils;
 import org.apache.hawq.pxf.service.utilities.ProtocolData;
 import org.apache.hawq.pxf.service.utilities.SecuredHDFS;
 
@@ -21,44 +23,99 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-/*
- * Class enhances the API of the WEBHDFS REST server.
- * Returns the data fragments that a data resource is made of, enabling parallel processing of the data resource.
- * Example for querying API FRAGMENTER from a web client
- * curl -i "http://localhost:50070/pxf/v2/Fragmenter/getFragments?path=/dir1/dir2/*txt"
- * /pxf/ is made part of the path when there is a webapp by that name in tcServer.
+/**
+ * Class enhances the API of the WEBHDFS REST server. Returns the data fragments
+ * that a data resource is made of, enabling parallel processing of the data
+ * resource. Example for querying API FRAGMENTER from a web client
+ * {@code curl -i "http://localhost:50070/pxf/v2/Fragmenter/getFragments?path=/dir1/dir2/*txt"}
+ * <code>/pxf/</code> is made part of the path when there is a webapp by that
+ * name in tomcat.
  */
 @Path("/" + Version.PXF_PROTOCOL_VERSION + "/Fragmenter/")
 public class FragmenterResource extends RestResource {
-    private Log Log;
+    private static Log Log = LogFactory.getLog(FragmenterResource.class);
 
-    public FragmenterResource() throws IOException {
-        Log = LogFactory.getLog(FragmenterResource.class);
-    }
-
-    /*
-     * The function is called when http://nn:port/pxf/vx/Fragmenter/getFragments?path=...
-     * is used
+    /**
+     * The function is called when
+     * {@code http://nn:port/pxf/vx/Fragmenter/getFragments?path=...} is used.
      *
-     * @param servletContext Servlet context contains attributes required by SecuredHDFS
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
      * @param headers Holds HTTP headers from request
      * @param path Holds URI path option used in this request
+     * @return response object with JSON serialized fragments metadata
+     * @throws Exception if getting fragments info failed
      */
     @GET
     @Path("getFragments")
     @Produces("application/json")
     public Response getFragments(@Context final ServletContext servletContext,
-            @Context final HttpHeaders headers,
-            @QueryParam("path") final String path) throws Exception {
+                                 @Context final HttpHeaders headers,
+                                 @QueryParam("path") final String path)
+            throws Exception {
+
+        ProtocolData protData = getProtocolData(servletContext, headers, path);
+
+        /* Create a fragmenter instance with API level parameters */
+        final Fragmenter fragmenter = FragmenterFactory.create(protData);
+
+        List<Fragment> fragments = fragmenter.getFragments();
+
+        fragments = AnalyzeUtils.getSampleFragments(fragments, protData);
+
+        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(
+                fragments, path);
+
+        return Response.ok(fragmentsResponse, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    /**
+     * Serves
+     * {@code http://nn:port/pxf/vx/Fragmenter/getFragmentsStats?path=...}:
+     * returns the fragmenter's statistics for the requested data source.
+     *
+     * @param servletContext Servlet context contains attributes required by
+     *            SecuredHDFS
+     * @param headers Holds HTTP headers from request
+     * @param path Holds URI path option used in this request
+     * @return response object with JSON serialized fragments statistics
+     * @throws Exception if getting fragments info failed
+     */
+    @GET
+    @Path("getFragmentsStats")
+    @Produces("application/json")
+    public Response getFragmentsStats(@Context final ServletContext servletContext,
+                                      @Context final HttpHeaders headers,
+                                      @QueryParam("path") final String path)
+            throws Exception {
+        // request parameters, after the security token has been verified
+        final ProtocolData requestData = getProtocolData(servletContext,
+                headers, path);
+        // fragmenter implementation is picked from the request's parameters
+        final FragmentsStats stats = FragmenterFactory.create(requestData)
+                .getFragmentsStats();
+        final String json = FragmentsStats.dataToJSON(stats);
+
+        if (Log.isDebugEnabled()) {
+            Log.debug(FragmentsStats.dataToString(stats, path));
+        }
+
+        return Response.ok(json, MediaType.APPLICATION_JSON_TYPE).build();
+    }
+
+    private ProtocolData getProtocolData(final ServletContext servletContext,
+                                         final HttpHeaders headers,
+                                         final String path) throws Exception {
 
         if (Log.isDebugEnabled()) {
-            StringBuilder startMsg = new StringBuilder("FRAGMENTER started for path \"" + path + "\"");
+            StringBuilder startMsg = new StringBuilder(
+                    "FRAGMENTER started for path \"" + path + "\"");
             for (String header : headers.getRequestHeaders().keySet()) {
-                startMsg.append(" Header: ").append(header).append(" Value: ").append(headers.getRequestHeader(header));
+                startMsg.append(" Header: ").append(header).append(" Value: ").append(
+                        headers.getRequestHeader(header));
             }
             Log.debug(startMsg);
         }
@@ -73,12 +130,6 @@ public class FragmenterResource extends RestResource {
         }
         SecuredHDFS.verifyToken(protData, servletContext);
 
-        /* Create a fragmenter instance with API level parameters */
-        final Fragmenter fragmenter = FragmenterFactory.create(protData);
-
-        List<Fragment> fragments = fragmenter.getFragments();
-        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, path);
-
-        return Response.ok(fragmentsResponse, MediaType.APPLICATION_JSON_TYPE).build();
+        return protData;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
index 2316bc7..c1dadd1 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/InvalidPathResource.java
@@ -12,11 +12,9 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
-import java.io.IOException;
-
 
 class Version {
-    final static String PXF_PROTOCOL_VERSION = "v13";
+    final static String PXF_PROTOCOL_VERSION = "v14";
 }
 
 /**
@@ -38,7 +36,7 @@ public class InvalidPathResource {
 
     private Log Log;
 
-    public InvalidPathResource() throws IOException {
+    public InvalidPathResource() {
         super();
         Log = LogFactory.getLog(InvalidPathResource.class);
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/81385f09/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
new file mode 100644
index 0000000..7e0fcf1
--- /dev/null
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java
@@ -0,0 +1,145 @@
+package org.apache.hawq.pxf.service.utilities;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * Helper class to get statistics for ANALYZE.
+ */
+public class AnalyzeUtils {
+
+    private static final Log Log = LogFactory.getLog(AnalyzeUtils.class);
+
+    /**
+     * In case pxf_max_fragments parameter is declared, make sure not to get
+     * over the limit. The returned fragments are evenly distributed, in order
+     * to achieve good sampling.
+     * <p>
+     * A limit of zero or less is treated as "no limit" and the given list is
+     * returned unchanged.
+     *
+     * @param fragments fragments list
+     * @param protData container for parameters, including sampling data.
+     * @return a list of fragments no bigger than pxf_max_fragments parameter,
+     *         or {@code fragments} itself when no limit applies
+     */
+    public static List<Fragment> getSampleFragments(List<Fragment> fragments,
+                                                    ProtocolData protData) {
+
+        int listSize = fragments.size();
+        int maxSize = protData.getStatsMaxFragments();
+
+        // 0 means no limit was requested; a negative limit is meaningless and
+        // is treated the same way rather than sampling no fragments at all
+        if (maxSize <= 0) {
+            return fragments;
+        }
+
+        Log.debug("fragments list has " + listSize
+                + " fragments, maxFragments = " + maxSize);
+
+        BitSet bitSet = generateSamplingBitSet(listSize, maxSize);
+        List<Fragment> samplingList = new ArrayList<Fragment>(
+                bitSet.cardinality());
+        // visit only the chosen indexes, keeping the original fragment order
+        for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+            samplingList.add(fragments.get(i));
+        }
+
+        return samplingList;
+    }
+
+    /**
+     * Marks sampleSize bits out of the poolSize, in a uniform way.
+     * <p>
+     * Bit 0 is always chosen; each following bit is the
+     * {@code (poolSize / sampleSize + 1)}-th still-clear bit after the
+     * previously chosen one, wrapping around to the start of the pool.
+     *
+     * @param poolSize pool size
+     * @param sampleSize sample size
+     * @return bit set with sampleSize bits set out of poolSize; empty if
+     *         either argument is not positive, and with all poolSize bits set
+     *         if sampleSize is at least poolSize
+     */
+    public static BitSet generateSamplingBitSet(int poolSize, int sampleSize) {
+
+        BitSet bitSet = new BitSet();
+
+        if (poolSize <= 0 || sampleSize <= 0) {
+            return bitSet;
+        }
+
+        if (sampleSize >= poolSize) {
+            Log.debug("sampling bit map has " + poolSize + " elements (100%)");
+            bitSet.set(0, poolSize);
+            return bitSet;
+        }
+
+        int skip = (poolSize / sampleSize) + 1;
+        int chosen = 0;
+        int curIndex = 0;
+
+        while (chosen < sampleSize) {
+
+            bitSet.set(curIndex);
+            chosen++;
+            if (chosen == sampleSize) {
+                break;
+            }
+
+            // advance to the skip-th clear bit after curIndex, cyclically
+            for (int i = 0; i < skip; ++i) {
+                curIndex = nextClearBitModulo((curIndex + 1) % poolSize,
+                        poolSize, bitSet);
+                if (curIndex == -1) {
+                    // unreachable: chosen < sampleSize < poolSize, so a clear
+                    // bit always remains
+                    throw new IllegalArgumentException(
+                            "Trying to sample more than pool size "
+                                    + "(pool size " + poolSize
+                                    + ", sampling size " + sampleSize + ")");
+                }
+            }
+        }
+
+        if (Log.isDebugEnabled()) {
+            // BitSet.toString() lists every set index; build it only if logged
+            Log.debug("sampling bit map has " + chosen + " elements:"
+                    + bitSet.toString());
+        }
+
+        return bitSet;
+    }
+
+    /**
+     * Returns index of next clear (false) bit, starting from and including
+     * index. If all bits from index to the end are set (true), search from the
+     * beginning. Return -1 if all bits are set (true).
+     *
+     * @param index starting point; callers pass a value in [0, poolSize)
+     * @param poolSize the bit set size
+     * @param bitSet bitset to search
+     * @return index of next clear bit, starting in index, or -1 if none
+     */
+    private static int nextClearBitModulo(int index, int poolSize, BitSet bitSet) {
+
+        int indexToSet = bitSet.nextClearBit(index);
+        if (indexToSet == poolSize && index != 0) {
+            // nothing clear in [index, poolSize): wrap around to the start
+            indexToSet = bitSet.nextClearBit(0);
+        }
+        /* means that all bits are already set, so we return -1 */
+        if (indexToSet == poolSize) {
+            return -1;
+        }
+
+        return indexToSet;
+    }
+}


Mime
View raw message