hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shiv...@apache.org
Subject [09/15] incubator-hawq git commit: HAWQ-45. PXF package namespace refactor
Date Tue, 03 Nov 2015 00:36:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
new file mode 100644
index 0000000..aa854fc
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -0,0 +1,231 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.service.utilities.Utilities;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.*;
+import java.util.List;
+
+/**
+ * HdfsUtilities class exposes helper methods for PXF classes.
+ */
+public class HdfsUtilities {
+    private static Log Log = LogFactory.getLog(HdfsUtilities.class);
+    private static Configuration config = new Configuration();
+    private static CompressionCodecFactory factory = new CompressionCodecFactory(
+            config);
+
+    /**
+     * Hdfs data sources are absolute data paths. Method ensures that dataSource
+     * begins with '/'.
+     *
+     * @param dataSource The HDFS path to a file or directory of interest.
+     *            Retrieved from the client request.
+     * @return an absolute data path
+     */
+    public static String absoluteDataPath(String dataSource) {
+        return (dataSource.charAt(0) == '/') ? dataSource : "/" + dataSource;
+    }
+
+    /*
+     * Helper routine to get a compression codec class
+     */
+    private static Class<? extends CompressionCodec> getCodecClass(Configuration conf,
+                                                                   String name) {
+
+        Class<? extends CompressionCodec> codecClass;
+        try {
+            codecClass = conf.getClassByName(name).asSubclass(
+                    CompressionCodec.class);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalArgumentException("Compression codec " + name
+                    + " was not found.", e);
+        }
+        return codecClass;
+    }
+
+    /**
+     * Helper routine to get compression codec through reflection.
+     *
+     * @param conf configuration used for reflection
+     * @param name codec name
+     * @return generated CompressionCodec
+     */
+    public static CompressionCodec getCodec(Configuration conf, String name) {
+        return ReflectionUtils.newInstance(getCodecClass(conf, name), conf);
+    }
+
+    /**
+     * Helper routine to get compression codec class by path (file suffix).
+     *
+     * @param path path of file to get codec for
+     * @return matching codec class for the path. null if no codec is needed.
+     */
+    private static Class<? extends CompressionCodec> getCodecClassByPath(String path) {
+
+        Class<? extends CompressionCodec> codecClass = null;
+        CompressionCodec codec = factory.getCodec(new Path(path));
+        if (codec != null) {
+            codecClass = codec.getClass();
+        }
+        Log.debug((codecClass == null ? "No codec" : "Codec " + codecClass)
+                + " was found for file " + path);
+        return codecClass;
+    }
+
+    /**
+     * Returns true if the needed codec is splittable. If no codec is needed
+     * returns true as well.
+     *
+     * @param path path of the file to be read
+     * @return if the codec needed for reading the specified path is splittable.
+     */
+    public static boolean isSplittableCodec(Path path) {
+
+        final CompressionCodec codec = factory.getCodec(path);
+        if (null == codec) {
+            return true;
+        }
+
+        return codec instanceof SplittableCompressionCodec;
+    }
+
+    /**
+     * Checks if requests should be handle in a single thread or not.
+     *
+     * @param dataDir hdfs path to the data source
+     * @param compCodec the fully qualified name of the compression codec
+     * @return if the request can be run in multi-threaded mode.
+     */
+    public static boolean isThreadSafe(String dataDir, String compCodec) {
+
+        Class<? extends CompressionCodec> codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass(
+                config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir);
+        /* bzip2 codec is not thread safe */
+        return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass));
+    }
+
+    /**
+     * Prepares byte serialization of a file split information (start, length,
+     * hosts) using {@link ObjectOutputStream}.
+     *
+     * @param fsp file split to be serialized
+     * @return byte serialization of fsp
+     * @throws IOException if I/O errors occur while writing to the underlying
+     *             stream
+     */
+    public static byte[] prepareFragmentMetadata(FileSplit fsp)
+            throws IOException {
+        ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream();
+        ObjectOutputStream objectStream = new ObjectOutputStream(
+                byteArrayStream);
+        objectStream.writeLong(fsp.getStart());
+        objectStream.writeLong(fsp.getLength());
+        objectStream.writeObject(fsp.getLocations());
+
+        return byteArrayStream.toByteArray();
+    }
+
+    /**
+     * Parses fragment metadata and return matching {@link FileSplit}.
+     *
+     * @param inputData request input data
+     * @return FileSplit with fragment metadata
+     */
+    public static FileSplit parseFragmentMetadata(InputData inputData) {
+        try {
+            byte[] serializedLocation = inputData.getFragmentMetadata();
+            if (serializedLocation == null) {
+                throw new IllegalArgumentException(
+                        "Missing fragment location information");
+            }
+
+            ByteArrayInputStream bytesStream = new ByteArrayInputStream(
+                    serializedLocation);
+            ObjectInputStream objectStream = new ObjectInputStream(bytesStream);
+
+            long start = objectStream.readLong();
+            long end = objectStream.readLong();
+
+            String[] hosts = (String[]) objectStream.readObject();
+
+            FileSplit fileSplit = new FileSplit(new Path(
+                    inputData.getDataSource()), start, end, hosts);
+
+            Log.debug("parsed file split: path " + inputData.getDataSource()
+                    + ", start " + start + ", end " + end + ", hosts "
+                    + ArrayUtils.toString(hosts));
+
+            return fileSplit;
+
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Exception while reading expected fragment metadata", e);
+        }
+    }
+
+    /**
+     * Accessing the Avro file through the "unsplittable" API just to get the
+     * schema. The splittable API (AvroInputFormat) which is the one we will be
+     * using to fetch the records, does not support getting the Avro schema yet.
+     *
+     * @param conf Hadoop configuration
+     * @param dataSource Avro file (i.e fileName.avro) path
+     * @return the Avro schema
+     * @throws IOException if I/O error occured while accessing Avro schema file
+     */
+    public static Schema getAvroSchema(Configuration conf, String dataSource)
+            throws IOException {
+        FsInput inStream = new FsInput(new Path(dataSource), conf);
+        DatumReader<GenericRecord> dummyReader = new GenericDatumReader<>();
+        DataFileReader<GenericRecord> dummyFileReader = new DataFileReader<>(
+                inStream, dummyReader);
+        Schema schema = dummyFileReader.getSchema();
+        dummyFileReader.close();
+        return schema;
+    }
+
+    /**
+     * Returns string serialization of list of fields. Fields of binary type
+     * (BYTEA) are converted to octal representation to make sure they will be
+     * relayed properly to the DB.
+     *
+     * @param complexRecord list of fields to be stringified
+     * @param delimiter delimiter between fields
+     * @return string of serialized fields using delimiter
+     */
+    public static String toString(List<OneField> complexRecord, String delimiter) {
+        StringBuilder buff = new StringBuilder();
+        String delim = ""; // first iteration has no delimiter
+        for (OneField complex : complexRecord) {
+            if (complex.type == DataType.BYTEA.getOID()) {
+                /** Serialize byte array as string */
+                buff.append(delim);
+                Utilities.byteArrayToOctalString((byte[]) complex.val, buff);
+            } else {
+                buff.append(delim).append(complex.val);
+            }
+            delim = delimiter;
+        }
+        return buff.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
new file mode 100644
index 0000000..047a5c2
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/PxfInputFormat.java
@@ -0,0 +1,33 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+
+/**
+ * PxfInputFormat is not intended to read a specific format, hence it implements
+ * a dummy getRecordReader Instead, its purpose is to apply
+ * FileInputFormat.getSplits from one point in PXF and get the splits which are
+ * valid for the actual InputFormats, since all of them we use inherit
+ * FileInputFormat but do not override getSplits.
+ */
+public class PxfInputFormat extends FileInputFormat {
+
+    @Override
+    public RecordReader getRecordReader(InputSplit split,
+                                        JobConf conf,
+                                        Reporter reporter) throws IOException {
+        throw new UnsupportedOperationException("PxfInputFormat should not be used for reading data, but only for obtaining the splits of a file");
+    }
+
+    /*
+     * Return true if this file can be split.
+     */
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path filename) {
+        return HdfsUtilities.isSplittableCodec(filename);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
new file mode 100644
index 0000000..3529932
--- /dev/null
+++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java
@@ -0,0 +1,265 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+
+import java.util.List;
+
+/**
+ * Adapter used for adding a recordkey field to the records output {@code List<OneField>}.
+ */
+public class RecordkeyAdapter {
+    private Log Log;
+
+    /*
+     * We need to transform Record keys to java primitive types.
+     * Since the type of the key is the same throughout the file we do the type resolution
+     * in the first call (for the first record) and then use a "Java variation on Function pointer"
+     * to do the extraction for the rest of the records.
+     */
+    private interface ValExtractor {
+        public Object get(Object key);
+    }
+
+    private ValExtractor extractor = null;
+
+    private interface ValConverter {
+        public Writable get(Object key);
+    }
+
+    private ValConverter converter = null;
+
+    /**
+     * Constructs a RecordkeyAdapter.
+     */
+    public RecordkeyAdapter() {
+        Log = LogFactory.getLog(RecordkeyAdapter.class);
+    }
+
+    /**
+     *  Adds the recordkey to the end of the passed in recFields list.
+     *  <p>
+     *  This method also verifies cases in which record keys are not supported
+     *  by the underlying source type, and therefore "illegally" requested.
+     *
+     * @param recFields existing list of record (non-key) fields and their values.
+     * @param input all input parameters coming from the client request
+     * @param onerow a row object which is used here in order to find out if
+     *        the given type supports recordkeys or not.
+     * @return 0 if record key not needed, or 1 if record key was appended
+     * @throws NoSuchFieldException when the given record type does not support
+     *         recordkeys
+     */
+    public int appendRecordkeyField(List<OneField> recFields,
+                                    InputData input,
+                                    OneRow onerow) throws NoSuchFieldException {
+
+		/*
+		 * user did not request the recordkey field in the
+		 * "create external table" statement
+		 */
+        ColumnDescriptor recordkeyColumn = input.getRecordkeyColumn();
+        if (recordkeyColumn == null) {
+            return 0;
+        }
+
+		/*
+		 * The recordkey was filled in the fileAccessor during execution of
+		 * method readNextObject. The current accessor implementations are
+		 * SequenceFileAccessor, LineBreakAccessor and AvroFileAccessor from
+		 * HdfsSplittableDataAccessor and QuotedLineBreakAccessor from
+		 * HdfsAtomicDataAccessor. For SequenceFileAccessor, LineBreakAccessor
+		 * the recordkey is set, since it is returned by the
+		 * SequenceFileRecordReader or LineRecordReader(for text file). But Avro
+		 * files do not have keys, so the AvroRecordReader will not return a key
+		 * and in this case recordkey will be null. If the user specified a
+		 * recordkey attribute in the CREATE EXTERNAL TABLE statement and he
+		 * reads from an AvroFile, we will throw an exception since the Avro
+		 * file does not have keys In the future, additional implementations of
+		 * FileAccessors will have to set recordkey during readNextObject().
+		 * Otherwise it is null by default and we will throw an exception here,
+		 * that is if we get here... a careful user will not specify recordkey
+		 * in the CREATE EXTERNAL statement and then we will leave this function
+		 * one line above.
+		 */
+        Object recordkey = onerow.getKey();
+        if (recordkey == null) {
+            throw new NoSuchFieldException("Value for field \"recordkey\" was requested but the queried HDFS resource type does not support key");
+        }
+
+        OneField oneField = new OneField();
+        oneField.type = recordkeyColumn.columnTypeCode();
+        oneField.val = extractVal(recordkey);
+        recFields.add(oneField);
+        return 1;
+    }
+
+    /*
+	 * Extracts a java primitive type value from the recordkey. If the key is a
+	 * Writable implementation we extract the value as a Java primitive. If the
+	 * key is already a Java primitive we returned it as is If it is an unknown
+	 * type we throw an exception
+	 */
+    private Object extractVal(Object key) {
+        if (extractor == null) {
+            extractor = InitializeExtractor(key);
+        }
+
+        return extractor.get(key);
+    }
+
+    /*
+     * Initialize the extractor object based on the type of the recordkey
+     */
+    private ValExtractor InitializeExtractor(Object key) {
+        if (key instanceof IntWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((IntWritable) key).get();
+                }
+            };
+        } else if (key instanceof ByteWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((ByteWritable) key).get();
+                }
+            };
+        } else if (key instanceof BooleanWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((BooleanWritable) key).get();
+                }
+            };
+        } else if (key instanceof DoubleWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((DoubleWritable) key).get();
+                }
+            };
+        } else if (key instanceof FloatWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((FloatWritable) key).get();
+                }
+            };
+        } else if (key instanceof LongWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((LongWritable) key).get();
+                }
+            };
+        } else if (key instanceof Text) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return (key).toString();
+                }
+            };
+        } else if (key instanceof VIntWritable) {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    return ((VIntWritable) key).get();
+                }
+            };
+        } else {
+            return new ValExtractor() {
+                @Override
+                public Object get(Object key) {
+                    throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
+                }
+            };
+        }
+    }
+
+    /**
+     * Converts given key object to its matching Writable.
+     * Supported types: Integer, Byte, Boolean, Double, Float, Long, String.
+     * The type is only checked once based on the key, all consequent calls
+     * must be of the same type.
+     *
+     * @param key object to convert
+     * @return Writable object matching given key
+     */
+    public Writable convertKeyValue(Object key) {
+        if (converter == null) {
+            converter = initializeConverter(key);
+            Log.debug("converter initialized for type " + key.getClass() +
+                    " (key value: " + key + ")");
+        }
+
+        return converter.get(key);
+    }
+
+    private ValConverter initializeConverter(Object key) {
+
+        if (key instanceof Integer) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new IntWritable((Integer) key));
+                }
+            };
+        } else if (key instanceof Byte) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new ByteWritable((Byte) key));
+                }
+            };
+        } else if (key instanceof Boolean) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new BooleanWritable((Boolean) key));
+                }
+            };
+        } else if (key instanceof Double) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new DoubleWritable((Double) key));
+                }
+            };
+        } else if (key instanceof Float) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new FloatWritable((Float) key));
+                }
+            };
+        } else if (key instanceof Long) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new LongWritable((Long) key));
+                }
+            };
+        } else if (key instanceof String) {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    return (new Text((String) key));
+                }
+            };
+        } else {
+            return new ValConverter() {
+                @Override
+                public Writable get(Object key) {
+                    throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName());
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
deleted file mode 100644
index b34e6c5..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/ChunkReaderTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hdfs.DFSInputStream;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.stubbing.*;
-import org.mockito.invocation.*;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.mockito.Matchers.any;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tester for the ChunkReader class
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ChunkReader.class})
-public class ChunkReaderTest {
-	
-	ChunkReader reader;
-	/* Mocking the stream class that accesses the actual data */
-	DFSInputStream mockStream;
-
-    /*
-     * setUp function called before each test.
-	 */
-    @Before
-    public void setUp() throws Exception {
-		mockStream = mock(DFSInputStream.class); 	
-    }
-
-    /*
-	 * Simulate the empty file case
-	 */
-    @Test
-    public void readEmptyFile() throws Exception {
-		reader = new ChunkReader(mockStream);
-		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenReturn(0);
-		
-		Writable out = new ChunkWritable();
-		int maxBytesToConsume = 1024*1024;
-		assertEquals(0, reader.readLine(out, maxBytesToConsume));
-    }
-
-	/*
-	 * Read one line
-	 */
-    @Test
-    public void readOneLine() throws Exception {
-		reader = new ChunkReader(mockStream);
-		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
-			@Override
-			public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
-				byte[] buf = (byte[]) invocation.getArguments()[0];
-				
-				byte [] source = "OneLine\nTwoLine\n".getBytes();
-				System.arraycopy(source, 0, buf, 0, source.length);
-				return new java.lang.Byte(buf[0]);
-			}
-		});
-		
-		ChunkWritable out = new ChunkWritable();
-		int maxBytesToConsume = 1024*1024;
-		// read first line
-		assertEquals("OneLine\n".length()
-					 , reader.readLine(out, maxBytesToConsume) );
-		assertEquals("OneLine\n", new String(out.box) );
-
-		// read second line
-		assertEquals("TwoLine\n".length(), reader.readLine(out, maxBytesToConsume) );
-		assertEquals("TwoLine\n", new String(out.box) );
-    }
-	
-	/*
-	 * Read one line
-	 */
-    @Test
-    public void readChunk() throws Exception {
-		reader = new ChunkReader(mockStream);
-		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
-			@Override
-			public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
-				byte[] buf = (byte[]) invocation.getArguments()[0];
-				
-				byte [] source = "OneLine\nTwoLine\n".getBytes();
-				System.arraycopy(source, 0, buf, 0, source.length);
-				return new java.lang.Integer(source.length);
-			}
-		});
-		
-		ChunkWritable out = new ChunkWritable();
-		int maxBytesToConsume = 10; /* make readChunk return after reading the first "chunk": OneLine\nTwoLine\n */
-		// read chunk
-		assertEquals("OneLine\nTwoLine\n".length()
-					 , reader.readChunk(out, maxBytesToConsume) );
-		assertEquals("OneLine\nTwoLine\n", new String(out.box) );
-    }	
-	
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
deleted file mode 100644
index 016f0ed..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessorTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.servlet.ServletContext;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.mockito.Matchers.any;
-import static org.junit.Assert.*;
-
-import com.pivotal.pxf.api.utilities.InputData;
-import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities;
-
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SequenceFileAccessor.class, HdfsSplittableDataAccessor.class, HdfsUtilities.class})
-@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf","org.apache.hadoop.fs.FileContext"}) // Prevents static inits
-public class SequenceFileAccessorTest {
-
-    InputData inputData;
-    SequenceFileAccessor accessor;
-    Map<String, String> parameters;
-    ServletContext mockContext;
-    Configuration hdfsConfiguration;
-    SequenceFileInputFormat<?,?> inFormat;
-    JobConf jobConf = null;
-    Path file;
-    FileSystem fs;
-    FileContext fc;
-    Log mockLog;
-
-    /*
-     * setUp function called before each test.
-     * 
-     * As the focus of the test is compression codec and type behavior, and
-     * since the compression methods are private to SequenceFileAccessor, we
-     * test their invocation and results by calling the public openForWrite().
-     * Since openForWrite does some filesystem operations on HDFS, those had
-     * to be mocked (and provided good material for a new Kafka story).
-	 */
-    @Before
-    public void setUp() throws Exception {
-    	
-        mockContext = mock(ServletContext.class);
-        inFormat = mock(SequenceFileInputFormat.class);
-        hdfsConfiguration = mock(Configuration.class);
-        jobConf = mock(JobConf.class);
-        file = mock(Path.class);
-        fs = mock(FileSystem.class);
-        fc = mock(FileContext.class);
-        inputData = mock(InputData.class);
-
-    	PowerMockito.mockStatic(FileContext.class);
-    	PowerMockito.mockStatic(HdfsUtilities.class);
-		PowerMockito.whenNew(Path.class).withArguments(Mockito.anyString()).thenReturn(file);
-
-        when(file.getFileSystem(any(Configuration.class))).thenReturn(fs);
-        when(inputData.getDataSource()).thenReturn("deep.throat");
-        when(inputData.getSegmentId()).thenReturn(0);             	
-        when(FileContext.getFileContext()).thenReturn(fc);
-    }
-
-    /*
-	 * After each test is done, close the accessor if it was created
-	 */
-    @After
-    public void tearDown() throws Exception {
-    	
-        if (accessor == null) {
-            return;
-        }
-
-        accessor.closeForWrite();
-        accessor = null;
-    }
-
-    private void constructAccessor() throws Exception {
-            	
-        accessor = new SequenceFileAccessor(inputData);
-        accessor.openForWrite();
-    }
-    
-    private void mockCompressionOptions(String codec, String type)
-    {
-        when(inputData.getUserProperty("COMPRESSION_CODEC")).thenReturn(codec);
-        when(inputData.getUserProperty("COMPRESSION_TYPE")).thenReturn(type);
-    }
-    
-    @Test
-    public void compressionNotSpecified() throws Exception {
-
-    	mockCompressionOptions(null, null);
-        constructAccessor();
-        assertEquals(SequenceFile.CompressionType.NONE, accessor.getCompressionType());
-        assertEquals(null, accessor.getCodec());
-    }
-
-	@Test
-	public void compressCodec() throws Exception {
-
-		//using BZip2 as a valid example
-        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
-        constructAccessor();				
-		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
-	}
-
-    @Test
-    public void bogusCompressCodec() {
-
-    	final String codecName = "So I asked, who is he? He goes by the name of Wayne Rooney";
-        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenThrow(new IllegalArgumentException("Compression codec " + codecName + " was not found."));
-        mockCompressionOptions(codecName, null);
-        
-        try {
-        	constructAccessor();
-            fail("should throw no codec found exception");
-        } catch (Exception e) {
-            assertEquals("Compression codec " + codecName + " was not found.", e.getMessage());
-        }
-    }
-
-	@Test
-	public void compressTypes() throws Exception {
-
-        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
-        
-        //proper value
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "BLOCK");
-        constructAccessor();				
-		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
-		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK, accessor.getCompressionType());
-
-		//case (non) sensitivity
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "ReCoRd");
-        constructAccessor();				
-		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
-		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
-
-		//default (RECORD)
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
-        constructAccessor();				
-		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
-		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
-	}
-
-    @Test
-	public void illegalCompressTypes() throws Exception {
-
-        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "Oy");
-        
-		try {
-	        constructAccessor();				
-			fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
-		} catch (IllegalArgumentException e) {
-			assertEquals("Illegal compression type 'Oy'", e.getMessage());
-		}
-		
-        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "NONE");
-        
-		try {
-	        constructAccessor();				
-			fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
-		} catch (IllegalArgumentException e) {
-			assertEquals("Illegal compression type 'NONE'. For disabling compression remove COMPRESSION_CODEC parameter.", e.getMessage());
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
deleted file mode 100644
index 1191274..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/StringPassResolverTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs;
-
-import com.pivotal.pxf.api.OneField;
-import com.pivotal.pxf.api.OneRow;
-import com.pivotal.pxf.api.OutputFormat;
-import com.pivotal.pxf.service.BridgeInputBuilder;
-import com.pivotal.pxf.service.io.Text;
-import com.pivotal.pxf.service.utilities.ProtocolData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Text.class, BridgeInputBuilder.class, ProtocolData.class, LogFactory.class})
-public class StringPassResolverTest {
-    ProtocolData mockProtocolData;
-    Log mockLog;
-    
-    @Test
-    /*
-     * Test the setFields method: small \n terminated input
-	 */
-    public void testSetFields() throws Exception {
-        StringPassResolver resolver = buildResolver();
-
-        byte[] data = new byte[]{(int) 'a', (int) 'b', (int) 'c', (int) 'd', (int) '\n',
-                (int) 'n', (int) 'o', (int) '\n'};
-
-        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(data));
-        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
-        List<OneField> record = inputBuilder.makeInput(inputStream);
-
-        OneRow oneRow = resolver.setFields(record);
-        verifyOneRow(oneRow, Arrays.copyOfRange(data, 0, 5));
-
-        record = inputBuilder.makeInput(inputStream);
-        oneRow = resolver.setFields(record);
-        verifyOneRow(oneRow, Arrays.copyOfRange(data, 5, 8));
-    }
-
-    @Test
-    /*
-     * Test the setFields method: input > buffer size, \n terminated
-	 */
-    public void testSetFieldsBigArray() throws Exception {
-
-        StringPassResolver resolver = buildResolver();
-
-        byte[] bigArray = new byte[2000];
-        for (int i = 0; i < 1999; ++i) {
-            bigArray[i] = (byte) (i % 10 + 30);
-        }
-        bigArray[1999] = (byte) '\n';
-
-        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
-        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
-        List<OneField> record = inputBuilder.makeInput(inputStream);
-
-        OneRow oneRow = resolver.setFields(record);
-
-        verifyOneRow(oneRow, bigArray);
-    }
-
-    @Test
-    /*
-     * Test the setFields method: input > buffer size, no \n
-	 */
-    public void testSetFieldsBigArrayNoNewLine() throws Exception {
-
-    	PowerMockito.mockStatic(LogFactory.class);
-        mockLog = mock(Log.class);
-        PowerMockito.when(LogFactory.getLog(any(Class.class))).thenReturn(mockLog);
-
-    	StringPassResolver resolver = buildResolver();
-
-        byte[] bigArray = new byte[2000];
-        for (int i = 0; i < 2000; ++i) {
-            bigArray[i] = (byte) (i % 10 + 60);
-        }
-
-        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
-        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
-        List<OneField> record = inputBuilder.makeInput(inputStream);
-
-        OneRow oneRow = resolver.setFields(record);
-
-        verifyOneRow(oneRow, bigArray);
-
-        //verify(mockLog, atLeastOnce()).info(anyString());
-        //Mockito.verify(mockLog).warn("Stream ended without line breaksdfljsldkj");
-        //verifyWarning();
-    }
-
-    @Test
-    /*
-	 * Test the setFields method: empty stream (returns -1)
-	 */
-    public void testSetFieldsEmptyStream() throws Exception {
-
-        StringPassResolver resolver = buildResolver();
-
-        byte[] empty = new byte[0];
-
-        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(empty));
-        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
-        List<OneField> record = inputBuilder.makeInput(inputStream);
-
-        OneRow oneRow = resolver.setFields(record);
-
-        assertNull(oneRow);
-    }
-	
-	/*
-	 * helpers functions
-	 */
-    private StringPassResolver buildResolver()
-            throws Exception {
- 
-        mockProtocolData = mock(ProtocolData.class);
-        PowerMockito.when(mockProtocolData.outputFormat()).thenReturn(OutputFormat.TEXT);
-
-        return new StringPassResolver(mockProtocolData);
-    }
-
-    private void verifyOneRow(OneRow oneRow, byte[] expected) {
-        assertNull(oneRow.getKey());
-        byte[] bytes = (byte[]) oneRow.getData();
-        byte[] result = Arrays.copyOfRange(bytes, 0, bytes.length);
-        assertEquals(result.length, expected.length);
-        assertTrue(Arrays.equals(result, expected));
-    }
-
-//    private void verifyWarning() {
-//        Mockito.verify(Log).warn("Stream ended without line break");
-//    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
deleted file mode 100644
index 83af8ba..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import com.pivotal.pxf.api.OneField;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@SuppressStaticInitializationFor("com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities")
-@PrepareForTest({HdfsUtilities.class, ReflectionUtils.class})
-public class HdfsUtilitiesTest {
-
-    Configuration conf;
-    CompressionCodecFactory factory;
-    Log Log;
-
-    @Before
-    public void SetupCompressionFactory() {
-        factory = mock(CompressionCodecFactory.class);
-        Whitebox.setInternalState(HdfsUtilities.class, factory);
-        Log = mock(Log.class);
-        Whitebox.setInternalState(HdfsUtilities.class, Log);
-    }
-
-    @Test
-    public void getCodecNoName() {
-
-        Configuration conf = new Configuration();
-        String name = "some.bad.codec";
-
-        try {
-            HdfsUtilities.getCodec(conf, name);
-            fail("function should fail with bad codec name " + name);
-        } catch (IllegalArgumentException e) {
-            assertEquals(e.getMessage(), "Compression codec " + name + " was not found.");
-        }
-    }
-
-    @Test
-    public void getCodecNoConf() {
-
-        Configuration conf = null;
-        String name = "org.apache.hadoop.io.compress.GzipCodec";
-
-        try {
-            HdfsUtilities.getCodec(conf, name);
-            fail("function should fail with when conf is null");
-        } catch (NullPointerException e) {
-            assertTrue(true);
-        }
-    }
-
-    @Test
-    public void getCodecGzip() {
-
-        Configuration conf = new Configuration();
-        String name = "org.apache.hadoop.io.compress.GzipCodec";
-
-        PowerMockito.mockStatic(ReflectionUtils.class);
-        GzipCodec gzipCodec = mock(GzipCodec.class);
-
-        when(ReflectionUtils.newInstance(GzipCodec.class, conf)).thenReturn(gzipCodec);
-
-        CompressionCodec codec = HdfsUtilities.getCodec(conf, name);
-        assertNotNull(codec);
-        assertEquals(codec, gzipCodec);
-    }
-
-    @Test
-    public void isThreadSafe() {
-
-        testIsThreadSafe(
-                "readable compression, no compression - thread safe",
-                "/some/path/without.compression",
-                null, null,
-                true);
-
-        testIsThreadSafe(
-                "readable compression, gzip compression - thread safe",
-                "/some/compressed/path.gz",
-                null, new GzipCodec(),
-                true);
-
-        testIsThreadSafe(
-                "readable compression, bzip2 compression - not thread safe",
-                "/some/path/with/bzip2.bz2",
-                null, new BZip2Codec(),
-                false);
-
-        testIsThreadSafe(
-                "writable compression, no compression codec - thread safe",
-                "/some/path",
-                null, null,
-                true);
-
-        testIsThreadSafe(
-                "writable compression, some compression codec - thread safe",
-                "/some/path",
-                "I.am.a.nice.codec", new NotSoNiceCodec(),
-                true);
-
-        testIsThreadSafe(
-                "writable compression, compression codec bzip2 - not thread safe",
-                "/some/path",
-                "org.apache.hadoop.io.compress.BZip2Codec", new BZip2Codec(),
-                false);
-    }
-
-    private void testIsThreadSafe(String testDescription, String path, String codecStr, CompressionCodec codec, boolean expectedResult) {
-        prepareDataForIsThreadSafe(path, codecStr, codec);
-
-        boolean result = HdfsUtilities.isThreadSafe(path, codecStr);
-        assertTrue(testDescription, result == expectedResult);
-    }
-
-    private void prepareDataForIsThreadSafe(String dataDir, String codecStr, CompressionCodec codec) {
-        try {
-            conf = PowerMockito.mock(Configuration.class);
-            PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(conf);
-        } catch (Exception e) {
-            fail("new Configuration mocking failed");
-        }
-
-        if (codecStr == null) {
-            when(factory.getCodec(new Path(dataDir))).thenReturn(codec);
-        } else {
-            PowerMockito.stub(PowerMockito.method(HdfsUtilities.class, "getCodecClass")).toReturn(codec.getClass());
-        }
-    }
-
-    @Test
-    public void isSplittableCodec() {
-
-        testIsSplittableCodec("no codec - splittable",
-                "some/innocent.file", null, true);
-        testIsSplittableCodec("gzip codec - not splittable",
-                "/gzip.gz", new GzipCodec(), false);
-        testIsSplittableCodec("default codec - not splittable",
-                "/default.deflate", new DefaultCodec(), false);
-        testIsSplittableCodec("bzip2 codec - splittable",
-                "bzip2.bz2", new BZip2Codec(), true);
-    }
-
-    private void testIsSplittableCodec(String description,
-                                       String pathName, CompressionCodec codec, boolean expected) {
-        Path path = new Path(pathName);
-        when(factory.getCodec(path)).thenReturn(codec);
-
-        boolean result = HdfsUtilities.isSplittableCodec(path);
-        assertEquals(description, result, expected);
-    }
-
-    @Test
-    public void testToString() {
-        List<OneField> oneFields = Arrays.asList(new OneField(1, "uno"), new OneField(2, "dos"), new OneField(3, "tres"));
-
-        assertEquals("uno!dos!tres", HdfsUtilities.toString(oneFields, "!"));
-
-        assertEquals("uno", HdfsUtilities.toString(Collections.singletonList(oneFields.get(0)), "!"));
-
-        assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
deleted file mode 100644
index 4f5b18b..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.hadoop.io.compress.*;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Codec class for UtilitiesTest
- * Can't be embedded inside UtilitiesTest due to junit limitation.
- */
-public class NotSoNiceCodec implements CompressionCodec {
-
-    @Override
-    public CompressionOutputStream createOutputStream(OutputStream out)
-            throws IOException {
-        return null;
-    }
-
-    @Override
-    public CompressionOutputStream createOutputStream(OutputStream out,
-                                                      Compressor compressor) throws IOException {
-        return null;
-    }
-
-    @Override
-    public Class<? extends Compressor> getCompressorType() {
-        return null;
-    }
-
-    @Override
-    public Compressor createCompressor() {
-        return null;
-    }
-
-    @Override
-    public CompressionInputStream createInputStream(InputStream in)
-            throws IOException {
-        return null;
-    }
-
-    @Override
-    public CompressionInputStream createInputStream(InputStream in,
-                                                    Decompressor decompressor) throws IOException {
-        return null;
-    }
-
-    @Override
-    public Class<? extends Decompressor> getDecompressorType() {
-        return null;
-    }
-
-    @Override
-    public Decompressor createDecompressor() {
-        return null;
-    }
-
-    @Override
-    public String getDefaultExtension() {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java b/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
deleted file mode 100644
index b0c9a3a..0000000
--- a/pxf/pxf-hdfs/src/test/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package com.pivotal.pxf.plugins.hdfs.utilities;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.*;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({RecordkeyAdapter.class, LogFactory.class})
-public class RecordkeyAdapterTest {
-    Log Log;
-    RecordkeyAdapter recordkeyAdapter;
-
-    /**
-     * Test convertKeyValue for Integer type
-     */
-    @Test
-    public void convertKeyValueInteger() {
-        int key = 13;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new IntWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for Boolean type
-     */
-    @Test
-    public void convertKeyValueBoolean() {
-        boolean key = true;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new BooleanWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for Byte type
-     */
-    @Test
-    public void convertKeyValueByte() {
-        byte key = 1;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new ByteWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for Double type
-     */
-    @Test
-    public void convertKeyValueDouble() {
-        double key = 2.3;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new DoubleWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for Float type
-     */
-    @Test
-    public void convertKeyValueFloat() {
-        float key = (float) 2.3;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new FloatWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for Long type
-     */
-    @Test
-    public void convertKeyValueLong() {
-        long key = 12345678901234567l;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new LongWritable(key));
-    }
-
-    /**
-     * Test convertKeyValue for String type
-     */
-    @Test
-    public void convertKeyValueString() {
-        String key = "key";
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new Text(key));
-    }
-
-    /**
-     * Test convertKeyValue for several calls of the same type
-     */
-    @Test
-    public void convertKeyValueManyCalls() {
-        Boolean key = true;
-        mockLog();
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new BooleanWritable(key));
-        verifyLog("converter initialized for type " + key.getClass() +
-                " (key value: " + key + ")");
-
-        for (int i = 0; i < 5; ++i) {
-            key = (i % 2) == 0;
-            runConvertKeyValue(key, new BooleanWritable(key));
-        }
-        verifyLogOnlyOnce();
-    }
-
-    /**
-     * Test convertKeyValue for boolean type and then string type - negative
-     * test
-     */
-    @Test
-    public void convertKeyValueBadSecondValue() {
-        boolean key = true;
-        initRecordkeyAdapter();
-        runConvertKeyValue(key, new BooleanWritable(key));
-        String badKey = "bad";
-        try {
-            recordkeyAdapter.convertKeyValue(badKey);
-            fail("conversion of string to boolean should fail");
-        } catch (ClassCastException e) {
-            assertEquals(e.getMessage(),
-                    "java.lang.String cannot be cast to java.lang.Boolean");
-        }
-    }
-
-    private void initRecordkeyAdapter() {
-        recordkeyAdapter = new RecordkeyAdapter();
-    }
-
-    private void runConvertKeyValue(Object key, Writable expected) {
-        Writable writable = recordkeyAdapter.convertKeyValue(key);
-        assertEquals(writable, expected);
-    }
-
-    private void mockLog() {
-        PowerMockito.mockStatic(LogFactory.class);
-        Log = mock(Log.class);
-        when(LogFactory.getLog(RecordkeyAdapter.class)).thenReturn(Log);
-    }
-
-    private void verifyLog(String msg) {
-        Mockito.verify(Log).debug(msg);
-    }
-
-    private void verifyLogOnlyOnce() {
-        Mockito.verify(Log, Mockito.times(1)).debug(Mockito.any());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
new file mode 100644
index 0000000..6b30e68
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/ChunkReaderTest.java
@@ -0,0 +1,119 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hdfs.DFSInputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.stubbing.*;
+import org.mockito.invocation.*;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tester for the ChunkReader class
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ChunkReader.class})
+public class ChunkReaderTest {
+	
+	ChunkReader reader;
+	/* Mocking the stream class that accesses the actual data */
+	DFSInputStream mockStream;
+
+    /*
+     * setUp function called before each test.
+	 */
+    @Before
+    public void setUp() throws Exception {
+		mockStream = mock(DFSInputStream.class); 	
+    }
+
+    /*
+	 * Simulate the empty file case
+	 */
+    @Test
+    public void readEmptyFile() throws Exception {
+		reader = new ChunkReader(mockStream);
+		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenReturn(0);
+		
+		Writable out = new ChunkWritable();
+		int maxBytesToConsume = 1024*1024;
+		assertEquals(0, reader.readLine(out, maxBytesToConsume));
+    }
+
+	/*
+	 * Read one line
+	 */
+    @Test
+    public void readOneLine() throws Exception {
+		reader = new ChunkReader(mockStream);
+		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
+			@Override
+			public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
+				byte[] buf = (byte[]) invocation.getArguments()[0];
+				
+				byte [] source = "OneLine\nTwoLine\n".getBytes();
+				System.arraycopy(source, 0, buf, 0, source.length);
+				return new java.lang.Byte(buf[0]);
+			}
+		});
+		
+		ChunkWritable out = new ChunkWritable();
+		int maxBytesToConsume = 1024*1024;
+		// read first line
+		assertEquals("OneLine\n".length()
+					 , reader.readLine(out, maxBytesToConsume) );
+		assertEquals("OneLine\n", new String(out.box) );
+
+		// read second line
+		assertEquals("TwoLine\n".length(), reader.readLine(out, maxBytesToConsume) );
+		assertEquals("TwoLine\n", new String(out.box) );
+    }
+	
+	/*
+	 * Read one line
+	 */
+    @Test
+    public void readChunk() throws Exception {
+		reader = new ChunkReader(mockStream);
+		when( mockStream.read( (byte [])Mockito.anyObject()) ).thenAnswer(new Answer<java.lang.Number>() {
+			@Override
+			public java.lang.Number answer(InvocationOnMock invocation) throws Throwable {
+				byte[] buf = (byte[]) invocation.getArguments()[0];
+				
+				byte [] source = "OneLine\nTwoLine\n".getBytes();
+				System.arraycopy(source, 0, buf, 0, source.length);
+				return new java.lang.Integer(source.length);
+			}
+		});
+		
+		ChunkWritable out = new ChunkWritable();
+		int maxBytesToConsume = 10; /* make readChunk return after reading the first "chunk": OneLine\nTwoLine\n */
+		// read chunk
+		assertEquals("OneLine\nTwoLine\n".length()
+					 , reader.readChunk(out, maxBytesToConsume) );
+		assertEquals("OneLine\nTwoLine\n", new String(out.box) );
+    }	
+	
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
new file mode 100644
index 0000000..d2e43ad
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/SequenceFileAccessorTest.java
@@ -0,0 +1,191 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.junit.Assert.*;
+
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
+
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SequenceFileAccessor.class, HdfsSplittableDataAccessor.class, HdfsUtilities.class})
+@SuppressStaticInitializationFor({"org.apache.hadoop.mapred.JobConf","org.apache.hadoop.fs.FileContext"}) // Prevents static inits
+public class SequenceFileAccessorTest {
+
+    InputData inputData;
+    SequenceFileAccessor accessor;
+    Map<String, String> parameters;
+    ServletContext mockContext;
+    Configuration hdfsConfiguration;
+    SequenceFileInputFormat<?,?> inFormat;
+    JobConf jobConf = null;
+    Path file;
+    FileSystem fs;
+    FileContext fc;
+    Log mockLog;
+
+    /*
+     * setUp function called before each test.
+     * 
+     * As the focus of the test is compression codec and type behavior, and
+     * since the compression methods are private to SequenceFileAccessor, we
+     * test their invocation and results by calling the public openForWrite().
+     * Since openForWrite does some filesystem operations on HDFS, those had
+     * to be mocked (and provided good material for a new Kafka story).
+	 */
+    @Before
+    public void setUp() throws Exception {
+    	
+        mockContext = mock(ServletContext.class);
+        inFormat = mock(SequenceFileInputFormat.class);
+        hdfsConfiguration = mock(Configuration.class);
+        jobConf = mock(JobConf.class);
+        file = mock(Path.class);
+        fs = mock(FileSystem.class);
+        fc = mock(FileContext.class);
+        inputData = mock(InputData.class);
+
+    	PowerMockito.mockStatic(FileContext.class);
+    	PowerMockito.mockStatic(HdfsUtilities.class);
+		PowerMockito.whenNew(Path.class).withArguments(Mockito.anyString()).thenReturn(file);
+
+        when(file.getFileSystem(any(Configuration.class))).thenReturn(fs);
+        when(inputData.getDataSource()).thenReturn("deep.throat");
+        when(inputData.getSegmentId()).thenReturn(0);             	
+        when(FileContext.getFileContext()).thenReturn(fc);
+    }
+
+    /*
+	 * After each test is done, close the accessor if it was created
+	 */
+    @After
+    public void tearDown() throws Exception {
+    	
+        if (accessor == null) {
+            return;
+        }
+
+        accessor.closeForWrite();
+        accessor = null;
+    }
+
+    private void constructAccessor() throws Exception {
+            	
+        accessor = new SequenceFileAccessor(inputData);
+        accessor.openForWrite();
+    }
+    
+    private void mockCompressionOptions(String codec, String type)
+    {
+        when(inputData.getUserProperty("COMPRESSION_CODEC")).thenReturn(codec);
+        when(inputData.getUserProperty("COMPRESSION_TYPE")).thenReturn(type);
+    }
+    
+    @Test
+    public void compressionNotSpecified() throws Exception {
+
+    	mockCompressionOptions(null, null);
+        constructAccessor();
+        assertEquals(SequenceFile.CompressionType.NONE, accessor.getCompressionType());
+        assertEquals(null, accessor.getCodec());
+    }
+
+	@Test
+	public void compressCodec() throws Exception {
+
+		//using BZip2 as a valid example
+        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
+        constructAccessor();				
+		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+	}
+
+    @Test
+    public void bogusCompressCodec() {
+
+    	final String codecName = "So I asked, who is he? He goes by the name of Wayne Rooney";
+        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenThrow(new IllegalArgumentException("Compression codec " + codecName + " was not found."));
+        mockCompressionOptions(codecName, null);
+        
+        try {
+        	constructAccessor();
+            fail("should throw no codec found exception");
+        } catch (Exception e) {
+            assertEquals("Compression codec " + codecName + " was not found.", e.getMessage());
+        }
+    }
+
+	@Test
+	public void compressTypes() throws Exception {
+
+        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+        
+        //proper value
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "BLOCK");
+        constructAccessor();				
+		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK, accessor.getCompressionType());
+
+		//case (non) sensitivity
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "ReCoRd");
+        constructAccessor();				
+		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
+
+		//default (RECORD)
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", null);
+        constructAccessor();				
+		assertEquals(".bz2", accessor.getCodec().getDefaultExtension());
+		assertEquals(org.apache.hadoop.io.SequenceFile.CompressionType.RECORD, accessor.getCompressionType());
+	}
+
+    @Test
+	public void illegalCompressTypes() throws Exception {
+
+        when(HdfsUtilities.getCodec((Configuration)Mockito.anyObject(), Mockito.anyString())).thenReturn(new BZip2Codec());
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "Oy");
+        
+		try {
+	        constructAccessor();				
+			fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
+		} catch (IllegalArgumentException e) {
+			assertEquals("Illegal compression type 'Oy'", e.getMessage());
+		}
+		
+        mockCompressionOptions("org.apache.hadoop.io.compress.BZip2Codec", "NONE");
+        
+		try {
+	        constructAccessor();				
+			fail("illegal COMPRESSION_TYPE should throw IllegalArgumentException");
+		} catch (IllegalArgumentException e) {
+			assertEquals("Illegal compression type 'NONE'. For disabling compression remove COMPRESSION_CODEC parameter.", e.getMessage());
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
new file mode 100644
index 0000000..af713c7
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/StringPassResolverTest.java
@@ -0,0 +1,149 @@
+package org.apache.hawq.pxf.plugins.hdfs;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.OutputFormat;
+import org.apache.hawq.pxf.service.BridgeInputBuilder;
+import org.apache.hawq.pxf.service.io.Text;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Text.class, BridgeInputBuilder.class, ProtocolData.class, LogFactory.class})
+public class StringPassResolverTest {
+    ProtocolData mockProtocolData;
+    Log mockLog;
+    
+    @Test
+    /*
+     * Test the setFields method: small \n terminated input
+	 */
+    public void testSetFields() throws Exception {
+        StringPassResolver resolver = buildResolver();
+
+        byte[] data = new byte[]{(int) 'a', (int) 'b', (int) 'c', (int) 'd', (int) '\n',
+                (int) 'n', (int) 'o', (int) '\n'};
+
+        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(data));
+        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+
+        OneRow oneRow = resolver.setFields(record);
+        verifyOneRow(oneRow, Arrays.copyOfRange(data, 0, 5));
+
+        record = inputBuilder.makeInput(inputStream);
+        oneRow = resolver.setFields(record);
+        verifyOneRow(oneRow, Arrays.copyOfRange(data, 5, 8));
+    }
+
+    @Test
+    /*
+     * Test the setFields method: input > buffer size, \n terminated
+	 */
+    public void testSetFieldsBigArray() throws Exception {
+
+        StringPassResolver resolver = buildResolver();
+
+        byte[] bigArray = new byte[2000];
+        for (int i = 0; i < 1999; ++i) {
+            bigArray[i] = (byte) (i % 10 + 30);
+        }
+        bigArray[1999] = (byte) '\n';
+
+        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
+        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+
+        OneRow oneRow = resolver.setFields(record);
+
+        verifyOneRow(oneRow, bigArray);
+    }
+
+    @Test
+    /*
+     * Test the setFields method: input > buffer size, no \n
+	 */
+    public void testSetFieldsBigArrayNoNewLine() throws Exception {
+
+    	PowerMockito.mockStatic(LogFactory.class);
+        mockLog = mock(Log.class);
+        PowerMockito.when(LogFactory.getLog(any(Class.class))).thenReturn(mockLog);
+
+    	StringPassResolver resolver = buildResolver();
+
+        byte[] bigArray = new byte[2000];
+        for (int i = 0; i < 2000; ++i) {
+            bigArray[i] = (byte) (i % 10 + 60);
+        }
+
+        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(bigArray));
+        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+
+        OneRow oneRow = resolver.setFields(record);
+
+        verifyOneRow(oneRow, bigArray);
+
+        //verify(mockLog, atLeastOnce()).info(anyString());
+        //Mockito.verify(mockLog).warn("Stream ended without line breaksdfljsldkj");
+        //verifyWarning();
+    }
+
+    @Test
+    /*
+	 * Test the setFields method: empty stream (returns -1)
+	 */
+    public void testSetFieldsEmptyStream() throws Exception {
+
+        StringPassResolver resolver = buildResolver();
+
+        byte[] empty = new byte[0];
+
+        DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(empty));
+        BridgeInputBuilder inputBuilder = new BridgeInputBuilder(mockProtocolData);
+        List<OneField> record = inputBuilder.makeInput(inputStream);
+
+        OneRow oneRow = resolver.setFields(record);
+
+        assertNull(oneRow);
+    }
+	
+	/*
+	 * helpers functions
+	 */
+    private StringPassResolver buildResolver()
+            throws Exception {
+ 
+        mockProtocolData = mock(ProtocolData.class);
+        PowerMockito.when(mockProtocolData.outputFormat()).thenReturn(OutputFormat.TEXT);
+
+        return new StringPassResolver(mockProtocolData);
+    }
+
+    private void verifyOneRow(OneRow oneRow, byte[] expected) {
+        assertNull(oneRow.getKey());
+        byte[] bytes = (byte[]) oneRow.getData();
+        byte[] result = Arrays.copyOfRange(bytes, 0, bytes.length);
+        assertEquals(result.length, expected.length);
+        assertTrue(Arrays.equals(result, expected));
+    }
+
+//    private void verifyWarning() {
+//        Mockito.verify(Log).warn("Stream ended without line break");
+//    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
new file mode 100644
index 0000000..ebdc495
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -0,0 +1,181 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@SuppressStaticInitializationFor("org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities")
+@PrepareForTest({HdfsUtilities.class, ReflectionUtils.class})
+public class HdfsUtilitiesTest {
+
+    Configuration conf;
+    CompressionCodecFactory factory;
+    Log Log;
+
+    @Before
+    public void SetupCompressionFactory() {
+        factory = mock(CompressionCodecFactory.class);
+        Whitebox.setInternalState(HdfsUtilities.class, factory);
+        Log = mock(Log.class);
+        Whitebox.setInternalState(HdfsUtilities.class, Log);
+    }
+
+    @Test
+    public void getCodecNoName() {
+
+        Configuration conf = new Configuration();
+        String name = "some.bad.codec";
+
+        try {
+            HdfsUtilities.getCodec(conf, name);
+            fail("function should fail with bad codec name " + name);
+        } catch (IllegalArgumentException e) {
+            assertEquals(e.getMessage(), "Compression codec " + name + " was not found.");
+        }
+    }
+
+    @Test
+    public void getCodecNoConf() {
+
+        Configuration conf = null;
+        String name = "org.apache.hadoop.io.compress.GzipCodec";
+
+        try {
+            HdfsUtilities.getCodec(conf, name);
+            fail("function should fail with when conf is null");
+        } catch (NullPointerException e) {
+            assertTrue(true);
+        }
+    }
+
+    @Test
+    public void getCodecGzip() {
+
+        Configuration conf = new Configuration();
+        String name = "org.apache.hadoop.io.compress.GzipCodec";
+
+        PowerMockito.mockStatic(ReflectionUtils.class);
+        GzipCodec gzipCodec = mock(GzipCodec.class);
+
+        when(ReflectionUtils.newInstance(GzipCodec.class, conf)).thenReturn(gzipCodec);
+
+        CompressionCodec codec = HdfsUtilities.getCodec(conf, name);
+        assertNotNull(codec);
+        assertEquals(codec, gzipCodec);
+    }
+
+    @Test
+    public void isThreadSafe() {
+
+        testIsThreadSafe(
+                "readable compression, no compression - thread safe",
+                "/some/path/without.compression",
+                null, null,
+                true);
+
+        testIsThreadSafe(
+                "readable compression, gzip compression - thread safe",
+                "/some/compressed/path.gz",
+                null, new GzipCodec(),
+                true);
+
+        testIsThreadSafe(
+                "readable compression, bzip2 compression - not thread safe",
+                "/some/path/with/bzip2.bz2",
+                null, new BZip2Codec(),
+                false);
+
+        testIsThreadSafe(
+                "writable compression, no compression codec - thread safe",
+                "/some/path",
+                null, null,
+                true);
+
+        testIsThreadSafe(
+                "writable compression, some compression codec - thread safe",
+                "/some/path",
+                "I.am.a.nice.codec", new NotSoNiceCodec(),
+                true);
+
+        testIsThreadSafe(
+                "writable compression, compression codec bzip2 - not thread safe",
+                "/some/path",
+                "org.apache.hadoop.io.compress.BZip2Codec", new BZip2Codec(),
+                false);
+    }
+
+    private void testIsThreadSafe(String testDescription, String path, String codecStr, CompressionCodec codec, boolean expectedResult) {
+        prepareDataForIsThreadSafe(path, codecStr, codec);
+
+        boolean result = HdfsUtilities.isThreadSafe(path, codecStr);
+        assertTrue(testDescription, result == expectedResult);
+    }
+
+    private void prepareDataForIsThreadSafe(String dataDir, String codecStr, CompressionCodec codec) {
+        try {
+            conf = PowerMockito.mock(Configuration.class);
+            PowerMockito.whenNew(Configuration.class).withNoArguments().thenReturn(conf);
+        } catch (Exception e) {
+            fail("new Configuration mocking failed");
+        }
+
+        if (codecStr == null) {
+            when(factory.getCodec(new Path(dataDir))).thenReturn(codec);
+        } else {
+            PowerMockito.stub(PowerMockito.method(HdfsUtilities.class, "getCodecClass")).toReturn(codec.getClass());
+        }
+    }
+
+    @Test
+    public void isSplittableCodec() {
+
+        testIsSplittableCodec("no codec - splittable",
+                "some/innocent.file", null, true);
+        testIsSplittableCodec("gzip codec - not splittable",
+                "/gzip.gz", new GzipCodec(), false);
+        testIsSplittableCodec("default codec - not splittable",
+                "/default.deflate", new DefaultCodec(), false);
+        testIsSplittableCodec("bzip2 codec - splittable",
+                "bzip2.bz2", new BZip2Codec(), true);
+    }
+
+    private void testIsSplittableCodec(String description,
+                                       String pathName, CompressionCodec codec, boolean expected) {
+        Path path = new Path(pathName);
+        when(factory.getCodec(path)).thenReturn(codec);
+
+        boolean result = HdfsUtilities.isSplittableCodec(path);
+        assertEquals(description, result, expected);
+    }
+
+    @Test
+    public void testToString() {
+        List<OneField> oneFields = Arrays.asList(new OneField(1, "uno"), new OneField(2, "dos"), new OneField(3, "tres"));
+
+        assertEquals("uno!dos!tres", HdfsUtilities.toString(oneFields, "!"));
+
+        assertEquals("uno", HdfsUtilities.toString(Collections.singletonList(oneFields.get(0)), "!"));
+
+        assertEquals("", HdfsUtilities.toString(Collections.<OneField>emptyList(), "!"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
new file mode 100644
index 0000000..671f5d8
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/NotSoNiceCodec.java
@@ -0,0 +1,64 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.hadoop.io.compress.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Codec class for UtilitiesTest
+ * Can't be embedded inside UtilitiesTest due to junit limitation.
+ */
+public class NotSoNiceCodec implements CompressionCodec {
+
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream out)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream out,
+                                                      Compressor compressor) throws IOException {
+        return null;
+    }
+
+    @Override
+    public Class<? extends Compressor> getCompressorType() {
+        return null;
+    }
+
+    @Override
+    public Compressor createCompressor() {
+        return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream in)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream in,
+                                                    Decompressor decompressor) throws IOException {
+        return null;
+    }
+
+    @Override
+    public Class<? extends Decompressor> getDecompressorType() {
+        return null;
+    }
+
+    @Override
+    public Decompressor createDecompressor() {
+        return null;
+    }
+
+    @Override
+    public String getDefaultExtension() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
new file mode 100644
index 0000000..1f68b09
--- /dev/null
+++ b/pxf/pxf-hdfs/src/test/java/org/apache/hawq/pxf/plugins/hdfs/utilities/RecordkeyAdapterTest.java
@@ -0,0 +1,154 @@
+package org.apache.hawq.pxf.plugins.hdfs.utilities;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({RecordkeyAdapter.class, LogFactory.class})
+public class RecordkeyAdapterTest {
+    Log Log;
+    RecordkeyAdapter recordkeyAdapter;
+
+    /**
+     * Test convertKeyValue for Integer type
+     */
+    @Test
+    public void convertKeyValueInteger() {
+        int key = 13;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new IntWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for Boolean type
+     */
+    @Test
+    public void convertKeyValueBoolean() {
+        boolean key = true;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new BooleanWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for Byte type
+     */
+    @Test
+    public void convertKeyValueByte() {
+        byte key = 1;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new ByteWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for Double type
+     */
+    @Test
+    public void convertKeyValueDouble() {
+        double key = 2.3;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new DoubleWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for Float type
+     */
+    @Test
+    public void convertKeyValueFloat() {
+        float key = (float) 2.3;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new FloatWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for Long type
+     */
+    @Test
+    public void convertKeyValueLong() {
+        long key = 12345678901234567l;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new LongWritable(key));
+    }
+
+    /**
+     * Test convertKeyValue for String type
+     */
+    @Test
+    public void convertKeyValueString() {
+        String key = "key";
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new Text(key));
+    }
+
+    /**
+     * Test convertKeyValue for several calls of the same type
+     */
+    @Test
+    public void convertKeyValueManyCalls() {
+        Boolean key = true;
+        mockLog();
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new BooleanWritable(key));
+        verifyLog("converter initialized for type " + key.getClass() +
+                " (key value: " + key + ")");
+
+        for (int i = 0; i < 5; ++i) {
+            key = (i % 2) == 0;
+            runConvertKeyValue(key, new BooleanWritable(key));
+        }
+        verifyLogOnlyOnce();
+    }
+
+    /**
+     * Test convertKeyValue for boolean type and then string type - negative
+     * test
+     */
+    @Test
+    public void convertKeyValueBadSecondValue() {
+        boolean key = true;
+        initRecordkeyAdapter();
+        runConvertKeyValue(key, new BooleanWritable(key));
+        String badKey = "bad";
+        try {
+            recordkeyAdapter.convertKeyValue(badKey);
+            fail("conversion of string to boolean should fail");
+        } catch (ClassCastException e) {
+            assertEquals(e.getMessage(),
+                    "java.lang.String cannot be cast to java.lang.Boolean");
+        }
+    }
+
+    private void initRecordkeyAdapter() {
+        recordkeyAdapter = new RecordkeyAdapter();
+    }
+
+    private void runConvertKeyValue(Object key, Writable expected) {
+        Writable writable = recordkeyAdapter.convertKeyValue(key);
+        assertEquals(writable, expected);
+    }
+
+    private void mockLog() {
+        PowerMockito.mockStatic(LogFactory.class);
+        Log = mock(Log.class);
+        when(LogFactory.getLog(RecordkeyAdapter.class)).thenReturn(Log);
+    }
+
+    private void verifyLog(String msg) {
+        Mockito.verify(Log).debug(msg);
+    }
+
+    private void verifyLogOnlyOnce() {
+        Mockito.verify(Log, Mockito.times(1)).debug(Mockito.any());
+    }
+}


Mime
View raw message