hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nh...@apache.org
Subject [4/5] incubator-hawq git commit: HAWQ-28. JavaDoc fixes for PXF
Date Thu, 08 Oct 2015 18:37:27 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
index 48fb5b5..13a3546 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkRecordReader.java
@@ -1,22 +1,20 @@
 package com.pivotal.pxf.plugins.hdfs;
 
-import com.pivotal.pxf.plugins.hdfs.ChunkWritable;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.lang.IllegalArgumentException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -25,241 +23,259 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
-import static org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH;
-
 
 /**
- * ChunkRecordReader is designed for fast reading of a file split. The idea is to bring chunks of 
- * data instead of single records. The chunks contain many records and the chunk end is not aligned
- * on a record boundary. The size of the chunk is a class hardcoded parameter - CHUNK_SIZE.
- * This behaviour sets this reader apart from the other readers which will fetch one record and 
- * stop when reaching a record delimiter.
+ * ChunkRecordReader is designed for fast reading of a file split. The idea is
+ * to bring chunks of data instead of single records. The chunks contain many
+ * records and the chunk end is not aligned on a record boundary. The size of
+ * the chunk is a class hardcoded parameter - CHUNK_SIZE. This behaviour sets
+ * this reader apart from the other readers which will fetch one record and stop
+ * when reaching a record delimiter.
  */
-public class ChunkRecordReader implements RecordReader<LongWritable, ChunkWritable> {
-	private static final Log LOG
-	= LogFactory.getLog(ChunkRecordReader.class.getName());
+public class ChunkRecordReader implements
+        RecordReader<LongWritable, ChunkWritable> {
+    private static final Log LOG = LogFactory.getLog(ChunkRecordReader.class.getName());
+
+    private CompressionCodecFactory compressionCodecs = null;
+    private long start;
+    private long pos;
+    private long end;
+    private long fileLength;
+    private ChunkReader in;
+    private FSDataInputStream fileIn;
+    private final Seekable filePosition;
+    private int maxLineLength;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private static final int CHUNK_SIZE = 1024 * 1024;
+
+    /**
+     * Translates the FSDataInputStream into a DFSInputStream.
+     */
+    private DFSInputStream getInputStream() {
+        return (DFSInputStream) (fileIn.getWrappedStream());
+    }
+
+    /**
+     * Returns statistics of the input stream's read operation: total bytes
+     * read, bytes read locally, bytes read in short-circuit (directly from file
+     * descriptor).
+     *
+     * @return an instance of ReadStatistics class
+     */
+    public ReadStatistics getReadStatistics() {
+        return getInputStream().getReadStatistics();
+    }
+
+    /**
+     * Constructs a ChunkRecordReader instance.
+     *
+     * @param job the job configuration
+     * @param split contains the file name, begin byte of the split and the
+     *            bytes length
+     * @throws IOException if an I/O error occurs when accessing the file or
+     *             creating input stream to read from it
+     */
+    public ChunkRecordReader(Configuration job, FileSplit split)
+            throws IOException {
+        maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
+        validateLength(maxLineLength);
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+        compressionCodecs = new CompressionCodecFactory(job);
+        codec = compressionCodecs.getCodec(file);
+
+        // open the file and seek to the start of the split
+        job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+        final FileSystem fs = file.getFileSystem(job);
+        fs.setVerifyChecksum(false);
+        fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
+        fileLength = getInputStream().getFileLength();
+        if (isCompressedInput()) {
+            decompressor = CodecPool.getDecompressor(codec);
+            if (codec instanceof SplittableCompressionCodec) {
+                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+                        fileIn, decompressor, start, end,
+                        SplittableCompressionCodec.READ_MODE.BYBLOCK);
+                in = new ChunkReader(cIn);
+                start = cIn.getAdjustedStart();
+                end = cIn.getAdjustedEnd();
+                filePosition = cIn; // take pos from compressed stream
+            } else {
+                in = new ChunkReader(codec.createInputStream(fileIn,
+                        decompressor));
+                filePosition = fileIn;
+            }
+        } else {
+            fileIn.seek(start);
+            in = new ChunkReader(fileIn);
+            filePosition = fileIn;
+        }
+        /*
+         * If this is not the first split, we always throw away first record
+         * because we always (except the last split) read one extra line in
+         * next() method.
+         */
+        if (start != 0) {
+            start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
+        }
+        this.pos = start;
+    }
+
+    /**
+     * Used by the client of this class to create the 'key' output parameter for
+     * next() method.
+     *
+     * @return an instance of LongWritable
+     */
+    @Override
+    public LongWritable createKey() {
+        return new LongWritable();
+    }
+
+    /**
+     * Used by the client of this class to create the 'value' output parameter
+     * for next() method.
+     *
+     * @return an instance of ChunkWritable
+     */
+    @Override
+    public ChunkWritable createValue() {
+        return new ChunkWritable();
+    }
+
+    /**
+     * Fetches the next data chunk from the file split. The size of the chunk is
+     * a class hardcoded parameter - CHUNK_SIZE. This behaviour sets this reader
+     * apart from the other readers which will fetch one record and stop when
+     * reaching a record delimiter.
+     *
+     * @param key - output parameter. When method returns will contain the key -
+     *            the number of the start byte of the chunk
+     * @param value - output parameter. When method returns will contain the
+     *            value - the chunk, a byte array inside the ChunkWritable
+     *            instance
+     * @return false - when end of split was reached
+     * @throws IOException if an I/O error occurred while reading the next chunk
+     *             or line
+     */
+    @Override
+    public synchronized boolean next(LongWritable key, ChunkWritable value)
+            throws IOException {
+        /*
+         * Usually a record is spread between the end of current split and the
+         * beginning of next split. So when reading the last record in the split
+         * we usually need to cross over to the next split. This tricky logic is
+         * implemented in ChunkReader.readLine(). In order not to rewrite this
+         * logic we will read the last chunk in the split with readLine(). For a
+         * split of 120M, reading the last 1M line by line doesn't have a huge
+         * impact. Applying a factor to the last chunk to make sure we start
+         * before the last record.
+         */
+        float factor = 1.5f;
+        int limit = (int) (factor * CHUNK_SIZE);
+        long curPos = getFilePosition();
+        int newSize = 0;
+
+        while (curPos <= end) {
+            key.set(pos);
+
+            if ((end - curPos) > limit) {
+                newSize = in.readChunk(value, CHUNK_SIZE);
+            } else {
+                newSize = in.readLine(value,
+                        Math.max(maxBytesToConsume(pos), maxLineLength));
+            }
+            if (newSize == 0) {
+                break;
+            }
+
+            pos += newSize;
+
+            if (pos == fileLength) { /*
+                                      * in case text file last character is not
+                                      * a linefeed
+                                      */
+                if (value.box[value.box.length - 1] != '\n') {
+                    int newLen = value.box.length + 1;
+                    byte[] tmp = new byte[newLen];
+                    System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
+                    tmp[newLen - 1] = '\n';
+                    value.box = tmp;
+                }
+            }
+
+            return true;
+        }
+        /*
+         * if we got here, either newSize was 0 or curPos is bigger than end
+         */
 
-	private CompressionCodecFactory compressionCodecs = null;
-	private long start;
-	private long pos;
-	private long end;
-	private long fileLength;
-	private ChunkReader in;
-	private FSDataInputStream fileIn;
-	private final Seekable filePosition;
-	private int maxLineLength;
-	private CompressionCodec codec;
-	private Decompressor decompressor;
-	private static final int CHUNK_SIZE = 1024 * 1024;
+        return false;
+    }
 
-	/*
-	 * Translate the FSDataInputStream into a DFSInputStream
-	 */
-	private DFSInputStream getInputStream() {
-		return (DFSInputStream)(fileIn.getWrappedStream());
-	}
-	
-	/**
-	 * Returns statistics of the input stream's read operation: total bytes read,
-	 * bytes read localy, bytes read in short-circuit (directly from file descriptor)
-	 * @return an instance of ReadStatistics class
-	 */
-	public ReadStatistics getReadStatistics() {
-		return getInputStream().getReadStatistics();
-	}
+    /**
+     * Gets the progress within the split.
+     */
+    @Override
+    public synchronized float getProgress() throws IOException {
+        if (start == end) {
+            return 0.0f;
+        } else {
+            return Math.min(1.0f, (getFilePosition() - start)
+                    / (float) (end - start));
+        }
+    }
 
-	/**
-	 * Constructs a ChunkRecordReader instance
-	 * @param job the job configuration
-	 * @param split contains the file name, begin byte of the split and the bytes length
-	 */
-	public ChunkRecordReader(Configuration job, FileSplit split) throws IOException {
-		maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
-		validateLength(maxLineLength);
-		start = split.getStart();
-		end = start + split.getLength();
-		final Path file = split.getPath();
-		compressionCodecs = new CompressionCodecFactory(job);
-		codec = compressionCodecs.getCodec(file);
+    /**
+     * Returns the position of the unread tail of the file.
+     *
+     * @return pos - start byte of the unread tail of the file
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        return pos;
+    }
 
-		// open the file and seek to the start of the split
-		job.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
-		final FileSystem fs = file.getFileSystem(job);
-		fs.setVerifyChecksum(false);
-		fileIn = fs.open(file, ChunkReader.DEFAULT_BUFFER_SIZE);
-		fileLength = getInputStream().getFileLength();
-		if (isCompressedInput()) {
-			decompressor = CodecPool.getDecompressor(codec);
-			if (codec instanceof SplittableCompressionCodec) {
-				final SplitCompressionInputStream cIn =
-					((SplittableCompressionCodec)codec).createInputStream(
-						fileIn, decompressor, start, end,
-						SplittableCompressionCodec.READ_MODE.BYBLOCK);
-				in = new ChunkReader(cIn);
-				start = cIn.getAdjustedStart();
-				end = cIn.getAdjustedEnd();
-				filePosition = cIn; // take pos from compressed stream
-			} else {
-				in = new ChunkReader(codec.createInputStream(fileIn, decompressor));
-				filePosition = fileIn;
-			}
-		} else {
-			fileIn.seek(start);
-			in = new ChunkReader(fileIn);
-			filePosition = fileIn;
-		}
-		/*
-		 * If this is not the first split, we always throw away first record
-		 * because we always (except the last split) read one extra line in
-		 * next() method.
-		 */
-		if (start != 0) {
-		  start += in.readLine(new ChunkWritable(), maxBytesToConsume(start));
-		}
-		this.pos = start;
-	}
-	
-	/**
-	 * Used by the client of this class to create the 'key' output parameter for next() method
-	 * @return an instance of LongWritable
-	 */
-	@Override
-	public LongWritable createKey() {
-		return new LongWritable();
-	}
-	
-	/**
-	 * Used by the client of this class to create the 'value' output parameter for next() method
-	 * @return an instance of ChunkWritable
-	 */	
-	@Override
-	public ChunkWritable createValue() {
-		return new ChunkWritable();
-	}
-	
-	/**
-	 * Fetch the next data chunk from the file split. The size of the chunk is a class hardcoded
-	 * parameter - CHUNK_SIZE. This behaviour sets this reader apart from the other readers which 
-	 * will fetch one record and stop when reaching a record delimiter.
-	 * @param key - output parameter. When method returns will contain the key - the number 
-	 *              of the start byte of the chunk
-	 * @param value - output parameter. When method returns will contain the value - the chunk,
-	 *                a byte array inside the ChunkWritable instance
-	 * @return false - when end of split was reached
-	 */
-	@Override
-	public synchronized boolean next(LongWritable key, ChunkWritable value)
-	throws IOException {
-		/*
-		 * Usualy a record is spread between the end of current split and the beginning
-		 * of next split. So when reading the last record in the split we usually need to
-		 * cross over to the next split. This tricky logic is implemented in 
-		 * ChunkReader.readLine(). 
-		 * In order not to rewrite this logic we will read the lust chunk in the split
-		 * with readLine(). For a split of 120M, reading the last 1M line by line doesn't 
-		 * have a huge impact. Applying a factor to the last chunk to make sure we start
-		 * before the last record.
-		 */
-		float factor = 1.5f; 
-		int limit = (int)(factor*CHUNK_SIZE);
-		long curPos = getFilePosition();
-		int newSize = 0;
-		
-		while (curPos <= end) {
-			key.set(pos);
-			
-			if ((end - curPos) > limit) {
-				newSize = in.readChunk(value, CHUNK_SIZE);
-			}
-			else {
-				newSize = in.readLine(value, Math.max(maxBytesToConsume(pos), maxLineLength));
-			}
-			if (newSize == 0) {
-				break;
-			}
-			
-			pos += newSize;
-			
-			if (pos == fileLength) { /* in case text file last character is not a linefeed*/
-				if (value.box[value.box.length - 1] != (int)'\n') {
-					int newLen = value.box.length + 1; 
-					byte [] tmp = new byte [newLen];
-					System.arraycopy(value.box, 0, tmp, 0, newLen - 1);
-					tmp[newLen - 1] = '\n';
-					value.box = tmp;
-				}
-			}
-			
-			return true;
-		}
-		/*
-		 * if we got here, either newSize was 0 or curPos is bigger than end
-		 */
-				
-		return false;
-	}
-	
-	/**
-	 * Get the progress within the split
-	 */
-	@Override
-	public synchronized float getProgress() throws IOException {
-		if (start == end) {
-			return 0.0f;
-		} else {
-			return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
-		}
-	}
-    
-	/**
-	 * @return pos - start byte of the unread tail of the file
-	 */
-	public  synchronized long getPos() throws IOException {
-		return pos;
-	}
-	
-	/**
-	 * Close the input stream
-	 */
-	@Override
-	public synchronized void close() throws IOException {
-		try {
-			if (in != null) {
-				in.close();
-			}
-		} finally {
-			if (decompressor != null) {
-				CodecPool.returnDecompressor(decompressor);
-			}
-		}
-	}
+    /**
+     * Closes the input stream.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        try {
+            if (in != null) {
+                in.close();
+            }
+        } finally {
+            if (decompressor != null) {
+                CodecPool.returnDecompressor(decompressor);
+            }
+        }
+    }
 
-	private void validateLength(int maxLineLength) {
-		if (maxLineLength <= 0)
-			throw new IllegalArgumentException("maxLineLength must be a positive value");
-	}
+    private void validateLength(int maxLineLength) {
+        if (maxLineLength <= 0)
+            throw new IllegalArgumentException(
+                    "maxLineLength must be a positive value");
+    }
 
-	private boolean isCompressedInput() {
-		return (codec != null);
-	}
+    private boolean isCompressedInput() {
+        return (codec != null);
+    }
 
-	private int maxBytesToConsume(long pos) {
-		return isCompressedInput()
-			? Integer.MAX_VALUE
-			: (int) Math.min(Integer.MAX_VALUE, end - pos);
-	}
+    private int maxBytesToConsume(long pos) {
+        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
+                Integer.MAX_VALUE, end - pos);
+    }
 
-	private long getFilePosition() throws IOException {
-		long retVal;
-		if (isCompressedInput() && null != filePosition) {
-			retVal = filePosition.getPos();
-		} else {
-			retVal = pos;
-		}
-		return retVal;
-	}
+    private long getFilePosition() throws IOException {
+        long retVal;
+        if (isCompressedInput() && null != filePosition) {
+            retVal = filePosition.getPos();
+        } else {
+            retVal = pos;
+        }
+        return retVal;
+    }
 } // class ChunkRecordReader

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
index b19b3fd..a0a8b17 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/ChunkWritable.java
@@ -2,7 +2,6 @@ package com.pivotal.pxf.plugins.hdfs;
 
 import java.io.DataOutput;
 import java.io.DataInput;
-import java.io.IOException;
 import java.lang.UnsupportedOperationException;
 
 import org.apache.hadoop.io.Writable;
@@ -13,29 +12,28 @@ import org.apache.hadoop.io.Writable;
  */
 public class ChunkWritable implements Writable {
 	public byte [] box;
-	
+
 	/**
-     * Serialize the fields of this object to <code>out</code>.
+     * Serializes the fields of this object to <code>out</code>.
      *
      * @param out <code>DataOutput</code> to serialize this object into.
-     * @throws IOException
+     * @throws UnsupportedOperationException this function is not supported
      */
 	@Override
     public void write(DataOutput out)  {
 		throw new UnsupportedOperationException("ChunkWritable.write() is not implemented");
     }
-	
+
     /**
-     * Deserialize the fields of this object from <code>in</code>.
+     * Deserializes the fields of this object from <code>in</code>.
      * <p>For efficiency, implementations should attempt to re-use storage in the
      * existing object where possible.</p>
      *
      * @param in <code>DataInput</code> to deserialize this object from.
-     * @throws IOException
+     * @throws UnsupportedOperationException  this function is not supported
      */
 	@Override
     public void readFields(DataInput in)  {
 		throw new UnsupportedOperationException("ChunkWritable.readFields() is not implemented");
 	}
-	
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
index f18ecef..0f8f908 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java
@@ -20,7 +20,6 @@ import org.apache.hadoop.mapred.JobConf;
 import java.io.IOException;
 import java.util.ArrayList;
 
-
 /**
  * Analyzer class for HDFS data resources
  *
@@ -33,10 +32,10 @@ public class HdfsAnalyzer extends Analyzer {
     private Log Log;
 
     /**
-     * Constructs an HdfsAnalyzer object
-     * 
+     * Constructs an HdfsAnalyzer object.
+     *
      * @param inputData all input parameters coming from the client
-     * @throws IOException
+     * @throws IOException if HDFS file system cannot be retrieved
      */
     public HdfsAnalyzer(InputData inputData) throws IOException {
         super(inputData);
@@ -49,11 +48,13 @@ public class HdfsAnalyzer extends Analyzer {
     /**
      * Collects a number of basic statistics based on an estimate. Statistics
      * are: number of records, number of hdfs blocks and hdfs block size.
-     * 
-     * @param datapath path is a data source URI that can appear as a file 
+     *
+     * @param datapath path is a data source URI that can appear as a file
      *        name, a directory name or a wildcard pattern
-     * @return statistics in json format
-     * @throws Exception
+     * @return statistics in JSON format
+     * @throws Exception if path is wrong, its metadata cannot be retrieved
+     *                    from file system, or if scanning the first block
+     *                    using the accessor failed
      */
     @Override
     public AnalyzerStats getEstimatedStats(String datapath) throws Exception {
@@ -89,8 +90,8 @@ public class HdfsAnalyzer extends Analyzer {
         return stats;
     }
 
-    /*
-     * Calculate the number of tuples in a split (block)
+    /**
+     * Calculates the number of tuples in a split (block).
      * Reads one block from HDFS. Exception during reading will
      * filter upwards and be handled in AnalyzerResource
      */
@@ -129,7 +130,7 @@ public class HdfsAnalyzer extends Analyzer {
         PxfInputFormat.setInputPaths(jobConf, path);
         InputSplit[] splits = fformat.getSplits(jobConf, 1);
         ArrayList<InputSplit> result = new ArrayList<InputSplit>();
-        
+
         // remove empty splits
         if (splits != null) {
 	        for (InputSplit split : splits) {
@@ -138,7 +139,7 @@ public class HdfsAnalyzer extends Analyzer {
 	        	}
 	        }
         }
-        
-        return result;        
+
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
index da02590..4a43c5f 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java
@@ -15,19 +15,18 @@ import java.io.InputStream;
 import java.net.URI;
 
 /**
- * Base class for enforcing the complete access of a file in one accessor. 
+ * Base class for enforcing the complete access of a file in one accessor.
  * Since we are not accessing the file using the splittable API, but instead are
  * using the "simple" stream API, it means that we cannot fetch different parts
  * (splits) of the file in different segments. Instead each file access brings
- * the complete file. And, if several segments would access the same file, then 
+ * the complete file. And, if several segments would access the same file, then
  * each one will return the whole file and we will observe in the query result,
- * each record appearing number_of_segments times. To avoid this we will only 
- * have one segment (segment 0) working for this case - enforced with 
- * isWorkingSegment() method. Naturally this is the less recommended working 
+ * each record appearing number_of_segments times. To avoid this we will only
+ * have one segment (segment 0) working for this case - enforced with
+ * isWorkingSegment() method. Naturally this is the less recommended working
  * mode since we are not making use of segment parallelism. HDFS accessors for
  * a specific file type should inherit from this class only if the file they are
  * reading does not support splitting: a protocol-buffer file, regular file, ...
- * 
  */
 public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor {
     private Configuration conf = null;
@@ -36,10 +35,10 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
 
     /**
      * Constructs a HdfsAtomicDataAccessor object.
+     *
      * @param input all input parameters coming from the client
-     * @throws Exception
      */
-    public HdfsAtomicDataAccessor(InputData input) throws Exception {
+    public HdfsAtomicDataAccessor(InputData input) {
         // 0. Hold the configuration data
         super(input);
 
@@ -52,10 +51,11 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
     /**
      * Opens the file using the non-splittable API for HADOOP HDFS file access
      * This means that instead of using a FileInputFormat for access, we use a
-     * Java stream
-     * 
+     * Java stream.
+     *
      * @return true for successful file open, false otherwise
      */
+    @Override
     public boolean openForRead() throws Exception {
         if (!isWorkingSegment()) {
             return false;
@@ -70,8 +70,10 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
 
     /**
      * Fetches one record from the file.
-     * @return a {@link OneRow} record as a Java object. returns null if none.
+     *
+     * @return a {@link OneRow} record as a Java object. Returns null if none.
      */
+    @Override
     public OneRow readNextObject() throws IOException {
         if (!isWorkingSegment()) {
             return null;
@@ -83,6 +85,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces
     /**
      * Closes the access stream when finished reading the file
      */
+    @Override
     public void closeForRead() throws Exception {
         if (!isWorkingSegment()) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
index ab4aa67..1bf5aab 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java
@@ -15,43 +15,43 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * Fragmenter class for HDFS data resources
+ * Fragmenter class for HDFS data resources.
  *
- * Given an HDFS data source (a file, directory, or wild card pattern)
- * divide the data into fragments and return a list of them along with
- * a list of host:port locations for each.
+ * Given an HDFS data source (a file, directory, or wild card pattern) divide
+ * the data into fragments and return a list of them along with a list of
+ * host:port locations for each.
  */
 public class HdfsDataFragmenter extends Fragmenter {
     private JobConf jobConf;
 
     /**
-     * Constructs an HdfsDataFragmenter object
+     * Constructs an HdfsDataFragmenter object.
+     *
      * @param md all input parameters coming from the client
-     * @throws IOException
      */
-    public HdfsDataFragmenter(InputData md) throws IOException {
+    public HdfsDataFragmenter(InputData md) {
         super(md);
 
         jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class);
     }
 
-    /*
-     * path is a data source URI that can appear as a file
-     * name, a directory name  or a wildcard returns the data
-     * fragments in json format
+    /**
+     * Gets the fragments for a data source URI that can appear as a file name,
+     * a directory name or a wildcard. Returns the data fragments in JSON
+     * format.
      */
     @Override
     public List<Fragment> getFragments() throws Exception {
-		String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
+        String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource());
         InputSplit[] splits = getSplits(new Path(absoluteDataPath));
 
-        for (InputSplit split : splits != null ? splits : new InputSplit[]{}) {
+        for (InputSplit split : splits != null ? splits : new InputSplit[] {}) {
             FileSplit fsp = (FileSplit) split;
 
-			/*
-             * HD-2547: If the file is empty, an empty split is returned:
-			 * no locations and no length.
-			 */
+            /*
+             * HD-2547: If the file is empty, an empty split is returned: no
+             * locations and no length.
+             */
             if (fsp.getLength() <= 0) {
                 continue;
             }
@@ -59,10 +59,10 @@ public class HdfsDataFragmenter extends Fragmenter {
             String filepath = fsp.getPath().toUri().getPath();
             String[] hosts = fsp.getLocations();
 
-			/*
-             * metadata information includes: file split's
-			 * start, length and hosts (locations).
-			 */
+            /*
+             * metadata information includes: file split's start, length and
+             * hosts (locations).
+             */
             byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp);
             Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata);
             fragments.add(fragment);
@@ -76,5 +76,4 @@ public class HdfsDataFragmenter extends Fragmenter {
         PxfInputFormat.setInputPaths(jobConf, path);
         return format.getSplits(jobConf, 1);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
index 8f49d48..744342d 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java
@@ -16,10 +16,11 @@ import java.util.ListIterator;
  * Accessor for accessing a splittable HDFS data sources. HDFS will divide the
  * file into splits based on an internal decision (by default, the block size is
  * also the split size).
- * 
+ *
  * Accessors that require such base functionality should extend this class.
  */
-public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadAccessor {
+public abstract class HdfsSplittableDataAccessor extends Plugin implements
+        ReadAccessor {
     protected Configuration conf = null;
     protected RecordReader<Object, Object> reader = null;
     protected InputFormat<?, ?> inputFormat = null;
@@ -29,12 +30,12 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
 
     /**
      * Constructs an HdfsSplittableDataAccessor
-     * 
+     *
      * @param input all input parameters coming from the client request
      * @param inFormat the HDFS {@link InputFormat} the caller wants to use
-     * @throws Exception
      */
-    public HdfsSplittableDataAccessor(InputData input, InputFormat<?, ?> inFormat) throws Exception {
+    public HdfsSplittableDataAccessor(InputData input,
+                                      InputFormat<?, ?> inFormat) {
         super(input);
         inputFormat = inFormat;
 
@@ -46,11 +47,12 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
     }
 
     /**
-     * Fetches the requested fragment (file split) for the current client 
+     * Fetches the requested fragment (file split) for the current client
      * request, and sets a record reader for the job.
-     * 
+     *
      * @return true if succeeded, false if no more splits to be read
      */
+    @Override
     public boolean openForRead() throws Exception {
         LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>();
         FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData);
@@ -62,23 +64,28 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
     }
 
     /**
-     * Specialized accessors will override this method and implement their own 
+     * Specialized accessors will override this method and implement their own
      * recordReader. For example, a plain delimited text accessor may want to
-     * return a LineRecordReader. 
-     * 
+     * return a LineRecordReader.
+     *
      * @param jobConf the hadoop jobconf to use for the selected InputFormat
      * @param split the input split to be read by the accessor
-     * @return a recordreader to be used for reading the data records of the split
-     * @throws IOException
+     * @return a recordreader to be used for reading the data records of the
+     *         split
+     * @throws IOException if recordreader could not be created
      */
-    abstract protected Object getReader(JobConf jobConf, InputSplit split) throws IOException;
+    abstract protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException;
 
-    /*
-     * getNextSplit
-     * Sets the current split and initializes a RecordReader who feeds from the split
+    /**
+     * Sets the current split and initializes a RecordReader that feeds from the
+     * split.
+     *
+     * @return true if there is a split to read
+     * @throws IOException if record reader could not be created
      */
     @SuppressWarnings(value = "unchecked")
-    protected boolean getNextSplit() throws IOException {
+    protected boolean getNextSplit() throws IOException  {
         if (!iter.hasNext()) {
             return false;
         }
@@ -90,36 +97,40 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
         return true;
     }
 
-    /*
-     * readNextObject
-     * Fetches one record from the  file. The record is returned as a Java object.
+    /**
+     * Fetches one record from the file. The record is returned as a Java
+     * object.
      */
     @Override
     public OneRow readNextObject() throws IOException {
-		
-        if (!reader.next(key, data)) { // if there is one more record in the current split
-            if (getNextSplit()) {// the current split is exhausted. try to move to the next split.
-                if (!reader.next(key, data)) {// read the first record of the new split
-                    return null; // make sure we return nulls
+        // try to read one more record from the current split
+        if (!reader.next(key, data)) {
+            // the current split is exhausted. try to move to the next split
+            if (getNextSplit()) {
+                // read the first record of the new split
+                if (!reader.next(key, data)) {
+                    // make sure we return nulls
+                    return null;
                 }
             } else {
-                return null; // make sure we return nulls
+                // make sure we return nulls
+                return null;
             }
         }
 
-		/*
-		 * if neither condition was met, it means we already read all the
-		 * records in all the splits, and in this call record variable was not
-		 * set, so we return null and thus we are signaling end of records
-		 * sequence
-		 */
+        /*
+         * if neither condition was met, it means we already read all the
+         * records in all the splits, and in this call record variable was not
+         * set, so we return null and thus we are signaling end of records
+         * sequence
+         */
         return new OneRow(key, data);
     }
 
-    /*
-     * closeForRead
+    /**
      * When user finished reading the file, it closes the RecordReader
      */
+    @Override
     public void closeForRead() throws Exception {
         if (reader != null) {
             reader.close();
@@ -129,7 +140,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements ReadA
     @Override
     public boolean isThreadSafe() {
         return HdfsUtilities.isThreadSafe(inputData.getDataSource(),
-				  inputData.getUserProperty("COMPRESSION_CODEC"));
+                inputData.getUserProperty("COMPRESSION_CODEC"));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
index 28b125c..32002a1 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java
@@ -16,39 +16,38 @@ import org.apache.hadoop.mapred.*;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
-
 /**
- * A PXF Accessor for reading delimited plain text records
+ * A PXF Accessor for reading delimited plain text records.
  */
-public class LineBreakAccessor extends HdfsSplittableDataAccessor implements WriteAccessor {
+public class LineBreakAccessor extends HdfsSplittableDataAccessor implements
+        WriteAccessor {
     private DataOutputStream dos;
     private FSDataOutputStream fsdos;
     private Configuration conf;
     private FileSystem fs;
     private Path file;
-    private Log Log;
+    private static Log Log = LogFactory.getLog(LineBreakAccessor.class);
 
     /**
-     * Constructs a LineReaderAccessor
-     * 
+     * Constructs a LineBreakAccessor.
+     *
      * @param input all input parameters coming from the client request
-     * @throws Exception
      */
-    public LineBreakAccessor(InputData input) throws Exception {
+    public LineBreakAccessor(InputData input) {
         super(input, new TextInputFormat());
         ((TextInputFormat) inputFormat).configure(jobConf);
-
-        Log = LogFactory.getLog(LineBreakAccessor.class);
     }
 
     @Override
-    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
         return new ChunkRecordReader(jobConf, (FileSplit) split);
     }
-	
-    /* 
-     * opens file for write
+
+    /**
+     * Opens file for write.
      */
+    @Override
     public boolean openForWrite() throws Exception {
 
         String fileName = inputData.getDataSource();
@@ -68,7 +67,8 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
         file = new Path(fileName);
 
         if (fs.exists(file)) {
-            throw new IOException("file " + file.toString() + " already exists, can't write data");
+            throw new IOException("file " + file.toString()
+                    + " already exists, can't write data");
         }
         org.apache.hadoop.fs.Path parent = file.getParent();
         if (!fs.exists(parent)) {
@@ -83,10 +83,11 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
     }
 
     /*
-     * Create output stream from given file.
-     * If compression codec is provided, wrap it around stream.
+     * Creates output stream from given file. If compression codec is provided,
+     * wrap it around stream.
      */
-    private void createOutputStream(Path file, CompressionCodec codec) throws IOException {
+    private void createOutputStream(Path file, CompressionCodec codec)
+            throws IOException {
         fsdos = fs.create(file, false);
         if (codec != null) {
             dos = new DataOutputStream(codec.createOutputStream(fsdos));
@@ -96,28 +97,30 @@ public class LineBreakAccessor extends HdfsSplittableDataAccessor implements Wri
 
     }
 
-    /*
-     * write row into stream
+    /**
+     * Writes row into stream.
      */
+    @Override
     public boolean writeNextObject(OneRow onerow) throws Exception {
         dos.write((byte[]) onerow.getData());
         return true;
     }
-    
-    /* 
-     * close the output stream after done writing
+
+    /**
+     * Closes the output stream after done writing.
      */
+    @Override
     public void closeForWrite() throws Exception {
         if ((dos != null) && (fsdos != null)) {
             Log.debug("Closing writing stream for path " + file);
             dos.flush();
             /*
              * From release 0.21.0 sync() is deprecated in favor of hflush(),
-			 * which only guarantees that new readers will see all data written 
-			 * to that point, and hsync(), which makes a stronger guarantee that
-			 * the operating system has flushed the data to disk (like POSIX 
-			 * fsync), although data may still be in the disk cache.
-			 */
+             * which only guarantees that new readers will see all data written
+             * to that point, and hsync(), which makes a stronger guarantee that
+             * the operating system has flushed the data to disk (like POSIX
+             * fsync), although data may still be in the disk cache.
+             */
             fsdos.hsync();
             dos.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
index 3fc0f5e..13880b8 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java
@@ -16,12 +16,11 @@ public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
     private BufferedReader reader;
 
     /**
-     * Constructs a QuotedLineBreakAccessor
-     * 
+     * Constructs a QuotedLineBreakAccessor.
+     *
      * @param input all input parameters coming from the client request
-     * @throws Exception
      */
-    public QuotedLineBreakAccessor(InputData input) throws Exception {
+    public QuotedLineBreakAccessor(InputData input) {
         super(input);
     }
 
@@ -34,8 +33,7 @@ public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor {
         return true;
     }
 
-    /*
-     * readNextObject
+    /**
      * Fetches one record (maybe partial) from the  file. The record is returned as a Java object.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
index d3e7292..5f3f3dd 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java
@@ -4,6 +4,7 @@ import com.pivotal.pxf.api.OneRow;
 import com.pivotal.pxf.api.WriteAccessor;
 import com.pivotal.pxf.api.utilities.InputData;
 import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -24,7 +25,8 @@ import java.util.EnumSet;
 /**
  * A PXF Accessor for reading and writing Sequence File records
  */
-public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements WriteAccessor {
+public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
+        WriteAccessor {
 
     private Configuration conf;
     private FileContext fc;
@@ -34,27 +36,24 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
     private SequenceFile.Writer writer;
     private LongWritable defaultKey; // used when recordkey is not defined
 
-    private Log Log;
+    private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);;
 
     /**
-     * Constructs a SequenceFileAccessor
-     * 
+     * Constructs a SequenceFileAccessor.
+     *
      * @param input all input parameters coming from the client request
-     * @throws Exception
      */
-    public SequenceFileAccessor(InputData input) throws Exception {
-
-        super(input,
-                new SequenceFileInputFormat<Writable, Writable>());
-
-        Log = LogFactory.getLog(SequenceFileAccessor.class);
+    public SequenceFileAccessor(InputData input) {
+        super(input, new SequenceFileInputFormat<Writable, Writable>());
     }
 
-    /*
-     * Override virtual method to create specialized record reader
+    /**
+     * Overrides virtual method to create specialized record reader
      */
-    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
-        return new SequenceFileRecordReader(jobConf, (FileSplit) split);
+    @Override
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
+        return new SequenceFileRecordReader<Object, Object>(jobConf, (FileSplit) split);
     }
 
     @Override
@@ -74,7 +73,8 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
         defaultKey = new LongWritable(inputData.getSegmentId());
 
         if (fs.exists(file)) {
-            throw new IOException("file " + file + " already exists, can't write data");
+            throw new IOException("file " + file
+                    + " already exists, can't write data");
         }
         parent = file.getParent();
         if (!fs.exists(parent)) {
@@ -87,10 +87,11 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
     }
 
     /**
-     * Compression: based on compression codec and compression type (default value RECORD).
-     * If there is no codec, compression type is ignored, and NONE is used.
+     * Compression: based on compression codec and compression type (default
+     * value RECORD). If there is no codec, compression type is ignored, and
+     * NONE is used.
      *
-     * @param inputData - container where compression codec and type are held.
+     * @param inputData - container where compression codec and type are held
      */
     private void getCompressionCodec(InputData inputData) {
 
@@ -106,21 +107,23 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
             try {
                 compressionType = CompressionType.valueOf(parsedCompressType);
             } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException("Illegal value for compression type " +
-                        "'" + parsedCompressType + "'");
+                throw new IllegalArgumentException(
+                        "Illegal value for compression type " + "'"
+                                + parsedCompressType + "'");
             }
             if (compressionType == null) {
-                throw new IllegalArgumentException("Compression type must be defined");
+                throw new IllegalArgumentException(
+                        "Compression type must be defined");
             }
 
-            Log.debug("Compression ON: " +
-                    "compression codec: " + userCompressCodec +
-                    ", compression type: " + compressionType);
+            Log.debug("Compression ON: " + "compression codec: "
+                    + userCompressCodec + ", compression type: "
+                    + compressionType);
         }
     }
 
     /*
-     * Parse compression type for sequence file. If null, default to RECORD.
+     * Parses compression type for sequence file. If null, default to RECORD.
      * Allowed values: RECORD, BLOCK.
      */
     private String parseCompressionType(String compressType) {
@@ -133,13 +136,15 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
         }
 
         if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) {
-            throw new IllegalArgumentException("Illegal compression type 'NONE'. " +
-                    "For disabling compression remove COMPRESSION_CODEC parameter.");
+            throw new IllegalArgumentException(
+                    "Illegal compression type 'NONE'. "
+                            + "For disabling compression remove COMPRESSION_CODEC parameter.");
         }
 
-        if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD) &&
-        	!compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
-            throw new IllegalArgumentException("Illegal compression type '" + compressType + "'");
+        if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD)
+                && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) {
+            throw new IllegalArgumentException("Illegal compression type '"
+                    + compressType + "'");
         }
 
         return compressType.toUpperCase();
@@ -165,11 +170,13 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
         // init writer on first approach here, based on onerow.getData type
         // TODO: verify data is serializable.
         if (writer == null) {
-            Class valueClass = value.getClass();
-            Class keyClass = (key == null) ? LongWritable.class : key.getClass();
+            Class<? extends Writable> valueClass = value.getClass();
+            Class<? extends Writable> keyClass = (key == null) ? LongWritable.class
+                    : key.getClass();
             // create writer - do not allow overwriting existing file
-            writer = SequenceFile.createWriter(fc, conf, file, keyClass, valueClass,
-                    compressionType, codec, new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
+            writer = SequenceFile.createWriter(fc, conf, file, keyClass,
+                    valueClass, compressionType, codec,
+                    new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE));
         }
 
         try {
@@ -188,20 +195,21 @@ public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements
             writer.sync();
             /*
              * From release 0.21.0 sync() is deprecated in favor of hflush(),
-			 * which only guarantees that new readers will see all data written to that point, 
-			 * and hsync(), which makes a stronger guarantee that the operating system has flushed 
-			 * the data to disk (like POSIX fsync), although data may still be in the disk cache.
-			 */
+             * which only guarantees that new readers will see all data written
+             * to that point, and hsync(), which makes a stronger guarantee that
+             * the operating system has flushed the data to disk (like POSIX
+             * fsync), although data may still be in the disk cache.
+             */
             writer.hsync();
             writer.close();
         }
     }
 
-	public CompressionType getCompressionType() {
-		return compressionType;
-	}
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
 
-	public CompressionCodec getCodec() {
-		return codec;
-	}
+    public CompressionCodec getCodec() {
+        return codec;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
index 588792b..7ace4c8 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java
@@ -15,8 +15,8 @@ import java.util.List;
 import static com.pivotal.pxf.api.io.DataType.VARCHAR;
 
 /**
- * StringPassResolver handles "deserialization" and serialization of 
- * String records. StringPassResolver implements IReadResolver and 
+ * StringPassResolver handles "deserialization" and serialization of
+ * String records. StringPassResolver implements IReadResolver and
  * IWriteResolver interfaces. Returns strings as-is.
  */
 public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver {
@@ -24,20 +24,19 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
     private OneRow oneRow;
 
     /**
-     * Constructs a StringPassResolver
-     * 
+     * Constructs a StringPassResolver.
+     *
      * @param inputData input all input parameters coming from the client request
-     * @throws Exception
      */
-    public StringPassResolver(InputData inputData) throws Exception {
+    public StringPassResolver(InputData inputData) {
         super(inputData);
         oneRow = new OneRow();
         this.inputData = inputData;
     }
 
-    /*
-     * getFields returns a list of the fields of one record.
-     * Each record field is represented by a OneField item.
+    /**
+     * Returns a list of the fields of one record.
+     * Each record field is represented by a {@link OneField} item.
      * OneField item contains two fields: an integer representing the field type and a Java
      * Object representing the field value.
      */
@@ -45,7 +44,7 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
     public List<OneField> getFields(OneRow onerow) {
         /*
          * This call forces a whole text line into a single varchar field and replaces
-		 * the proper field separation code can be found in previous revisions. The reasons 
+		 * the proper field separation code can be found in previous revisions. The reasons
 		 * for doing so as this point are:
 		 * 1. performance
 		 * 2. desire to not replicate text parsing logic from the backend into java
@@ -61,8 +60,8 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
         return record;
     }
 
-    /*
-     * Creates a OneRow object from the singleton list
+    /**
+     * Creates a OneRow object from the singleton list.
      */
     @Override
     public OneRow setFields(List<OneField> record) throws Exception {
@@ -74,5 +73,3 @@ public class StringPassResolver extends Plugin implements ReadResolver, WriteRes
         return oneRow;
     }
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
index 2913433..e9df907 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java
@@ -23,11 +23,11 @@ import java.util.List;
 import static com.pivotal.pxf.api.io.DataType.*;
 
 /**
- * WritableResolver handles serialization and deserialization of records 
+ * WritableResolver handles serialization and deserialization of records
  * that were serialized using Hadoop's Writable serialization framework.
- * 
- * A field named 'recordkey' is treated as a key of the given row, and not as 
- * part of the data schema. @See RecordkeyAdapter.
+ *
+ * A field named 'recordkey' is treated as a key of the given row, and not as
+ * part of the data schema. See {@link RecordkeyAdapter}.
  */
 public class WritableResolver extends Plugin implements ReadResolver, WriteResolver {
     private static final int RECORDKEY_UNDEFINED = -1;
@@ -40,10 +40,11 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
 
 
     /**
-     * Constructs a WritableResolver
-     * 
+     * Constructs a WritableResolver.
+     *
      * @param input all input parameters coming from the client
-     * @throws Exception
+     * @throws Exception if schema file is missing, cannot be found in
+     *                   classpath or fails to instantiate
      */
     public WritableResolver(InputData input) throws Exception {
         super(input);
@@ -77,7 +78,6 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
                         " type: " + javaType + ", " +
                         (isArray(javaType) ? "Array" : "Primitive") + ", " +
                         (isPrivate ? "Private" : "accessible") + " field");
-
             }
         }
     }
@@ -86,12 +86,6 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
         return (javaType.startsWith("[") && !"[B".equals(javaType));
     }
 
-    /*
-     * getFields returns a list of the fields of one record.
-     * Each record field is represented by a OneField item.
-     * OneField item contains two fields: an integer representing the field type and a Java
-     * Object representing the field value.
-     */
     @Override
     public List<OneField> getFields(OneRow onerow) throws Exception {
         userObject = onerow.getData();
@@ -168,8 +162,8 @@ public class WritableResolver extends Plugin implements ReadResolver, WriteResol
         }
     }
 
-    /*
-     * Sets customWritable fields and creates a OneRow object
+    /**
+     * Sets customWritable fields and creates a OneRow object.
      */
     @Override
     public OneRow setFields(List<OneField> record) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index 03fe94c..c7fe103 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -26,20 +26,20 @@ import java.io.*;
 import java.util.List;
 
 /**
- * HdfsUtilities class exposes helper methods for PXF classes
+ * HdfsUtilities class exposes helper methods for PXF classes.
  */
 public class HdfsUtilities {
     private static Log Log = LogFactory.getLog(HdfsUtilities.class);
     private static Configuration config = new Configuration();
-    private static CompressionCodecFactory factory =
-            new CompressionCodecFactory(config);
+    private static CompressionCodecFactory factory = new CompressionCodecFactory(
+            config);
 
     /**
-     * Hdfs data sources are absolute data paths. Method ensures
-     * that dataSource begins with '/'
+     * Hdfs data sources are absolute data paths. Method ensures that dataSource
+     * begins with '/'.
      *
      * @param dataSource The HDFS path to a file or directory of interest.
-     *                   Retrieved from the client request.
+     *            Retrieved from the client request.
      * @return an absolute data path
      */
     public static String absoluteDataPath(String dataSource) {
@@ -49,20 +49,22 @@ public class HdfsUtilities {
     /*
      * Helper routine to get a compression codec class
      */
-    private static Class<? extends CompressionCodec> getCodecClass(
-            Configuration conf, String name) {
+    private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
+                                                                   String name) {
 
         Class<? extends CompressionCodec> codecClass;
         try {
-            codecClass = conf.getClassByName(name).asSubclass(CompressionCodec.class);
+            codecClass = conf.getClassByName(name).asSubclass(
+                    CompressionCodec.class);
         } catch (ClassNotFoundException e) {
-            throw new IllegalArgumentException("Compression codec " + name + " was not found.", e);
+            throw new IllegalArgumentException("Compression codec " + name
+                    + " was not found.", e);
         }
         return codecClass;
     }
 
     /**
-     * Helper routine to get compression codec through reflection
+     * Helper routine to get compression codec through reflection.
      *
      * @param conf configuration used for reflection
      * @param name codec name
@@ -73,7 +75,7 @@ public class HdfsUtilities {
     }
 
     /**
-     * Helper routine to get compression codec class by path (file suffix)
+     * Helper routine to get compression codec class by path (file suffix).
      *
      * @param path path of file to get codec for
      * @return matching codec class for the path. null if no codec is needed.
@@ -91,8 +93,8 @@ public class HdfsUtilities {
     }
 
     /**
-     * Returns true if the needed codec is splittable.
-     * If no codec is needed returns true as well.
+     * Returns true if the needed codec is splittable. If no codec is needed
+     * returns true as well.
      *
      * @param path path of the file to be read
      * @return if the codec needed for reading the specified path is splittable.
@@ -110,30 +112,32 @@ public class HdfsUtilities {
     /**
      * Checks if requests should be handle in a single thread or not.
      *
-     * @param dataDir   hdfs path to the data source
+     * @param dataDir hdfs path to the data source
      * @param compCodec the fully qualified name of the compression codec
      * @return if the request can be run in multi-threaded mode.
      */
     public static boolean isThreadSafe(String dataDir, String compCodec) {
 
-        Class<? extends CompressionCodec> codecClass = (compCodec != null)
-                ? HdfsUtilities.getCodecClass(config, compCodec)
-                : HdfsUtilities.getCodecClassByPath(dataDir);
+        Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
+                config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
         /* bzip2 codec is not thread safe */
         return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
     }
 
     /**
-     * Prepare byte serialization of a file split information
-     * (start, length, hosts) using {@link ObjectOutputStream}.
+     * Prepares byte serialization of a file split information (start, length,
+     * hosts) using {@link ObjectOutputStream}.
      *
      * @param fsp file split to be serialized
      * @return byte serialization of fsp
-     * @throws IOException
+     * @throws IOException if I/O errors occur while writing to the underlying
+     *             stream
      */
-    public static byte[] prepareFragmentMetadata(FileSplit fsp) throws IOException {
+    public static byte[] prepareFragmentMetadata(FileSplit fsp)
+            throws IOException {
         ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
-        ObjectOutputStream objectStream = new ObjectOutputStream(byteArrayStream);
+        ObjectOutputStream objectStream = new ObjectOutputStream(
+                byteArrayStream);
         objectStream.writeLong(fsp.getStart());
         objectStream.writeLong(fsp.getLength());
         objectStream.writeObject(fsp.getLocations());
@@ -142,7 +146,7 @@ public class HdfsUtilities {
     }
 
     /**
-     * Parse fragment metadata and return matching {@link FileSplit}
+     * Parses fragment metadata and returns matching {@link FileSplit}.
      *
      * @param inputData request input data
      * @return FileSplit with fragment metadata
@@ -151,10 +155,12 @@ public class HdfsUtilities {
         try {
             byte[] serializedLocation = inputData.getFragmentMetadata();
             if (serializedLocation == null) {
-                throw new IllegalArgumentException("Missing fragment location information");
+                throw new IllegalArgumentException(
+                        "Missing fragment location information");
             }
 
-            ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedLocation);
+            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
+                    serializedLocation);
             ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
 
             long start = objectStream.readLong();
@@ -162,44 +168,56 @@ public class HdfsUtilities {
 
             String[] hosts = (String[]) objectStream.readObject();
 
-            FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()),
-                    start,
-                    end,
-                    hosts);
+            FileSplit fileSplit = new FileSplit(new Path(
+                    inputData.getDataSource()), start, end, hosts);
 
-            Log.debug("parsed file split: path " + inputData.getDataSource() +
-                    ", start " + start + ", end " + end +
-                    ", hosts " + ArrayUtils.toString(hosts));
+            Log.debug("parsed file split: path " + inputData.getDataSource()
+                    + ", start " + start + ", end " + end + ", hosts "
+                    + ArrayUtils.toString(hosts));
 
             return fileSplit;
 
         } catch (Exception e) {
-            throw new RuntimeException("Exception while reading expected fragment metadata", e);
+            throw new RuntimeException(
+                    "Exception while reading expected fragment metadata", e);
         }
     }
 
     /**
-     * Accessing the avro file through the "unsplittable" API just to get the schema.
-     * The splittable API (AvroInputFormat) which is the one we will be using to fetch
-     * the records, does not support getting the avro schema yet.
+     * Accessing the Avro file through the "unsplittable" API just to get the
+     * schema. The splittable API (AvroInputFormat) which is the one we will be
+     * using to fetch the records, does not support getting the Avro schema yet.
      *
-     * @param conf       Hadoop configuration
+     * @param conf Hadoop configuration
      * @param dataSource Avro file (i.e fileName.avro) path
      * @return the Avro schema
-     * @throws IOException
+     * @throws IOException if I/O error occurred while accessing Avro schema file
      */
-    public static Schema getAvroSchema(Configuration conf, String dataSource) throws IOException {
+    public static Schema getAvroSchema(Configuration conf, String dataSource)
+            throws IOException {
         FsInput inStream = new FsInput(new Path(dataSource), conf);
         DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
-        DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(inStream, dummyReader);
-        return dummyFileReader.getSchema();
+        DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
+                inStream, dummyReader);
+        Schema schema = dummyFileReader.getSchema();
+        dummyFileReader.close();
+        return schema;
     }
 
+    /**
+     * Returns string serialization of list of fields. Fields of binary type
+     * (BYTEA) are converted to octal representation to make sure they will be
+     * relayed properly to the DB.
+     *
+     * @param complexRecord list of fields to be stringified
+     * @param delimiter delimiter between fields
+     * @return string of serialized fields using delimiter
+     */
     public static String toString(List<OneField> complexRecord, String delimiter) {
         StringBuilder buff = new StringBuilder();
         String delim = ""; // first iteration has no delimiter
         for (OneField complex : complexRecord) {
-            if(complex.type == DataType.BYTEA.getOID()) {
+            if (complex.type == DataType.BYTEA.getOID()) {
                 /** Serialize byte array as string */
                 buff.append(delim);
                 Utilities.byteArrayToOctalString((byte[]) complex.val, buff);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
index 89ac86e..a88ade0 100644
--- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
+++ b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
@@ -8,11 +8,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.*;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
- * Adapter used for adding a recordkey field to the records output List<OneField>
+ * Adapter used for adding a recordkey field to the records output {@code List<OneField>}.
  */
 public class RecordkeyAdapter {
     private Log Log;
@@ -36,7 +35,7 @@ public class RecordkeyAdapter {
     private ValConverter converter = null;
 
     /**
-     * Constructs a RecordkeyAdapter
+     * Constructs a RecordkeyAdapter.
      */
     public RecordkeyAdapter() {
         Log = LogFactory.getLog(RecordkeyAdapter.class);
@@ -45,9 +44,9 @@ public class RecordkeyAdapter {
     /**
      *  Adds the recordkey to the end of the passed in recFields list.
      *  <p>
-     *  This method also verifies cases in which record keys are not supported 
-     *  by the underlying source type, and therefore "illegally" requested. 
-     *  
+     *  This method also verifies cases in which record keys are not supported
+     *  by the underlying source type, and therefore "illegally" requested.
+     *
      * @param recFields existing list of record (non-key) fields and their values.
      * @param input all input parameters coming from the client request
      * @param onerow a row object which is used here in order to find out if
@@ -55,12 +54,11 @@ public class RecordkeyAdapter {
      * @return 0 if record key not needed, or 1 if record key was appended
      * @throws NoSuchFieldException when the given record type does not support
      *         recordkeys
-     * @throws IOException
      */
     public int appendRecordkeyField(List<OneField> recFields,
                                     InputData input,
-                                    OneRow onerow) throws NoSuchFieldException, IOException {
-        
+                                    OneRow onerow) throws NoSuchFieldException {
+
 		/*
 		 * user did not request the recordkey field in the
 		 * "create external table" statement
@@ -107,7 +105,7 @@ public class RecordkeyAdapter {
 	 * key is already a Java primitive we returned it as is If it is an unknown
 	 * type we throw an exception
 	 */
-    private Object extractVal(Object key) throws IOException {
+    private Object extractVal(Object key) {
         if (extractor == null) {
             extractor = InitializeExtractor(key);
         }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
index 865a547..462c7a2 100644
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
+++ b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveAccessor.java
@@ -15,17 +15,15 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
-
 /**
- * Accessor for Hive tables.
- * The accessor will open and read a split belonging to a Hive table.
- * Opening a split means creating the corresponding InputFormat and RecordReader required to access the
- * split's data. The actual record reading is done in the base class -
- * {@link com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor}.
- * <p/>
- * HiveAccessor will also enforce Hive partition filtering by filtering-out a split which does not
- * belong to a partition filter. Naturally, the partition filtering will be done only for Hive tables
- * that are partitioned.
+ * Accessor for Hive tables. The accessor will open and read a split belonging
+ * to a Hive table. Opening a split means creating the corresponding InputFormat
+ * and RecordReader required to access the split's data. The actual record
+ * reading is done in the base class -
+ * {@link com.pivotal.pxf.plugins.hdfs.HdfsSplittableDataAccessor}. <br>
+ * HiveAccessor will also enforce Hive partition filtering by filtering-out a
+ * split which does not belong to a partition filter. Naturally, the partition
+ * filtering will be done only for Hive tables that are partitioned.
  */
 public class HiveAccessor extends HdfsSplittableDataAccessor {
     private static final Log LOG = LogFactory.getLog(HiveAccessor.class);
@@ -46,15 +44,18 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
     protected Boolean filterInFragmenter = false;
 
     /**
-     * Constructs a HiveAccessor and creates an InputFormat (derived from {@link org.apache.hadoop.mapred.InputFormat})
-     * and the Hive partition fields
+     * Constructs a HiveAccessor and creates an InputFormat (derived from
+     * {@link org.apache.hadoop.mapred.InputFormat}) and the Hive partition
+     * fields
      *
      * @param input contains the InputFormat class name and the partition fields
+     * @throws Exception if failed to create input format
      */
     public HiveAccessor(InputData input) throws Exception {
         /*
-         * Unfortunately, Java does not allow us to call a function before calling the base constructor,
-         * otherwise it would have been: super(input, createInputFormat(input))
+         * Unfortunately, Java does not allow us to call a function before
+         * calling the base constructor, otherwise it would have been:
+         * super(input, createInputFormat(input))
          */
         super(input, null);
         inputFormat = createInputFormat(input);
@@ -63,20 +64,21 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
     /**
      * Constructs a HiveAccessor
      *
-     * @param input       contains the InputFormat class name and the partition fields
+     * @param input contains the InputFormat class name and the partition fields
      * @param inputFormat Hive InputFormat
      */
-    public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) throws Exception {
+    public HiveAccessor(InputData input, InputFormat<?, ?> inputFormat) {
         super(input, inputFormat);
     }
 
     /**
-     * openForRead
-     * Enables Hive partition filtering
+     * Opens Hive data split for read. Enables Hive partition filtering. <br>
      *
-     * @return true if there are no partitions or there is no partition filter or
-     * partition filter is set and the file currently opened by the accessor belongs
-     * to the partition.
+     * @return true if there are no partitions or there is no partition filter
+     *         or partition filter is set and the file currently opened by the
+     *         accessor belongs to the partition.
+     * @throws Exception if filter could not be built, connection to Hive failed
+     *             or resource failed to open
      */
     @Override
     public boolean openForRead() throws Exception {
@@ -86,27 +88,36 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
     /**
      * Creates the RecordReader suitable for this given split.
      *
-     * @param jobConf configuraton data for the Hadoop framework
-     * @param split   the split that was allocated for reading to this accessor
+     * @param jobConf configuration data for the Hadoop framework
+     * @param split the split that was allocated for reading to this accessor
+     * @return record reader
+     * @throws IOException if failed to create record reader
      */
     @Override
-    protected Object getReader(JobConf jobConf, InputSplit split) throws IOException {
+    protected Object getReader(JobConf jobConf, InputSplit split)
+            throws IOException {
         return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
     }
 
     /*
-     * Parse the user-data supplied by the HiveFragmenter from InputData. Based on the
-     * user-data construct the partition fields and the InputFormat for current split
+     * Parses the user-data supplied by the HiveFragmenter from InputData. Based
+     * on the user-data construct the partition fields and the InputFormat for
+     * current split
      */
-    private InputFormat<?, ?> createInputFormat(InputData input) throws Exception {
+    private InputFormat<?, ?> createInputFormat(InputData input)
+            throws Exception {
         String userData = new String(input.getFragmentUserData());
         String[] toks = userData.split(HiveDataFragmenter.HIVE_UD_DELIM);
         initPartitionFields(toks[3]);
         filterInFragmenter = new Boolean(toks[4]);
-        return HiveDataFragmenter.makeInputFormat(toks[0]/* inputFormat name */, jobConf);
+        return HiveDataFragmenter.makeInputFormat(
+                toks[0]/* inputFormat name */, jobConf);
     }
 
-    /* The partition fields are initialized one time base on userData provided by the fragmenter */
+    /*
+     * The partition fields are initialized one time based on userData provided
+     * by the fragmenter
+     */
     void initPartitionFields(String partitionKeys) {
         partitions = new LinkedList<HivePartition>();
         if (partitionKeys.equals(HiveDataFragmenter.HIVE_NO_PART_TBL)) {
@@ -140,9 +151,11 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
         boolean returnData = isFiltered(partitions, filter);
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("segmentId: " + inputData.getSegmentId() + " " + inputData.getDataSource() + "--" + filterStr + " returnData: " + returnData);
+            LOG.debug("segmentId: " + inputData.getSegmentId() + " "
+                    + inputData.getDataSource() + "--" + filterStr
+                    + " returnData: " + returnData);
             if (filter instanceof List) {
-                for (Object f : (List) filter) {
+                for (Object f : (List<?>) filter) {
                     printOneBasicFilter(f);
                 }
             } else {
@@ -153,14 +166,16 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
         return returnData;
     }
 
-    private boolean isFiltered(List<HivePartition> partitionFields, Object filter) {
+    private boolean isFiltered(List<HivePartition> partitionFields,
+                               Object filter) {
         if (filter instanceof List) {
             /*
-             * We are going over each filter in the filters list and test it against all the partition fields
-             * since filters are connected only by AND operators, its enough for one filter to fail in order to
+             * We are going over each filter in the filters list and test it
+             * against all the partition fields since filters are connected only
+             * by AND operators, it's enough for one filter to fail in order to
              * deny this data.
              */
-            for (Object f : (List) filter) {
+            for (Object f : (List<?>) filter) {
                 if (!testOneFilter(partitionFields, f, inputData)) {
                     return false;
                 }
@@ -172,21 +187,27 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
     }
 
     /*
-     * We are testing one filter against all the partition fields.
-     * The filter has the form "fieldA = valueA".
-     * The partitions have the form partitionOne=valueOne/partitionTwo=ValueTwo/partitionThree=valueThree
-     * 1. For a filter to match one of the partitions, lets say partitionA for example, we need:
-     * fieldA = partittionOne and valueA = valueOne. If this condition occurs, we return true.
-     * 2. If fieldA does not match any one of the partition fields we also return true, it means we ignore this filter
-     * because it is not on a partition field.
-     * 3. If fieldA = partittionOne and valueA != valueOne, then we return false.
+     * We are testing one filter against all the partition fields. The filter
+     * has the form "fieldA = valueA". The partitions have the form
+     * partitionOne=valueOne/partitionTwo=ValueTwo/partitionThree=valueThree 1.
+     * For a filter to match one of the partitions, let's say partitionOne for
+     * example, we need: fieldA = partitionOne and valueA = valueOne. If this
+     * condition occurs, we return true. 2. If fieldA does not match any one of
+     * the partition fields we also return true, it means we ignore this filter
+     * because it is not on a partition field. 3. If fieldA = partitionOne and
+     * valueA != valueOne, then we return false.
      */
-    private boolean testOneFilter(List<HivePartition> partitionFields, Object filter, InputData input) {
+    private boolean testOneFilter(List<HivePartition> partitionFields,
+                                  Object filter, InputData input) {
         // Let's look first at the filter
         FilterParser.BasicFilter bFilter = (FilterParser.BasicFilter) filter;
 
         boolean isFilterOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
-        if (!isFilterOperationEqual) /* in case this is not an "equality filter" we ignore it here - in partition filtering */ {
+        if (!isFilterOperationEqual) /*
+                                      * in case this is not an "equality filter"
+                                      * we ignore it here - in partition
+                                      * filtering
+                                      */{
             return true;
         }
 
@@ -197,12 +218,18 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
 
         for (HivePartition partition : partitionFields) {
             if (filterColumnName.equals(partition.name)) {
-                /* the filter field matches a partition field, but the values do not match */
+                /*
+                 * the filter field matches a partition field, but the values do
+                 * not match
+                 */
                 return filterValue.equals(partition.val);
             }
         }
 
-        /* filter field did not match any partition field, so we ignore this filter and hence return true */
+        /*
+         * filter field did not match any partition field, so we ignore this
+         * filter and hence return true
+         */
         return true;
     }
 
@@ -211,6 +238,7 @@ public class HiveAccessor extends HdfsSplittableDataAccessor {
         boolean isOperationEqual = (bFilter.getOperation() == FilterParser.Operation.HDOP_EQ);
         int columnIndex = bFilter.getColumn().index();
         String value = bFilter.getConstant().constant().toString();
-        LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: " + columnIndex + " value: " + value);
+        LOG.debug("isOperationEqual: " + isOperationEqual + " columnIndex: "
+                + columnIndex + " value: " + value);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/dc115ff4/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
index 66fb0cb..6ebc62e 100644
--- a/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
+++ b/pxf/pxf-hive/src/main/java/com/pivotal/pxf/plugins/hive/HiveDataFragmenter.java
@@ -35,8 +35,8 @@ import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
 import com.pivotal.pxf.plugins.hive.utilities.HiveUtilities;
 
 /**
- * Fragmenter class for HIVE tables
- * <p/>
+ * Fragmenter class for HIVE tables.
+ * <br>
  * Given a Hive table and its partitions divide the data into fragments (here a
  * data fragment is actually a HDFS file block) and return a list of them. Each
  * data fragment will contain the following information:
@@ -70,7 +70,7 @@ public class HiveDataFragmenter extends Fragmenter {
     private Set<String> setPartitions = new TreeSet<String>(
             String.CASE_INSENSITIVE_ORDER);
 
-    /*
+    /**
      * A Hive table unit - means a subset of the HIVE table, where we can say
      * that for all files in this subset, they all have the same InputFormat and
      * Serde. For a partitioned table the HiveTableUnit will be one partition
@@ -101,7 +101,7 @@ public class HiveDataFragmenter extends Fragmenter {
     }
 
     /**
-     * Constructs a HiveDataFragmenter object
+     * Constructs a HiveDataFragmenter object.
      *
      * @param inputData all input parameters coming from the client
      */
@@ -110,7 +110,7 @@ public class HiveDataFragmenter extends Fragmenter {
     }
 
     /**
-     * Constructs a HiveDataFragmenter object
+     * Constructs a HiveDataFragmenter object.
      *
      * @param inputData all input parameters coming from the client
      * @param clazz Class for JobConf
@@ -131,11 +131,12 @@ public class HiveDataFragmenter extends Fragmenter {
     }
 
     /**
-     * Creates the partition InputFormat
+     * Creates the partition InputFormat.
      *
      * @param inputFormatName input format class name
      * @param jobConf configuration data for the Hadoop framework
      * @return a {@link org.apache.hadoop.mapred.InputFormat} derived object
+     * @throws Exception if failed to create input format
      */
     public static InputFormat<?, ?> makeInputFormat(String inputFormatName,
                                                     JobConf jobConf)



Mime
View raw message