hawq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From odiache...@apache.org
Subject [2/2] incubator-hawq git commit: HAWQ-178. Add JSON plugin support in code base.
Date Wed, 18 May 2016 23:59:34 GMT
HAWQ-178. Add JSON plugin support in code base.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fd9c3686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fd9c3686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fd9c3686

Branch: refs/heads/master
Commit: fd9c36861506ac94255a34c6a85307bf87ae0f72
Parents: c0d7c4f
Author: Oleksandr Diachenko <odiachenko@pivotal.io>
Authored: Wed May 18 16:58:07 2016 -0700
Committer: Oleksandr Diachenko <odiachenko@pivotal.io>
Committed: Wed May 18 16:58:07 2016 -0700

----------------------------------------------------------------------
 pxf/build.gradle                                |  34 +-
 pxf/pxf-json/.gitignore                         |   1 +
 .../pxf/plugins/json/ColumnDescriptorCache.java | 119 ++++
 .../hawq/pxf/plugins/json/JsonAccessor.java     |  84 +++
 .../hawq/pxf/plugins/json/JsonRecordReader.java | 176 +++++
 .../hawq/pxf/plugins/json/JsonResolver.java     | 256 +++++++
 .../hawq/pxf/plugins/json/parser/JsonLexer.java | 175 +++++
 .../json/parser/PartitionedJsonParser.java      | 200 ++++++
 .../pxf/plugins/json/JsonExtensionTest.java     | 272 ++++++++
 .../apache/hawq/pxf/plugins/json/PxfUnit.java   | 666 +++++++++++++++++++
 .../pxf/plugins/json/parser/JsonLexerTest.java  | 141 ++++
 .../parser/PartitionedJsonParserNoSeekTest.java |  83 +++
 .../parser/PartitionedJsonParserOffsetTest.java |  55 ++
 .../parser/PartitionedJsonParserSeekTest.java   | 113 ++++
 .../src/test/resources/datatypes-test.json      |  14 +
 .../lexer-tests/array_objects_complex.json      |  24 +
 .../array_objects_complex.json.state            | 149 +++++
 .../lexer-tests/array_objects_empty.json        |   1 +
 .../lexer-tests/array_objects_empty.json.state  |   7 +
 .../resources/lexer-tests/array_of_numbers.json |   1 +
 .../lexer-tests/array_of_numbers.json.state     |   5 +
 .../resources/lexer-tests/object_complex.json   |  22 +
 .../lexer-tests/object_complex.json.state       | 146 ++++
 .../resources/lexer-tests/object_simple.json    |   4 +
 .../lexer-tests/object_simple.json.state        |  24 +
 .../lexer-tests/object_string_escaping.json     |   4 +
 .../object_string_escaping.json.state           |  44 ++
 .../src/test/resources/log4j.properties         |  22 +
 .../src/test/resources/null-tweets.json         |   2 +
 .../noseek/array_objects_complex.json           |  24 +
 .../array_objects_complex.json.expected1.json   |  22 +
 .../noseek/array_objects_complex2.json          |  24 +
 .../array_objects_complex2.json.expected1.json  |   4 +
 .../array_objects_complex2.json.expected2.json  |   4 +
 .../array_objects_same_name_in_child.json       |  24 +
 ...jects_same_name_in_child.json.expected1.json |  22 +
 .../parser-tests/noseek/object_simple.json      |   4 +
 .../noseek/object_simple.json.expected1.json    |   4 +
 .../noseek/object_string_escaping.json          |   4 +
 .../object_string_escaping.json.expected1.json  |   4 +
 .../child-object-before-member/expected.1.json  |  44 ++
 .../seek/child-object-before-member/input.json  |  59 ++
 .../child-object-before-member2/expected.1.json |  44 ++
 .../seek/child-object-before-member2/input.json |  59 ++
 .../parser-tests/seek/multi/expected.1.json     |  22 +
 .../parser-tests/seek/multi/expected.2.json     |  25 +
 .../parser-tests/seek/multi/input.json          |  48 ++
 .../parser-tests/seek/no-elements/input.json    |  24 +
 .../seek/seek-into-mid-object-1/expected.1.json |  22 +
 .../seek/seek-into-mid-object-1/input.json      |  28 +
 .../seek/seek-into-mid-object-2/expected.1.json |  22 +
 .../seek/seek-into-mid-object-2/input.json      |  28 +
 .../seek/seek-into-mid-object-3/expected.1.json |  22 +
 .../seek/seek-into-mid-object-3/input.json      |  28 +
 .../seek/seek-into-string-1/expected.1.json     |  22 +
 .../seek/seek-into-string-1/input.json          |  27 +
 .../seek/seek-into-string-2/expected.1.json     |  22 +
 .../seek/seek-into-string-2/input.json          |  27 +
 .../parser-tests/seek/simple/expected.1.json    |  22 +
 .../parser-tests/seek/simple/input.json         |  24 +
 .../src/test/resources/sample-malformed.json    |   3 +
 pxf/pxf-json/src/test/resources/sample.json     |   3 +
 .../src/test/resources/tweets-broken.json       |  67 ++
 .../test/resources/tweets-pp-with-delete.json   |  79 +++
 pxf/pxf-json/src/test/resources/tweets-pp.json  |  67 ++
 .../resources/tweets-small-with-delete.json     |   4 +
 .../src/test/resources/tweets-small.json        |   4 +
 .../tweets-with-missing-text-attribtute.json    |  18 +
 pxf/pxf-json/src/test/resources/tweets.tar.gz   | Bin 0 -> 796 bytes
 .../test/resources/variable-size-objects.json   |   3 +
 .../src/main/resources/pxf-privatehdp.classpath |   1 +
 .../src/main/resources/pxf-privatephd.classpath |   1 +
 .../src/main/resources/pxf-profiles-default.xml |  14 +
 pxf/settings.gradle                             |   1 +
 74 files changed, 3861 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index 3f3d31c..eb716fc 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -33,7 +33,7 @@ buildscript {
             url 'http://repository.jboss.org/nexus/content/groups/public'
         }
     }
-    
+
     dependencies {
         classpath "com.netflix.nebula:gradle-ospackage-plugin:2.2.6"
         classpath "de.undercouch:gradle-download-task:2.1.0"
@@ -87,7 +87,7 @@ subprojects { subProject ->
         testCompile 'org.powermock:powermock-api-mockito:1.5.1'
         testCompile 'org.mockito:mockito-core:1.9.5'
     }
-    
+
     configurations.all {
         resolutionStrategy {
             // force versions that were specified in dependencies:
@@ -337,6 +337,26 @@ project('pxf-hive') {
     }
 }
 
+project('pxf-json') {
+    dependencies {
+      compile(project(':pxf-hdfs'))
+      compile(project(':pxf-service'))
+      compile "org.apache.commons:commons-lang3:3.0"
+
+      testCompile 'pl.pragmatists:JUnitParams:1.0.2'
+    }
+
+    ospackage {
+      requires('pxf-hdfs', project.version, GREATER | EQUAL)
+
+      from(jar.outputs.files) {
+        into "/usr/lib/pxf-${project.version}"
+      }
+
+      link("/usr/lib/pxf-${project.version}/${project.name}.jar", "${project.name}-${project.version}.jar")
+    }
+}
+
 project('pxf-hbase') {
     dependencies {
         compile(project(':pxf-api'))
@@ -404,15 +424,15 @@ task rpm(type: Copy, dependsOn: [subprojects.build, distSubprojects.buildRpm]) {
 def tomcatName = "apache-tomcat-${tomcatVersion}"
 def tomcatTargetDir = "tomcat/build/"
 
-    
+
 task tomcatGet << {
-    
+
     apply plugin: 'de.undercouch.download'
     
     def TarGzSuffix = ".tar.gz"
     def tomcatTar = "${tomcatName}${TarGzSuffix}"
     def tomcatUrl = "http://archive.apache.org/dist/tomcat/tomcat-7/v${tomcatVersion}/bin/${tomcatTar}"
-	    
+
     if (file("${tomcatTargetDir}/${tomcatName}").exists()) {
         println "${tomcatName} already exists, nothing to do"
         return 0
@@ -436,13 +456,13 @@ apply plugin: 'os-package'
 
 task tomcatRpm(type: Rpm) {
     buildDir = 'tomcat/build/'
-     
+
     // clean should not delete the downloaded tarball
     // and RPM, so this is a bogus directory to delete instead.
     clean {
         delete = 'tomcat/build/something'
     }
-     
+
     ospackage {
         packageName 'apache-tomcat'
         summary = 'Apache Tomcat RPM'

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/.gitignore
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/.gitignore b/pxf/pxf-json/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/pxf/pxf-json/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
new file mode 100644
index 0000000..01cd37c
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
@@ -0,0 +1,119 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+
+/**
+ * Helper class used to retrieve all column details relevant for the json processing.
+ */
+public class ColumnDescriptorCache {
+
+	// Recognizes a trailing "name[index]" array reference in a column name:
+	// group 1 captures the array node name, group 2 the numeric element index.
+	private static Pattern ARRAY_PROJECTION_PATTERN = Pattern.compile("(.+)\\[([0-9]+)\\]");
+	private static int ARRAY_NAME_GROUPID = 1;
+	private static int ARRAY_INDEX_GROUPID = 2;
+
+	// Per-column metadata precomputed once in the constructor so that per-row
+	// resolution does not repeat the regex matching and path splitting.
+	private final DataType columnType;
+	private final String[] normalizedProjection;
+	private final String arrayNodeName;
+	private final int arrayNodeIndex;
+	private final boolean isArray;
+	private String columnName;
+
+	/**
+	 * Parses the HAWQ column definition once, splitting a dot-notation column
+	 * name into projection steps and extracting an optional trailing [index]
+	 * array reference from the last step.
+	 * 
+	 * @param columnDescriptor
+	 *            HAWQ column definition; the column name may use dot-notation
+	 *            (nested json path) and may end with an "[index]" array reference
+	 */
+	public ColumnDescriptorCache(ColumnDescriptor columnDescriptor) {
+
+		// HAWQ column type
+		this.columnType = DataType.get(columnDescriptor.columnTypeCode());
+
+		this.columnName = columnDescriptor.columnName();
+
+		// Column name can use dot-name convention to specify a nested json node.
+		// Break the path into array of path steps called projections
+		String[] projection = columnDescriptor.columnName().split("\\.");
+
+		// When the projection contains array reference (e.g. projections = foo.bar[66]) then replace the last path
+		// element by the array name (e.g. normalizedProjection = foo.bar)
+		normalizedProjection = new String[projection.length];
+
+		// Check if the provided json path (projections) refers to an array element.
+		Matcher matcher = ARRAY_PROJECTION_PATTERN.matcher(projection[projection.length - 1]);
+		if (matcher.matches()) {
+			this.isArray = true;
+			// extracts the array node name from the projection path
+			this.arrayNodeName = matcher.group(ARRAY_NAME_GROUPID);
+			// extracts the array index from the projection path
+			this.arrayNodeIndex = Integer.parseInt(matcher.group(ARRAY_INDEX_GROUPID));
+
+			System.arraycopy(projection, 0, normalizedProjection, 0, projection.length - 1);
+			normalizedProjection[projection.length - 1] = this.arrayNodeName;
+		} else {
+			this.isArray = false;
+			this.arrayNodeName = null;
+			// -1 marks "no array reference"; callers should check isArray() first.
+			this.arrayNodeIndex = -1;
+
+			System.arraycopy(projection, 0, normalizedProjection, 0, projection.length);
+		}
+	}
+
+	/**
+	 * @return Column's type
+	 */
+	public DataType getColumnType() {
+		return columnType;
+	}
+
+	/**
+	 * @return Returns the column name as defined in the HAWQ table.
+	 */
+	public String getColumnName() {
+		return columnName;
+	}
+
+	/**
+	 * If the column name contains dots (.) then this name is interpreted as path into the target json document pointing
+	 * to nested json member. The leftmost path element stands for the root in the json document.
+	 * 
+	 * @return If the column name contains dots (.) list of field names that represent the path from the root json node
+	 *         to the target nested node.
+	 */
+	public String[] getNormalizedProjections() {
+		return normalizedProjection;
+	}
+
+	/**
+	 * The 'jsonName[index]' column name conventions is used to point to a particular json array element.
+	 * 
+	 * @return Returns the json index of the referred array element. Returns -1 when the column is not an array
+	 *         reference (see {@link #isArray()}).
+	 */
+	public int getArrayNodeIndex() {
+		return arrayNodeIndex;
+	}
+
+	/**
+	 * @return Returns true if the column name is a path to json array element and false otherwise.
+	 */
+	public boolean isArray() {
+		return isArray;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
new file mode 100644
index 0000000..8006273
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
@@ -0,0 +1,84 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+
+/**
+ * This JSON accessor for PXF will read JSON data and pass it to a {@link JsonResolver}.
+ * 
+ * This accessor supports a single JSON record per line, or a multi-line JSON records if the <b>IDENTIFIER</b> parameter
+ * is set.
+ * 
+ * When provided the <b>IDENTIFIER</b> indicates the member name used to determine the encapsulating json object to
+ * return.
+ */
+public class JsonAccessor extends HdfsSplittableDataAccessor {
+
+	public static final String IDENTIFIER_PARAM = "IDENTIFIER";
+	public static final String RECORD_MAX_LENGTH_PARAM = "MAXLENGTH";
+
+	/**
+	 * If provided indicates the member name which will be used to determine the encapsulating json object to return.
+	 */
+	private String identifier = "";
+
+	/**
+	 * Optional parameter that allows to define the max length of a json record. Records that exceed the allowed length
+	 * are skipped. This parameter is applied only for the multi-line json records (e.g. when the IDENTIFIER is
+	 * provided).
+	 */
+	private int maxRecordLength = Integer.MAX_VALUE;
+
+	/**
+	 * Reads the optional IDENTIFIER and MAXLENGTH user properties. MAXLENGTH is only consulted when IDENTIFIER is set,
+	 * since it applies exclusively to multi-line record reading.
+	 * 
+	 * @param inputData
+	 *            PXF request metadata carrying the user properties
+	 * @throws Exception
+	 *             propagated from the superclass initialization
+	 */
+	public JsonAccessor(InputData inputData) throws Exception {
+		// Because HdfsSplittableDataAccessor doesn't use the InputFormat we set it to null.
+		super(inputData, null);
+
+		if (!isEmpty(inputData.getUserProperty(IDENTIFIER_PARAM))) {
+
+			identifier = inputData.getUserProperty(IDENTIFIER_PARAM);
+
+			// If the member identifier is set then check if a record max length is defined as well.
+			if (!isEmpty(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM))) {
+				// NOTE(review): a non-numeric MAXLENGTH value throws NumberFormatException here — confirm this
+				// fail-fast behavior is intended.
+				maxRecordLength = Integer.valueOf(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM));
+			}
+		}
+	}
+
+	/**
+	 * Returns a multi-line {@link JsonRecordReader} when an IDENTIFIER was provided, propagating the identifier and
+	 * max-length settings through the job configuration; otherwise falls back to the one-record-per-line reader.
+	 */
+	@Override
+	protected Object getReader(JobConf conf, InputSplit split) throws IOException {
+		if (!isEmpty(identifier)) {
+			conf.set(JsonRecordReader.RECORD_MEMBER_IDENTIFIER, identifier);
+			conf.setInt(JsonRecordReader.RECORD_MAX_LENGTH, maxRecordLength);
+			return new JsonRecordReader(conf, (FileSplit) split);
+		} else {
+			return new LineRecordReader(conf, (FileSplit) split);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
new file mode 100644
index 0000000..26d4c82
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
@@ -0,0 +1,176 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hawq.pxf.plugins.json.parser.PartitionedJsonParser;
+
+/**
+ * Multi-line json object reader. JsonRecordReader uses a member name (set by the <b>IDENTIFIER</b> PXF parameter) to
+ * determine the encapsulating object to extract and read.
+ * 
+ * JsonRecordReader supports compressed input files as well.
+ * 
+ * As a safe guard set the optional <b>MAXLENGTH</b> parameter to limit the max size of a record.
+ */
+public class JsonRecordReader implements RecordReader<LongWritable, Text> {
+
+	private static final Log LOG = LogFactory.getLog(JsonRecordReader.class);
+
+	public static final String RECORD_MEMBER_IDENTIFIER = "json.input.format.record.identifier";
+	public static final String RECORD_MAX_LENGTH = "multilinejsonrecordreader.maxlength";
+
+	private CompressionCodecFactory compressionCodecs = null;
+	// Offsets bounding this split: [start, end). pos tracks the current read position
+	// and is advanced from the parser's byte count after every record.
+	private long start;
+	private long pos;
+	private long end;
+	// Records longer than this are skipped (with a warning) rather than emitted.
+	private int maxObjectLength;
+	private InputStream is;
+	private PartitionedJsonParser parser;
+	// Member name that identifies the encapsulating json object to extract.
+	private final String jsonMemberName;
+
+	/**
+	 * Create new multi-line json object reader.
+	 * 
+	 * @param conf
+	 *            Hadoop context
+	 * @param split
+	 *            HDFS split to start the reading from
+	 * @throws IOException
+	 */
+	public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
+
+		this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
+		this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);
+
+		start = split.getStart();
+		end = start + split.getLength();
+		final Path file = split.getPath();
+		compressionCodecs = new CompressionCodecFactory(conf);
+		final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+		// open the file and seek to the start of the split
+		FileSystem fs = file.getFileSystem(conf);
+		FSDataInputStream fileIn = fs.open(split.getPath());
+		if (codec != null) {
+			// Compressed input is not seekable by split boundaries: read the whole
+			// stream from the beginning and ignore the split offsets.
+			is = codec.createInputStream(fileIn);
+			start = 0;
+			end = Long.MAX_VALUE;
+		} else {
+			if (start != 0) {
+				fileIn.seek(start);
+			}
+			is = fileIn;
+		}
+		parser = new PartitionedJsonParser(is);
+		this.pos = start;
+	}
+
+	/*
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean next(LongWritable key, Text value) throws IOException {
+
+		while (pos < end) {
+
+			String json = parser.nextObjectContainingMember(jsonMemberName);
+			pos = start + parser.getBytesRead();
+
+			// null means the parser exhausted the stream without finding another match.
+			if (json == null) {
+				return false;
+			}
+
+			// NOTE(review): this equates the record's char length with its byte length,
+			// so jsonStart is approximate for multi-byte characters — confirm inputs
+			// are effectively single-byte encoded.
+			long jsonStart = pos - json.length();
+
+			// if the "begin-object" position is after the end of our split, we should ignore it
+			if (jsonStart >= end) {
+				return false;
+			}
+
+			if (json.length() > maxObjectLength) {
+				// Oversized record: skip it and keep scanning the split.
+				LOG.warn("Skipped JSON object of size " + json.length() + " at pos " + jsonStart);
+			} else {
+				key.set(jsonStart);
+				value.set(json);
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	/*
+	 * {@inheritDoc}
+	 */
+	@Override
+	public LongWritable createKey() {
+		return new LongWritable();
+	}
+
+	/*
+	 * {@inheritDoc}
+	 */
+	@Override
+	public Text createValue() {
+		return new Text();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return pos;
+	}
+
+	/*
+	 * {@inheritDoc}
+	 */
+	@Override
+	public synchronized void close() throws IOException {
+		if (is != null) {
+			is.close();
+		}
+	}
+
+	/*
+	 * {@inheritDoc}
+	 */
+	@Override
+	public float getProgress() throws IOException {
+		// Guard against division by zero for empty splits.
+		if (start == end) {
+			return 0.0f;
+		} else {
+			return Math.min(1.0f, (pos - start) / (float) (end - start));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
new file mode 100644
index 0000000..21db6b7
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
@@ -0,0 +1,256 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * This JSON resolver for PXF will decode a given object from the {@link JsonAccessor} into a row for HAWQ. It will
+ * decode this data into a JsonNode and walk the tree for each column. It supports normal value mapping via projections
+ * and JSON array indexing.
+ */
+public class JsonResolver extends Plugin implements ReadResolver {
+
+	private static final Log LOG = LogFactory.getLog(JsonResolver.class);
+
+	// NOTE(review): reused and cleared on every getFields() call, so a resolver
+	// instance must not be shared across threads — confirm PXF uses one instance per request.
+	private ArrayList<OneField> oneFieldList;
+	private ColumnDescriptorCache[] columnDescriptorCache;
+	private ObjectMapper mapper;
+
+	/**
+	 * Row with empty fields. Returned in case of broken or malformed json records.
+	 */
+	private final List<OneField> emptyRow;
+
+	public JsonResolver(InputData inputData) throws Exception {
+		super(inputData);
+		oneFieldList = new ArrayList<OneField>();
+		mapper = new ObjectMapper(new JsonFactory());
+
+		// Precompute the column metadata. The metadata is used for mapping column names to json nodes.
+		columnDescriptorCache = new ColumnDescriptorCache[inputData.getColumns()];
+		for (int i = 0; i < inputData.getColumns(); ++i) {
+			ColumnDescriptor cd = inputData.getColumn(i);
+			columnDescriptorCache[i] = new ColumnDescriptorCache(cd);
+		}
+
+		emptyRow = createEmptyRow();
+	}
+
+	@Override
+	public List<OneField> getFields(OneRow row) throws Exception {
+		oneFieldList.clear();
+
+		String jsonRecordAsText = row.getData().toString();
+
+		JsonNode root = decodeLineToJsonNode(jsonRecordAsText);
+
+		// Malformed json produces a row of typed nulls instead of failing the query.
+		if (root == null) {
+			LOG.warn("Return empty-fields row due to invalid JSON: " + jsonRecordAsText);
+			return emptyRow;
+		}
+
+		// Iterate through the column definition and fetch our JSON data
+		for (ColumnDescriptorCache columnMetadata : columnDescriptorCache) {
+
+			JsonNode node = getChildJsonNode(root, columnMetadata.getNormalizedProjections());
+
+			// If this node is null or missing, add a null value here
+			if (node == null || node.isMissingNode()) {
+				addNullField(columnMetadata.getColumnType());
+			} else if (columnMetadata.isArray()) {
+				// If this column is an array index, ex. "tweet.hashtags[0]"
+				if (node.isArray()) {
+					// If the JSON node is an array, then add it to our list
+					addFieldFromJsonArray(columnMetadata.getColumnType(), node, columnMetadata.getArrayNodeIndex());
+				} else {
+					throw new IllegalStateException(columnMetadata.getColumnName() + " is not an array node");
+				}
+			} else {
+				// This column is not an array type
+				// Add the value to the record
+				addFieldFromJsonNode(columnMetadata.getColumnType(), node);
+			}
+		}
+
+		return oneFieldList;
+	}
+
+	/**
+	 * @return Returns a row comprised of typed, empty fields. Used as a result of broken/malformed json records.
+	 */
+	private List<OneField> createEmptyRow() {
+		ArrayList<OneField> emptyFieldList = new ArrayList<OneField>();
+		for (ColumnDescriptorCache column : columnDescriptorCache) {
+			emptyFieldList.add(new OneField(column.getColumnType().getOID(), null));
+		}
+		return emptyFieldList;
+	}
+
+	/**
+	 * Iterates down the root node to the child JSON node defined by the projs path.
+	 * 
+	 * @param root
+	 *            node to to start the traversal from.
+	 * @param projs
+	 *            defines the path from the root to the desired child node.
+	 * @return Returns the child node defined by the root and projs path. Missing path elements yield a missing node
+	 *         (path() never returns null), which the caller maps to a null field.
+	 */
+	private JsonNode getChildJsonNode(JsonNode root, String[] projs) {
+
+		// Iterate through all the tokens to the desired JSON node
+		JsonNode node = root;
+		for (int j = 0; j < projs.length; ++j) {
+			node = node.path(projs[j]);
+		}
+
+		return node;
+	}
+
+	/**
+	 * Iterates through the given JSON node to the proper index and adds the field of corresponding type
+	 * 
+	 * @param type
+	 *            The {@link DataType} type
+	 * @param node
+	 *            The JSON array node
+	 * @param index
+	 *            The array index to iterate to
+	 * @throws IOException
+	 */
+	private void addFieldFromJsonArray(DataType type, JsonNode node, int index) throws IOException {
+
+		int count = 0;
+		boolean added = false;
+		for (Iterator<JsonNode> arrayNodes = node.getElements(); arrayNodes.hasNext();) {
+			JsonNode arrayNode = arrayNodes.next();
+
+			if (count == index) {
+				added = true;
+				addFieldFromJsonNode(type, arrayNode);
+				break;
+			}
+
+			++count;
+		}
+
+		// if we reached the end of the array without adding a field, add null
+		if (!added) {
+			addNullField(type);
+		}
+	}
+
+	/**
+	 * Adds a field from a given JSON node value based on the {@link DataType} type.
+	 * 
+	 * @param type
+	 *            The DataType type
+	 * @param val
+	 *            The JSON node to extract the value.
+	 * @throws IOException
+	 *             for an unsupported column type
+	 */
+	private void addFieldFromJsonNode(DataType type, JsonNode val) throws IOException {
+		OneField oneField = new OneField();
+		oneField.type = type.getOID();
+
+		if (val.isNull()) {
+			oneField.val = null;
+		} else {
+			switch (type) {
+			case BIGINT:
+				oneField.val = val.asLong();
+				break;
+			case BOOLEAN:
+				oneField.val = val.asBoolean();
+				break;
+			case CHAR:
+				// NOTE(review): charAt(0) throws StringIndexOutOfBoundsException on an
+				// empty string value — confirm upstream guarantees non-empty CHAR values.
+				oneField.val = val.asText().charAt(0);
+				break;
+			case BYTEA:
+				// NOTE(review): getBytes() uses the platform default charset — verify
+				// UTF-8 (or an explicit charset) is the intended encoding.
+				oneField.val = val.asText().getBytes();
+				break;
+			case FLOAT8:
+				oneField.val = val.asDouble();
+				break;
+			case REAL:
+				oneField.val = (float)val.asDouble();
+				break;
+			case INTEGER:
+				oneField.val = val.asInt();
+				break;
+			case SMALLINT:
+				oneField.val = (short)val.asInt();
+				break;
+			case BPCHAR:
+			case TEXT:
+			case VARCHAR:
+				oneField.val = val.asText();
+				break;
+			default:
+				throw new IOException("Unsupported type " + type);
+			}
+		}
+
+		oneFieldList.add(oneField);
+	}
+
+	/**
+	 * Adds a null field of the given type.
+	 * 
+	 * @param type
+	 *            The {@link DataType} type
+	 */
+	private void addNullField(DataType type) {
+		oneFieldList.add(new OneField(type.getOID(), null));
+	}
+
+	/**
+	 * Converts the input line parameter into {@link JsonNode} instance.
+	 * 
+	 * @param line
+	 *            JSON text
+	 * @return Returns a {@link JsonNode} that represents the input line or null for invalid json.
+	 */
+	private JsonNode decodeLineToJsonNode(String line) {
+
+		try {
+			// Parse failures are logged and mapped to null so a single bad record
+			// degrades to an empty row rather than aborting the whole scan.
+			return mapper.readTree(line);
+		} catch (Exception e) {
+			LOG.error("Failed to parse JSON object", e);
+			return null;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
new file mode 100644
index 0000000..616312d
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
@@ -0,0 +1,175 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
/**
 * A permissive, character-at-a-time JSON lexer. It tracks only enough state to distinguish string literals from
 * structural characters; it deliberately enforces no JSON grammar rules.
 */
public class JsonLexer {

	/**
	 * The state the lexer is currently in.
	 */
	private JsonLexerState state;

	/**
	 * Creates a lexer whose initial state is {@link JsonLexerState#NULL}.
	 */
	public JsonLexer() {
		this(JsonLexerState.NULL);
	}

	/**
	 * Creates a lexer with the supplied initial state.
	 * 
	 * @param initState
	 *            state the lexer starts in
	 */
	public JsonLexer(JsonLexerState initState) {
		state = initState;
	}

	/**
	 * @return the current lexer state
	 */
	public JsonLexerState getState() {
		return state;
	}

	/**
	 * Overrides the lexer's current {@link #state}.
	 * 
	 * @param state
	 *            the new lexer state
	 */
	public void setState(JsonLexerState state) {
		this.state = state;
	}

	/**
	 * The possible states a cursor can be in while scanning a JSON document.
	 */
	public static enum JsonLexerState {
		NULL,

		DONT_CARE,

		BEGIN_OBJECT,

		END_OBJECT,

		BEGIN_STRING,

		END_STRING,

		INSIDE_STRING,

		STRING_ESCAPE,

		VALUE_SEPARATOR,

		NAME_SEPARATOR,

		BEGIN_ARRAY,

		END_ARRAY,

		WHITESPACE
	}

	/**
	 * Advances the lexer by one character, computing the next {@link #state} from the current state and the
	 * character under the cursor.
	 * 
	 * @param c
	 *            next character the cursor is moved to
	 */
	public void lex(char c) {
		if (state == JsonLexerState.STRING_ESCAPE) {
			// an escape consumes exactly one character, after which we are back inside the string
			state = JsonLexerState.INSIDE_STRING;
		} else if (state == JsonLexerState.BEGIN_STRING || state == JsonLexerState.INSIDE_STRING) {
			state = lexInsideString(c);
		} else {
			// NULL, DONT_CARE, WHITESPACE and every structural begin/end/separator state are handled identically
			state = lexOutsideString(c);
		}
	}

	/**
	 * Computes the next state for a character read while the cursor is inside a string literal.
	 */
	private static JsonLexerState lexInsideString(char c) {
		if (c == '"') {
			return JsonLexerState.END_STRING;
		}
		if (c == '\\') {
			return JsonLexerState.STRING_ESCAPE;
		}
		return JsonLexerState.INSIDE_STRING;
	}

	/**
	 * Computes the next state for a character read outside of any string literal.
	 */
	private static JsonLexerState lexOutsideString(char c) {
		if (Character.isWhitespace(c)) {
			return JsonLexerState.WHITESPACE;
		}
		switch (c) {
		// value-separator (comma)
		case ',':
			return JsonLexerState.VALUE_SEPARATOR;
		// name-separator (colon)
		case ':':
			return JsonLexerState.NAME_SEPARATOR;
		// begin-string
		case '"':
			return JsonLexerState.BEGIN_STRING;
		// begin-object
		case '{':
			return JsonLexerState.BEGIN_OBJECT;
		// end-object
		case '}':
			return JsonLexerState.END_OBJECT;
		// begin-array
		case '[':
			return JsonLexerState.BEGIN_ARRAY;
		// end-array
		case ']':
			return JsonLexerState.END_ARRAY;
		default:
			return JsonLexerState.DONT_CARE;
		}
	}
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
new file mode 100644
index 0000000..71ad449
--- /dev/null
+++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
@@ -0,0 +1,200 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hawq.pxf.plugins.json.parser.JsonLexer.JsonLexerState;
+
/**
 * A simple parser that can support reading JSON objects from a random point in JSON text. It reads from the supplied
 * stream (which is assumed to be positioned at any arbitrary position inside some JSON text) until it finds the first
 * JSON begin-object "{". From this point on it will keep reading JSON objects until it finds one containing a member
 * string that the user supplies.
 * <p/>
 * It is not recommended to use this with JSON text where individual JSON objects can be large (MB's or larger),
 * since a whole candidate object is buffered in memory while scanning for the member.
 */
public class PartitionedJsonParser {

	private static final char BACKSLASH = '\\';
	private static final char START_BRACE = '{';
	private static final int EOF = -1;
	private final InputStreamReader inputStreamReader;
	private final JsonLexer lexer;
	// NOTE(review): this counter is incremented once per decoded character, not per raw byte; for multi-byte UTF-8
	// input the two values differ -- confirm callers of getBytesRead() can tolerate that discrepancy.
	private long bytesRead = 0;
	private boolean endOfStream = false;

	public PartitionedJsonParser(InputStream is) {
		this.lexer = new JsonLexer();

		// You need to wrap the InputStream with an InputStreamReader, so that it can encode the incoming byte stream as
		// UTF-8 characters
		this.inputStreamReader = new InputStreamReader(is, StandardCharsets.UTF_8);
	}

	/**
	 * Advances the reader until the first begin-object '{' not preceded by a backslash is found, and primes the
	 * lexer into the BEGIN_OBJECT state.
	 *
	 * @return true if a begin-object was found; false if end-of-stream was reached first
	 * @throws IOException
	 *             on stream read failure
	 */
	private boolean scanToFirstBeginObject() throws IOException {
		// seek until we hit the first begin-object
		char prev = ' ';
		int i;
		while ((i = inputStreamReader.read()) != EOF) {
			char c = (char) i;
			bytesRead++;
			if (c == START_BRACE && prev != BACKSLASH) {
				lexer.setState(JsonLexer.JsonLexerState.BEGIN_OBJECT);
				return true;
			}
			prev = c;
		}
		endOfStream = true;
		return false;
	}

	/**
	 * Phases of the search for the requested member name.
	 */
	private enum MemberSearchState {
		// a string matching the member name was just closed; waiting to see a name-separator (colon)
		FOUND_STRING_NAME,

		// still scanning for a string that matches the member name
		SEARCHING,

		// the member was confirmed; now consuming the remainder of its enclosing object
		IN_MATCHING_OBJECT
	}

	// lexer states that mean the cursor is currently inside a string literal
	private static final EnumSet<JsonLexerState> inStringStates = EnumSet.of(JsonLexerState.INSIDE_STRING,
			JsonLexerState.STRING_ESCAPE);

	/**
	 * Scans forward from the current stream position and returns the next JSON object that contains the requested
	 * member.
	 * 
	 * @param memberName
	 *            Indicates the member name used to determine the encapsulating object to return.
	 * @return Returns next json object that contains a member attribute with name: memberName. Returns null if no such
	 *         object is found or the end of the stream is reached.
	 * @throws IOException
	 *             on stream read failure
	 */
	public String nextObjectContainingMember(String memberName) throws IOException {

		if (endOfStream) {
			return null;
		}

		int i;
		int objectCount = 0;
		StringBuilder currentObject = new StringBuilder();
		StringBuilder currentString = new StringBuilder();
		MemberSearchState memberState = MemberSearchState.SEARCHING;

		// stack of offsets into currentObject where each nested '{' begins
		List<Integer> objectStack = new ArrayList<Integer>();

		if (!scanToFirstBeginObject()) {
			return null;
		}
		currentObject.append(START_BRACE);
		objectStack.add(0);

		while ((i = inputStreamReader.read()) != EOF) {
			char c = (char) i;
			bytesRead++;

			lexer.lex(c);

			currentObject.append(c);

			switch (memberState) {
			case SEARCHING:
				if (lexer.getState() == JsonLexerState.BEGIN_STRING) {
					// we found the start of a string, so reset our string buffer
					currentString.setLength(0);
				} else if (inStringStates.contains(lexer.getState())) {
					// we're still inside a string, so keep appending to our buffer
					currentString.append(c);
				} else if (lexer.getState() == JsonLexerState.END_STRING && memberName.equals(currentString.toString())) {

					if (objectStack.size() > 0) {
						// we hit the end of the string and it matched the member name (yay)
						memberState = MemberSearchState.FOUND_STRING_NAME;
						currentString.setLength(0);
					}
				} else if (lexer.getState() == JsonLexerState.BEGIN_OBJECT) {
					// we are searching and found a '{', so we reset the current object string
					if (objectStack.size() == 0) {
						currentObject.setLength(0);
						currentObject.append(START_BRACE);
					}
					objectStack.add(currentObject.length() - 1);
				} else if (lexer.getState() == JsonLexerState.END_OBJECT) {
					if (objectStack.size() > 0) {
						objectStack.remove(objectStack.size() - 1);
					}
					if (objectStack.size() == 0) {
						currentObject.setLength(0);
					}
				}
				break;
			case FOUND_STRING_NAME:
				// keep popping whitespaces until we hit a different token
				if (lexer.getState() != JsonLexerState.WHITESPACE) {
					if (lexer.getState() == JsonLexerState.NAME_SEPARATOR) {
						// found our member!
						memberState = MemberSearchState.IN_MATCHING_OBJECT;
						objectCount = 0;

						// trim the buffer down to the innermost object that owns the matched member
						if (objectStack.size() > 1) {
							currentObject.delete(0, objectStack.get(objectStack.size() - 1));
						}
						objectStack.clear();
					} else {
						// we didn't find a value-separator (:), so our string wasn't a member string. keep searching
						memberState = MemberSearchState.SEARCHING;
					}
				}
				break;
			case IN_MATCHING_OBJECT:
				if (lexer.getState() == JsonLexerState.BEGIN_OBJECT) {
					objectCount++;
				} else if (lexer.getState() == JsonLexerState.END_OBJECT) {
					objectCount--;
					if (objectCount < 0) {
						// we're done! we reached an "}" which is at the same level as the member we found
						return currentObject.toString();
					}
				}
				break;
			}
		}
		endOfStream = true;
		return null;
	}

	/**
	 * @return Returns the number of characters read from the stream so far (see the note on {@link #bytesRead}:
	 *         this may undercount raw bytes for multi-byte UTF-8 data).
	 */
	public long getBytesRead() {
		return bytesRead;
	}

	/**
	 * @return Returns true if the end of the stream has been reached and false otherwise.
	 */
	public boolean isEndOfStream() {
		return endOfStream;
	}
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
new file mode 100644
index 0000000..a8161c1
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
@@ -0,0 +1,272 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenter;
+import org.apache.hawq.pxf.plugins.json.JsonAccessor;
+import org.apache.hawq.pxf.plugins.json.JsonResolver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
/**
 * End-to-end tests for the PXF JSON plugin ({@link JsonAccessor} + {@link JsonResolver}), run through the
 * {@link PxfUnit} harness against the JSON fixture files under src/test/resources.
 */
public class JsonExtensionTest extends PxfUnit {

	private static final String IDENTIFIER = JsonAccessor.IDENTIFIER_PARAM;
	// column definitions shared by most tests; rebuilt before every test in before()
	private List<Pair<String, DataType>> columnDefs = null;
	private List<Pair<String, String>> extraParams = new ArrayList<Pair<String, String>>();
	private List<String> output = new ArrayList<String>();

	@Before
	public void before() {

		columnDefs = new ArrayList<Pair<String, DataType>>();

		columnDefs.add(new Pair<String, DataType>("created_at", DataType.TEXT));
		columnDefs.add(new Pair<String, DataType>("id", DataType.BIGINT));
		columnDefs.add(new Pair<String, DataType>("text", DataType.TEXT));
		columnDefs.add(new Pair<String, DataType>("user.screen_name", DataType.TEXT));
		columnDefs.add(new Pair<String, DataType>("entities.hashtags[0]", DataType.TEXT));
		columnDefs.add(new Pair<String, DataType>("coordinates.coordinates[0]", DataType.FLOAT8));
		columnDefs.add(new Pair<String, DataType>("coordinates.coordinates[1]", DataType.FLOAT8));

		output.clear();
		extraParams.clear();
	}

	@After
	public void cleanup() throws Exception {
		columnDefs.clear();
	}

	@Test
	public void testCompressedMultilineJsonFile() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets.tar.gz"), output);
	}

	@Test
	public void testMaxRecordLength() throws Exception {

		// variable-size-objects.json contains 3 json objects but only 2 of them fit in the 27 byte length limitation

		extraParams.add(new Pair<String, String>(IDENTIFIER, "key666"));
		extraParams.add(new Pair<String, String>("MAXLENGTH", "27"));

		columnDefs.clear();
		columnDefs.add(new Pair<String, DataType>("key666", DataType.TEXT));

		output.add("small object1");
		// skip the large object2 XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
		output.add("small object3");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/variable-size-objects.json"), output);
	}

	@Test
	public void testDataTypes() throws Exception {

		// TODO: The BYTEA type is not tested. The implementation (val.asText().getBytes()) returns an array reference
		// and it is not clear whether this is the desired behavior.
		//
		// For the time being avoid using BYTEA type!!!

		// This test also verifies that the order of the columns in the table definition is agnostic to the order of the
		// json attributes.

		extraParams.add(new Pair<String, String>(IDENTIFIER, "bintType"));

		columnDefs.clear();

		columnDefs.add(new Pair<String, DataType>("text", DataType.TEXT));
		columnDefs.add(new Pair<String, DataType>("varcharType", DataType.VARCHAR));
		columnDefs.add(new Pair<String, DataType>("bpcharType", DataType.BPCHAR));
		columnDefs.add(new Pair<String, DataType>("smallintType", DataType.SMALLINT));
		columnDefs.add(new Pair<String, DataType>("integerType", DataType.INTEGER));
		columnDefs.add(new Pair<String, DataType>("realType", DataType.REAL));
		columnDefs.add(new Pair<String, DataType>("float8Type", DataType.FLOAT8));
		// The DataType.BYTEA type is left out for further validation.
		columnDefs.add(new Pair<String, DataType>("charType", DataType.CHAR));
		columnDefs.add(new Pair<String, DataType>("booleanType", DataType.BOOLEAN));
		columnDefs.add(new Pair<String, DataType>("bintType", DataType.BIGINT));

		output.add(",varcharType,bpcharType,777,999,3.15,3.14,x,true,666");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/datatypes-test.json"), output);
	}

	@Test(expected = IllegalStateException.class)
	public void testMissingArrayJsonAttribute() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		columnDefs.clear();

		columnDefs.add(new Pair<String, DataType>("created_at", DataType.TEXT));
		// User is not an array! An attempt to access it should throw an exception!
		columnDefs.add(new Pair<String, DataType>("user[0]", DataType.TEXT));

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-with-missing-text-attribtute.json"), output);
	}

	@Test
	public void testMissingJsonAttribute() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		// Missing attributes are substituted by an empty field
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,,SpreadButter,tweetCongress,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-with-missing-text-attribtute.json"), output);
	}

	@Test
	public void testMalformedJsonObject() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add(",,,,,,"); // Expected: malformed json records are transformed into empty rows
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-broken.json"), output);
	}

	@Test
	public void testSmallTweets() throws Exception {

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");
		output.add("Fri Jun 07 22:45:03 +0000 2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-small.json"), output);
	}

	@Test
	public void testTweetsWithNull() throws Exception {

		output.add("Fri Jun 07 22:45:02 +0000 2013,,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,,text2,patronusdeadly,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/null-tweets.json"), output);
	}

	@Test
	public void testSmallTweetsWithDelete() throws Exception {

		output.add(",,,,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-small-with-delete.json"), output);
	}

	@Test
	public void testWellFormedJson() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-pp.json"), output);
	}

	@Test
	public void testWellFormedJsonWithDelete() throws Exception {

		extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at"));

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-pp-with-delete.json"), output);
	}

	@Test
	public void testMultipleFiles() throws Exception {

		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");
		output.add("Fri Jun 07 22:45:03 +0000 2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103");
		output.add(",,,,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,");
		output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,");

		super.assertUnorderedOutput(new Path(System.getProperty("user.dir") + File.separator
				+ "src/test/resources/tweets-small*.json"), output);
	}

	@Override
	public List<Pair<String, String>> getExtraParams() {
		return extraParams;
	}

	@Override
	public Class<? extends Fragmenter> getFragmenterClass() {
		return HdfsDataFragmenter.class;
	}

	@Override
	public Class<? extends ReadAccessor> getReadAccessorClass() {
		return JsonAccessor.class;
	}

	@Override
	public Class<? extends ReadResolver> getReadResolverClass() {
		return JsonResolver.class;
	}

	@Override
	public List<Pair<String, DataType>> getColumnDefinitions() {
		return columnDefs;
	}
}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
new file mode 100644
index 0000000..5669882
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
@@ -0,0 +1,666 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.FragmentsResponse;
+import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+
+/**
+ * This abstract class contains a number of helpful utilities in developing a PXF extension for HAWQ. Extend this class
+ * and use the various <code>assert</code> methods to check given input against known output.
+ */
+public abstract class PxfUnit {
+
+	private static final Log LOG = LogFactory.getLog(PxfUnit.class);
+
+	private static JsonFactory factory = new JsonFactory();
+	private static ObjectMapper mapper = new ObjectMapper(factory);
+
+	protected static List<InputData> inputs = null;
+
+	/**
+	 * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the file for
+	 * output testing.
+	 * 
+	 * @param input
+	 *            Input records
+	 * @param expectedOutput
+	 *            File containing output to check
+	 * @throws Exception
+	 */
+	public void assertOutput(Path input, Path expectedOutput) throws Exception {
+
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open(
+				expectedOutput)));
+
+		List<String> outputLines = new ArrayList<String>();
+
+		String line;
+		while ((line = rdr.readLine()) != null) {
+			outputLines.add(line);
+		}
+
+		assertOutput(input, outputLines);
+
+		rdr.close();
+	}
+
	/**
	 * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given
	 * parameter for output testing.
	 * 
	 * @param input
	 *            Input records
	 * @param expectedOutput
	 *            List of expected output lines, in order
	 * @throws Exception
	 *             on any setup, read, or assertion failure
	 */
	public void assertOutput(Path input, List<String> expectedOutput) throws Exception {

		setup(input);
		List<String> actualOutput = new ArrayList<String>();
		for (InputData data : inputs) {
			ReadAccessor accessor = getReadAccessor(data);
			ReadResolver resolver = getReadResolver(data);

			actualOutput.addAll(getAllOutput(accessor, resolver));
		}

		// NOTE(review): assertFalse implies compareOutput returns true when the outputs differ -- confirm against
		// its definition elsewhere in this class
		Assert.assertFalse("Output did not match expected output", compareOutput(expectedOutput, actualOutput));
	}
+
+	/**
+	 * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given
+	 * parameter for output testing.<br>
+	 * <br>
+	 * Ignores order of records.
+	 * 
+	 * @param input
+	 *            Input records
+	 * @param expectedOutput
+	 *            File containing output to check
+	 * @throws Exception
+	 */
+	public void assertUnorderedOutput(Path input, Path expectedOutput) throws Exception {
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open(
+				expectedOutput)));
+
+		List<String> outputLines = new ArrayList<String>();
+
+		String line;
+		while ((line = rdr.readLine()) != null) {
+			outputLines.add(line);
+		}
+
+		assertUnorderedOutput(input, outputLines);
+		rdr.close();
+	}
+
	/**
	 * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given
	 * parameter for output testing.<br>
	 * <br>
	 * Ignores order of records.
	 * 
	 * @param input
	 *            Input records
	 * @param expectedOutput
	 *            List of expected output lines, in any order
	 * @throws Exception
	 *             on any setup, read, or assertion failure
	 */
	public void assertUnorderedOutput(Path input, List<String> expectedOutput) throws Exception {

		setup(input);

		List<String> actualOutput = new ArrayList<String>();
		for (InputData data : inputs) {
			ReadAccessor accessor = getReadAccessor(data);
			ReadResolver resolver = getReadResolver(data);

			actualOutput.addAll(getAllOutput(accessor, resolver));
		}

		// NOTE(review): assertFalse implies compareUnorderedOutput returns true when the outputs differ -- confirm
		// against its definition elsewhere in this class
		Assert.assertFalse("Output did not match expected output", compareUnorderedOutput(expectedOutput, actualOutput));
	}
+
+	/**
+	 * Writes the output to the given output stream. Comma delimiter.
+	 * 
+	 * @param input
+	 *            The input file
+	 * @param output
+	 *            The output stream
+	 * @throws Exception
+	 */
+	public void writeOutput(Path input, OutputStream output) throws Exception {
+
+		setup(input);
+
+		for (InputData data : inputs) {
+			ReadAccessor accessor = getReadAccessor(data);
+			ReadResolver resolver = getReadResolver(data);
+
+			for (String line : getAllOutput(accessor, resolver)) {
+				output.write((line + "\n").getBytes());
+			}
+		}
+
+		output.flush();
+	}
+
	/**
	 * Get the class of the implementation of Fragmenter to be tested. Returns null by default; subclasses override
	 * to supply the fragmenter under test.
	 * 
	 * @return The class
	 */
	public Class<? extends Fragmenter> getFragmenterClass() {
		return null;
	}

	/**
	 * Get the class of the implementation of ReadAccessor to be tested. Returns null by default; subclasses
	 * override to supply the accessor under test.
	 * 
	 * @return The class
	 */
	public Class<? extends ReadAccessor> getReadAccessorClass() {
		return null;
	}

	/**
	 * Get the class of the implementation of WriteAccessor to be tested. Returns null by default; subclasses
	 * override to supply the accessor under test.
	 * 
	 * @return The class
	 */
	public Class<? extends WriteAccessor> getWriteAccessorClass() {
		return null;
	}

	/**
	 * Get the class of the implementation of ReadResolver to be tested. Returns null by default; subclasses
	 * override to supply the resolver under test.
	 * 
	 * @return The class
	 */
	public Class<? extends ReadResolver> getReadResolverClass() {
		return null;
	}

	/**
	 * Get the class of the implementation of WriteResolver to be tested. Returns null by default; subclasses
	 * override to supply the resolver under test.
	 * 
	 * @return The class
	 */
	public Class<? extends WriteResolver> getWriteResolverClass() {
		return null;
	}

	/**
	 * Get any extra parameters that are meant to be specified for the "pxf" protocol. Note that "X-GP-" is prepended to
	 * each parameter name.
	 * 
	 * @return Any extra parameters or null if none.
	 */
	public List<Pair<String, String>> getExtraParams() {
		return null;
	}

	/**
	 * Gets the column definition names and data types. Types are DataType objects
	 * 
	 * @return A list of column definition name value pairs. Cannot be null.
	 */
	public abstract List<Pair<String, DataType>> getColumnDefinitions();

	// convenience overload: builds writable-table input data with no input path (uses the dummy data dir)
	protected InputData getInputDataForWritableTable() {
		return getInputDataForWritableTable(null);
	}
+
	/**
	 * Builds an {@link InputData} (via {@link ProtocolData}) describing a writable-table request, populating the
	 * minimal set of X-GP-* protocol headers plus the column definitions and any extra parameters.
	 * 
	 * @param input
	 *            Input path, or null to point the request at the dummy /dummydata data dir
	 * @return Protocol data describing the writable table
	 */
	protected InputData getInputDataForWritableTable(Path input) {

		if (getWriteAccessorClass() == null) {
			throw new IllegalArgumentException(
					"getWriteAccessorClass() must be overwritten to return a non-null object");
		}

		if (getWriteResolverClass() == null) {
			throw new IllegalArgumentException(
					"getWriteResolverClass() must be overwritten to return a non-null object");
		}

		Map<String, String> paramsMap = new HashMap<String, String>();

		// mandatory protocol headers for a single-segment, unfiltered request
		paramsMap.put("X-GP-ALIGNMENT", "what");
		paramsMap.put("X-GP-SEGMENT-ID", "1");
		paramsMap.put("X-GP-HAS-FILTER", "0");
		paramsMap.put("X-GP-SEGMENT-COUNT", "1");

		paramsMap.put("X-GP-FORMAT", "GPDBWritable");
		paramsMap.put("X-GP-URL-HOST", "localhost");
		paramsMap.put("X-GP-URL-PORT", "50070");

		// NOTE(review): when input != null no data dir is set and input is otherwise unused here -- confirm intended
		if (input == null) {
			paramsMap.put("X-GP-DATA-DIR", "/dummydata");
		}

		// one name/typename/OID triple per column definition
		List<Pair<String, DataType>> params = getColumnDefinitions();
		paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
		for (int i = 0; i < params.size(); ++i) {
			paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
			paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
			paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
		}

		paramsMap.put("X-GP-ACCESSOR", getWriteAccessorClass().getName());
		paramsMap.put("X-GP-RESOLVER", getWriteResolverClass().getName());

		// extra X-GP- parameters supplied by the concrete test
		if (getExtraParams() != null) {
			for (Pair<String, String> param : getExtraParams()) {
				paramsMap.put("X-GP-" + param.first, param.second);
			}
		}

		return new ProtocolData(paramsMap);
	}
+
+	/**
+	 * Set all necessary parameters for the PXF framework to function. Uses the given path as a single input split:
+	 * runs the fragmenter over the path, serializes the fragments to JSON, then builds one LocalInputData per
+	 * fragment into the {@code inputs} list.
+	 * 
+	 * @param input
+	 *            The input path, relative or absolute.
+	 * @throws Exception
+	 */
+	protected void setup(Path input) throws Exception {
+
+		if (getFragmenterClass() == null) {
+			throw new IllegalArgumentException("getFragmenterClass() must be overwritten to return a non-null object");
+		}
+
+		if (getReadAccessorClass() == null) {
+			throw new IllegalArgumentException("getReadAccessorClass() must be overwritten to return a non-null object");
+		}
+
+		if (getReadResolverClass() == null) {
+			throw new IllegalArgumentException("getReadResolverClass() must be overwritten to return a non-null object");
+		}
+
+		Map<String, String> paramsMap = new HashMap<String, String>();
+
+		// 2.1.0 Properties
+		// HDMetaData parameters
+		paramsMap.put("X-GP-ALIGNMENT", "what");
+		paramsMap.put("X-GP-SEGMENT-ID", "1");
+		paramsMap.put("X-GP-HAS-FILTER", "0");
+		paramsMap.put("X-GP-SEGMENT-COUNT", "1");
+		paramsMap.put("X-GP-FRAGMENTER", getFragmenterClass().getName());
+		paramsMap.put("X-GP-FORMAT", "GPDBWritable");
+		paramsMap.put("X-GP-URL-HOST", "localhost");
+		paramsMap.put("X-GP-URL-PORT", "50070");
+
+		paramsMap.put("X-GP-DATA-DIR", input.toString());
+
+		// One X-GP-ATTR-{NAME,TYPENAME,TYPECODE}<i> triple per column definition.
+		List<Pair<String, DataType>> params = getColumnDefinitions();
+		paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
+		for (int i = 0; i < params.size(); ++i) {
+			paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
+			paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
+			paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
+		}
+
+		// HDFSMetaData properties
+		paramsMap.put("X-GP-ACCESSOR", getReadAccessorClass().getName());
+		paramsMap.put("X-GP-RESOLVER", getReadResolverClass().getName());
+
+		if (getExtraParams() != null) {
+			for (Pair<String, String> param : getExtraParams()) {
+				paramsMap.put("X-GP-" + param.first, param.second);
+			}
+		}
+
+		LocalInputData fragmentInputData = new LocalInputData(paramsMap);
+
+		List<Fragment> fragments = getFragmenter(fragmentInputData).getFragments();
+
+		// Round-trip the fragments through the JSON wire format, exactly as the PXF service would emit them.
+		FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, input.toString());
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		fragmentsResponse.write(baos);
+
+		String jsonOutput = baos.toString();
+
+		inputs = new ArrayList<InputData>();
+
+		// NOTE(review): decodeLineToJsonNode returns null on a parse error, which would NPE on the
+		// node.get("PXFFragments") call below — acceptable for a test harness, but worth knowing.
+		JsonNode node = decodeLineToJsonNode(jsonOutput);
+
+		JsonNode fragmentsArray = node.get("PXFFragments");
+		int i = 0;
+		Iterator<JsonNode> iter = fragmentsArray.getElements();
+		while (iter.hasNext()) {
+			JsonNode fragNode = iter.next();
+			String sourceData = fragNode.get("sourceName").getTextValue();
+			if (!sourceData.startsWith("/")) {
+				sourceData = "/" + sourceData;
+			}
+			// Reuse (and overwrite) the shared paramsMap for each fragment; LocalInputData copies it.
+			paramsMap.put("X-GP-DATA-DIR", sourceData);
+			paramsMap.put("X-GP-FRAGMENT-METADATA", fragNode.get("metadata").getTextValue());
+			paramsMap.put("X-GP-DATA-FRAGMENT", Integer.toString(i++));
+			inputs.add(new LocalInputData(paramsMap));
+		}
+	}
+
+	/**
+	 * Parses a line of JSON into a tree.
+	 * 
+	 * @return The parsed node, or null if parsing fails (the error is logged as a warning).
+	 */
+	private JsonNode decodeLineToJsonNode(String line) {
+		try {
+			return mapper.readTree(line);
+		} catch (Exception e) {
+			// Deliberate best-effort: log and return null rather than failing the whole test setup here.
+			LOG.warn(e);
+			return null;
+		}
+	}
+
+	/**
+	 * Compares the expected and actual output, including ordering, printing out any errors.
+	 * 
+	 * @param expectedOutput
+	 *            The expected output
+	 * @param actualOutput
+	 *            The actual output
+	 * @return True if any errors were found (missing, unexpected, or out-of-place lines), false if the
+	 *         outputs match exactly. Note the inverted sense: callers should assert this is false.
+	 */
+	protected boolean compareOutput(List<String> expectedOutput, List<String> actualOutput) {
+
+		boolean error = false;
+		// Pass 1: every expected line must appear in the actual output, at the same index.
+		for (int i = 0; i < expectedOutput.size(); ++i) {
+			boolean match = false;
+			for (int j = 0; j < actualOutput.size(); ++j) {
+				if (expectedOutput.get(i).equals(actualOutput.get(j))) {
+					match = true;
+					if (i != j) {
+						LOG.error("Expected (" + expectedOutput.get(i) + ") matched (" + actualOutput.get(j)
+								+ ") but in wrong place.  " + j + " instead of " + i);
+						error = true;
+					}
+
+					break;
+				}
+			}
+
+			if (!match) {
+				LOG.error("Missing expected output: (" + expectedOutput.get(i) + ")");
+				error = true;
+			}
+		}
+
+		// Pass 2: every actual line must have been expected.
+		for (int i = 0; i < actualOutput.size(); ++i) {
+			boolean match = false;
+			for (int j = 0; j < expectedOutput.size(); ++j) {
+				if (actualOutput.get(i).equals(expectedOutput.get(j))) {
+					match = true;
+					break;
+				}
+			}
+
+			if (!match) {
+				LOG.error("Received unexpected output: (" + actualOutput.get(i) + ")");
+				error = true;
+			}
+		}
+
+		return error;
+	}
+
+	/**
+	 * Compares the expected and actual output, ignoring ordering, printing out any errors.
+	 * 
+	 * @param expectedOutput
+	 *            The expected output
+	 * @param actualOutput
+	 *            The actual output
+	 * @return True if any errors were found (missing or unexpected lines), false if the outputs match
+	 *         as multisets-by-membership. Note the inverted sense: callers should assert this is false.
+	 */
+	protected boolean compareUnorderedOutput(List<String> expectedOutput, List<String> actualOutput) {
+
+		boolean error = false;
+		// Pass 1: every expected line must appear somewhere in the actual output.
+		for (int i = 0; i < expectedOutput.size(); ++i) {
+			boolean match = false;
+			for (int j = 0; j < actualOutput.size(); ++j) {
+				if (expectedOutput.get(i).equals(actualOutput.get(j))) {
+					match = true;
+					break;
+				}
+			}
+
+			if (!match) {
+				LOG.error("Missing expected output: (" + expectedOutput.get(i) + ")");
+				error = true;
+			}
+		}
+
+		// Pass 2: every actual line must have been expected.
+		for (int i = 0; i < actualOutput.size(); ++i) {
+			boolean match = false;
+			for (int j = 0; j < expectedOutput.size(); ++j) {
+				if (actualOutput.get(i).equals(expectedOutput.get(j))) {
+					match = true;
+					break;
+				}
+			}
+
+			if (!match) {
+				LOG.error("Received unexpected output: (" + actualOutput.get(i) + ")");
+				error = true;
+			}
+		}
+
+		return error;
+	}
+
+	/**
+	 * Opens the accessor and reads all output, giving it to the resolver to retrieve the list of fields. These fields
+	 * are then added to a string, delimited by commas, and returned in a list.
+	 * 
+	 * @param accessor
+	 *            The accessor instance to use
+	 * @param resolver
+	 *            The resolver instance to use
+	 * @return The list of output strings, one comma-joined string per row (null field values render as empty)
+	 * @throws Exception
+	 */
+	protected List<String> getAllOutput(ReadAccessor accessor, ReadResolver resolver) throws Exception {
+
+		Assert.assertTrue("Accessor failed to open", accessor.openForRead());
+
+		List<String> output = new ArrayList<String>();
+
+		OneRow row = null;
+		while ((row = accessor.readNextObject()) != null) {
+
+			StringBuilder bldr = new StringBuilder();
+			for (OneField field : resolver.getFields(row)) {
+				bldr.append((field != null && field.val != null ? field.val : "") + ",");
+			}
+
+			// Drop the trailing comma appended after the last field.
+			if (bldr.length() > 0) {
+				bldr.deleteCharAt(bldr.length() - 1);
+			}
+
+			output.add(bldr.toString());
+		}
+
+		accessor.closeForRead();
+
+		return output;
+	}
+
+	/**
+	 * Gets an instance of Fragmenter via reflection.
+	 * 
+	 * Searches for a constructor that has a single parameter assignable from InputData.
+	 * (The error message's "BaseMetaData" is the legacy name for that parameter type.)
+	 * 
+	 * @return A Fragmenter instance
+	 * @throws Exception
+	 *             If something bad happens
+	 */
+	protected Fragmenter getFragmenter(InputData meta) throws Exception {
+
+		Fragmenter fragmenter = null;
+
+		// The loop does not break on the first match; with multiple candidate constructors
+		// the last matching one wins. In practice plugins declare a single such constructor.
+		for (Constructor<?> c : getFragmenterClass().getConstructors()) {
+			if (c.getParameterTypes().length == 1) {
+				for (Class<?> clazz : c.getParameterTypes()) {
+					if (InputData.class.isAssignableFrom(clazz)) {
+						fragmenter = (Fragmenter) c.newInstance(meta);
+					}
+				}
+			}
+		}
+
+		if (fragmenter == null) {
+			throw new InvalidParameterException("Unable to find Fragmenter constructor with a BaseMetaData parameter");
+		}
+
+		return fragmenter;
+
+	}
+
+	/**
+	 * Gets an instance of ReadAccessor via reflection.
+	 * 
+	 * Searches for a constructor that has a single parameter assignable from InputData.
+	 * 
+	 * @return A ReadAccessor instance
+	 * @throws Exception
+	 *             If something bad happens
+	 */
+	protected ReadAccessor getReadAccessor(InputData data) throws Exception {
+
+		ReadAccessor accessor = null;
+
+		// No early break: if several constructors qualify, the last match wins (see getFragmenter).
+		for (Constructor<?> c : getReadAccessorClass().getConstructors()) {
+			if (c.getParameterTypes().length == 1) {
+				for (Class<?> clazz : c.getParameterTypes()) {
+					if (InputData.class.isAssignableFrom(clazz)) {
+						accessor = (ReadAccessor) c.newInstance(data);
+					}
+				}
+			}
+		}
+
+		if (accessor == null) {
+			throw new InvalidParameterException("Unable to find Accessor constructor with a BaseMetaData parameter");
+		}
+
+		return accessor;
+
+	}
+
+	/**
+	 * Gets an instance of ReadResolver via reflection.
+	 * 
+	 * Searches for a constructor that has a single parameter assignable from InputData.
+	 * 
+	 * @return A ReadResolver instance
+	 * @throws Exception
+	 *             If something bad happens
+	 */
+	protected ReadResolver getReadResolver(InputData data) throws Exception {
+
+		ReadResolver resolver = null;
+
+		// search for a constructor that has a single parameter of a type of
+		// BaseMetaData to create the accessor instance
+		for (Constructor<?> c : getReadResolverClass().getConstructors()) {
+			if (c.getParameterTypes().length == 1) {
+				for (Class<?> clazz : c.getParameterTypes()) {
+					if (InputData.class.isAssignableFrom(clazz)) {
+						resolver = (ReadResolver) c.newInstance(data);
+					}
+				}
+			}
+		}
+
+		if (resolver == null) {
+			throw new InvalidParameterException("Unable to find Resolver constructor with a BaseMetaData parameter");
+		}
+
+		return resolver;
+	}
+
+	/**
+	 * Minimal mutable 2-tuple used for column definitions (name/type) and extra request parameters
+	 * (key/value). Fields are public by design for terse test fixtures.
+	 */
+	public static class Pair<FIRST, SECOND> {
+
+		public FIRST first;
+		public SECOND second;
+
+		public Pair() {
+		}
+
+		public Pair(FIRST f, SECOND s) {
+			this.first = f;
+			this.second = s;
+		}
+	}
+
+	/**
+	 * An extension of InputData for the local file system instead of HDFS. Leveraged by the PXFUnit framework. Do not
+	 * concern yourself with such a simple piece of code.
+	 */
+	public static class LocalInputData extends ProtocolData {
+
+		public LocalInputData(ProtocolData copy) {
+			super(copy);
+			// Strip the leading '/' so the data source resolves as a local relative path.
+			// NOTE(review): assumes the copied data source always starts with '/' — setup() guarantees this.
+			super.setDataSource(super.getDataSource().substring(1));
+		}
+
+		public LocalInputData(Map<String, String> paramsMap) {
+			super(paramsMap);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
new file mode 100644
index 0000000..aa8ea14
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
@@ -0,0 +1,141 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class JsonLexerTest {
+
+	private static final Log LOG = LogFactory.getLog(JsonLexerTest.class);
+
+	@Test
+	public void testSimple() throws IOException {
+		File testsDir = new File("src/test/resources/lexer-tests");
+		File[] jsonFiles = testsDir.listFiles(new FilenameFilter() {
+			public boolean accept(File file, String s) {
+				return s.endsWith(".json");
+			}
+		});
+
+		for (File jsonFile : jsonFiles) {
+			File stateFile = new File(jsonFile.getAbsolutePath() + ".state");
+			if (stateFile.exists()) {
+				runTest(jsonFile, stateFile);
+			}
+		}
+	}
+
+	public static Pattern STATE_RECURRENCE = Pattern.compile("^([A-Za-z\\_0-9]+)\\{([0-9]+)\\}$");
+
+	public void runTest(File jsonFile, File stateFile) throws IOException {
+		List<String> lexerStates = FileUtils.readLines(stateFile);
+		InputStream jsonInputStream = new FileInputStream(jsonFile);
+
+		try {
+			JsonLexer lexer = new JsonLexer();
+
+			int byteOffset = 0;
+			int i;
+			ListIterator<String> stateIterator = lexerStates.listIterator();
+			int recurrence = 0;
+			JsonLexer.JsonLexerState expectedState = null;
+			StringBuilder sb = new StringBuilder();
+			int stateFileLineNum = 0;
+			while ((i = jsonInputStream.read()) != -1) {
+				byteOffset++;
+				char c = (char) i;
+
+				sb.append(c);
+
+				lexer.lex(c);
+
+				if (lexer.getState() == JsonLexer.JsonLexerState.WHITESPACE) {
+					// optimization to skip over multiple whitespaces
+					continue;
+				}
+
+				if (!stateIterator.hasNext()) {
+					assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum)
+							+ ": Input stream had character '" + c + "' but no matching state", true);
+				}
+
+				if (recurrence <= 0) {
+					String state = stateIterator.next().trim();
+					stateFileLineNum++;
+
+					while (state.equals("") || state.startsWith("#")) {
+						if (!stateIterator.hasNext()) {
+							assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum)
+									+ ": Input stream had character '" + c + "' but no matching state", true);
+						}
+						state = stateIterator.next().trim();
+						stateFileLineNum++;
+					}
+
+					Matcher m = STATE_RECURRENCE.matcher(state);
+					recurrence = 1;
+					if (m.matches()) {
+						state = m.group(1);
+						recurrence = Integer.valueOf(m.group(2));
+					}
+					expectedState = JsonLexer.JsonLexerState.valueOf(state);
+				}
+
+				assertEquals(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum)
+						+ ": Issue for char '" + c + "'", expectedState, lexer.getState());
+				recurrence--;
+			}
+
+			if (stateIterator.hasNext()) {
+				assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum)
+						+ ": Input stream has ended but more states were expected: '" + stateIterator.next() + "...'",
+						true);
+			}
+
+		} finally {
+			IOUtils.closeQuietly(jsonInputStream);
+		}
+
+		LOG.info("File " + jsonFile.getName() + " passed");
+
+	}
+
+	static String formatStateInfo(File jsonFile, String streamContents, int streamByteOffset, int stateFileLineNum) {
+		return jsonFile.getName() + ": Input stream currently at byte-offset " + streamByteOffset + ", contents = '"
+				+ streamContents + "'" + " state-file line = " + stateFileLineNum;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
new file mode 100644
index 0000000..cdc876b
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
@@ -0,0 +1,83 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class PartitionedJsonParserNoSeekTest {
+
+	private static final Log LOG = LogFactory.getLog(PartitionedJsonParserNoSeekTest.class);
+
+	/**
+	 * Runs every *.json file under src/test/resources/parser-tests/noseek through the parser,
+	 * comparing each extracted object against the sibling "...expected..." fixture files.
+	 */
+	@Test
+	public void testNoSeek() throws IOException {
+		File testsDir = new File("src/test/resources/parser-tests/noseek");
+		File[] jsonFiles = testsDir.listFiles(new FilenameFilter() {
+			public boolean accept(File file, String s) {
+				return s.endsWith(".json");
+			}
+		});
+
+		for (File jsonFile : jsonFiles) {
+			runTest(jsonFile);
+		}
+	}
+
+	/**
+	 * Extracts objects containing a "name" member from jsonFile and asserts each matches the
+	 * corresponding expected-output fixture (whitespace-normalized before comparison).
+	 */
+	public void runTest(final File jsonFile) throws IOException {
+		InputStream jsonInputStream = new FileInputStream(jsonFile);
+
+		try {
+			PartitionedJsonParser parser = new PartitionedJsonParser(jsonInputStream);
+
+			// Fixture files are named after the input file and contain "expected".
+			File[] jsonObjectFiles = jsonFile.getParentFile().listFiles(new FilenameFilter() {
+				public boolean accept(File file, String s) {
+					return s.contains(jsonFile.getName()) && s.contains("expected");
+				}
+			});
+
+			for (File jsonObjectFile : jsonObjectFiles) {
+				String expected = trimWhitespaces(FileUtils.readFileToString(jsonObjectFile));
+				String result = parser.nextObjectContainingMember("name");
+				assertNotNull(jsonFile.getName() + "/" + jsonObjectFile.getName(), result);
+				assertEquals(jsonFile.getName() + "/" + jsonObjectFile.getName(), expected, trimWhitespaces(result));
+				LOG.info("File " + jsonFile.getName() + "/" + jsonObjectFile.getName() + " passed");
+			}
+
+		} finally {
+			IOUtils.closeQuietly(jsonInputStream);
+		}
+	}
+
+	/**
+	 * Collapses runs of whitespace (newline, tab, carriage return, space) into single spaces and
+	 * trims the ends, so formatting differences do not affect comparison. (The character class
+	 * previously listed \t twice; the duplicate was redundant and has been removed.)
+	 */
+	public String trimWhitespaces(String s) {
+		return s.replaceAll("[\\n\\t\\r ]+", " ").trim();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
new file mode 100644
index 0000000..3a1b3b6
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
@@ -0,0 +1,55 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+
+public class PartitionedJsonParserOffsetTest {
+	/*
+	 * [{"color": "red","v": "vv"},{"color": "red","v": "vv"}]
+	 */
+	public static String json2 = "[{\"color\": \"red\",\"v\": \"vv\"},{\"color\": \"red\",\"v\": \"vv\"}]";
+
+	/**
+	 * Verifies getBytesRead() after each extracted object: the first object ends at byte 27
+	 * (1 leading '[' + a 26-byte object), the second at byte 54 (27 + ',' + 26-byte object).
+	 */
+	@Test
+	public void testOffset() throws IOException {
+		InputStream jsonInputStream = createFromString(json2);
+		PartitionedJsonParser parser = new PartitionedJsonParser(jsonInputStream);
+		String result = parser.nextObjectContainingMember("color");
+		assertNotNull(result);
+		assertEquals(27, parser.getBytesRead());
+		assertEquals(1, parser.getBytesRead() - result.length());
+		result = parser.nextObjectContainingMember("color");
+		assertNotNull(result);
+		assertEquals(54, parser.getBytesRead());
+		assertEquals(28, parser.getBytesRead() - result.length());
+		jsonInputStream.close();
+	}
+
+	// NOTE(review): getBytes() uses the platform default charset; safe here because the fixture
+	// is pure ASCII, but StandardCharsets.UTF_8 would be more explicit.
+	public InputStream createFromString(String s) {
+		return new ByteArrayInputStream(s.getBytes());
+	}
+}


Mime
View raw message