hawq-dev mailing list archives

From hornn <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request: HAWQ-178: Add JSON plugin support in ...
Date Thu, 04 Feb 2016 04:05:07 GMT
Github user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51827170
  
    --- Diff: pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java ---
    @@ -0,0 +1,176 @@
    +package org.apache.hawq.pxf.plugins.json;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.io.compress.CompressionCodec;
    +import org.apache.hadoop.io.compress.CompressionCodecFactory;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hawq.pxf.plugins.json.parser.PartitionedJsonParser;
    +
    +/**
    + * Multi-line json object reader. JsonRecordReader uses a member name (set by the <b>IDENTIFIER</b> PXF parameter) to
    + * determine the encapsulating object to extract and read.
    + * 
    + * JsonRecordReader supports compressed input files as well.
    + * 
    + * As a safeguard, set the optional <b>MAXLENGTH</b> parameter to limit the max size of a record.
    + */
    +public class JsonRecordReader implements RecordReader<LongWritable, Text> {
    +
    +	private static final Log LOG = LogFactory.getLog(JsonRecordReader.class);
    +
    +	public static final String RECORD_MEMBER_IDENTIFIER = "json.input.format.record.identifier";
    +	public static final String RECORD_MAX_LENGTH = "multilinejsonrecordreader.maxlength";
    +
    +	private CompressionCodecFactory compressionCodecs = null;
    +	private long start;
    +	private long pos;
    +	private long end;
    +	private int maxObjectLength;
    +	private InputStream is;
    +	private PartitionedJsonParser parser;
    +	private final String jsonMemberName;
    +
    +	/**
    +	 * Create new multi-line json object reader.
    +	 * 
    +	 * @param conf
    +	 *            Hadoop context
    +	 * @param split
    +	 *            HDFS split to start the reading from
    +	 * @throws IOException
    +	 */
    +	public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
    +
    +		this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
    +		this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);
    +
    +		start = split.getStart();
    +		end = start + split.getLength();
    +		final Path file = split.getPath();
    +		compressionCodecs = new CompressionCodecFactory(conf);
    +		final CompressionCodec codec = compressionCodecs.getCodec(file);
    --- End diff --
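
    As a usage note on the reader defined above: here is a minimal sketch of how it could be
driven directly, assuming the rest of the class implements the remaining mapred RecordReader
methods (they are not part of this hunk). The member name "created_at", the input path, and
the 10 MB cap are purely illustrative values, not anything defined by the plugin:

        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.FileSplit;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hawq.pxf.plugins.json.JsonRecordReader;

        public class JsonRecordReaderSketch {
            public static void main(String[] args) throws Exception {
                JobConf conf = new JobConf();
                // Member name whose enclosing JSON object becomes one record (illustrative value).
                conf.set(JsonRecordReader.RECORD_MEMBER_IDENTIFIER, "created_at");
                // Optional guard against runaway records: cap a single record at 10 MB (illustrative).
                conf.setInt(JsonRecordReader.RECORD_MAX_LENGTH, 10 * 1024 * 1024);

                // Illustrative input path; build a single split covering the whole file.
                Path path = new Path("/tmp/sample.json");
                long length = FileSystem.get(conf).getFileStatus(path).getLen();
                FileSplit split = new FileSplit(path, 0, length, (String[]) null);

                JsonRecordReader reader = new JsonRecordReader(conf, split);
                LongWritable key = reader.createKey();
                Text value = reader.createValue();
                try {
                    while (reader.next(key, value)) {
                        System.out.println(key + ": " + value);
                    }
                } finally {
                    reader.close();
                }
            }
        }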
    
    Regarding compression, we support it for HDFS files, and the implementation here looks very
similar to the one in LineRecordReader. One issue worth mentioning is that BZip2Codec is not
thread safe. For map-reduce jobs that doesn't matter, because each job runs in its own JVM, but
we handle multiple concurrent requests through Tomcat, which introduced a problem.
    For HDFS files, we "solved" it by forcing bzip2 files to be processed in a single thread.
See HdfsUtilities.isThreadSafe(), which is called by HdfsSplittableDataAccessor. I see that
JsonAccessor inherits from that class, so I think it should be fine, but it's worth verifying.
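
    The actual guard lives in HdfsUtilities.isThreadSafe() (its exact signature is not quoted
here); roughly, the idea is to look at the codec that resolves for the file and treat BZip2 as
the non-thread-safe case. The class and method shape below are illustrative, not the real HAWQ
API:

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.compress.BZip2Codec;
        import org.apache.hadoop.io.compress.CompressionCodec;
        import org.apache.hadoop.io.compress.CompressionCodecFactory;

        public class CodecThreadSafetySketch {

            /**
             * Roughly what the guard boils down to: if the file resolves to a BZip2 codec,
             * report it as unsafe to process on multiple concurrent Tomcat request threads,
             * so the accessor can fall back to single-threaded processing for that file.
             */
            public static boolean isThreadSafe(Configuration conf, String dataPath) {
                CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(dataPath));
                return codec == null || !(codec instanceof BZip2Codec);
            }
        }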


