From: shivram@apache.org
To: commits@hawq.incubator.apache.org
Reply-To: dev@hawq.incubator.apache.org
Date: Tue, 03 Nov 2015 00:36:14 -0000
Message-Id: <6851bd7bbdb94debbc8dc62ce86e5fee@git.apache.org>
In-Reply-To: <05b60816cabf429da197c75adc3db92a@git.apache.org>
References: <05b60816cabf429da197c75adc3db92a@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [11/15] incubator-hawq git commit: HAWQ-45.
PXF package namespace refactor http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java deleted file mode 100644 index 0f8f908..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAnalyzer.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.Analyzer; -import com.pivotal.pxf.api.AnalyzerStats; -import com.pivotal.pxf.api.ReadAccessor; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.service.ReadBridge; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; -import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; - -import java.io.IOException; -import java.util.ArrayList; - -/** - * Analyzer class for HDFS data resources - * - * Given an HDFS data source (a file, directory, or wild card pattern) - * return statistics about it (number of blocks, number of tuples, etc.) - */ -public class HdfsAnalyzer extends Analyzer { - private JobConf jobConf; - private FileSystem fs; - private Log Log; - - /** - * Constructs an HdfsAnalyzer object. - * - * @param inputData all input parameters coming from the client - * @throws IOException if HDFS file system cannot be retrieved - */ - public HdfsAnalyzer(InputData inputData) throws IOException { - super(inputData); - Log = LogFactory.getLog(HdfsAnalyzer.class); - - jobConf = new JobConf(new Configuration(), HdfsAnalyzer.class); - fs = FileSystem.get(jobConf); - } - - /** - * Collects a number of basic statistics based on an estimate. Statistics - * are: number of records, number of hdfs blocks and hdfs block size. 
- * - * @param datapath path is a data source URI that can appear as a file - * name, a directory name or a wildcard pattern - * @return statistics in JSON format - * @throws Exception if path is wrong, its metadata cannot be retrieved - * from file system, or if scanning the first block - * using the accessor failed - */ - @Override - public AnalyzerStats getEstimatedStats(String datapath) throws Exception { - long blockSize = 0; - long numberOfBlocks; - Path path = new Path(HdfsUtilities.absoluteDataPath(datapath)); - - ArrayList splits = getSplits(path); - - for (InputSplit split : splits) { - FileSplit fsp = (FileSplit) split; - Path filePath = fsp.getPath(); - FileStatus fileStatus = fs.getFileStatus(filePath); - if (fileStatus.isFile()) { - blockSize = fileStatus.getBlockSize(); - break; - } - } - - // if no file is in path (only dirs), get default block size - if (blockSize == 0) { - blockSize = fs.getDefaultBlockSize(path); - } - numberOfBlocks = splits.size(); - - - long numberOfTuplesInBlock = getNumberOfTuplesInBlock(splits); - AnalyzerStats stats = new AnalyzerStats(blockSize, numberOfBlocks, numberOfTuplesInBlock * numberOfBlocks); - - //print files size to log when in debug level - Log.debug(AnalyzerStats.dataToString(stats, path.toString())); - - return stats; - } - - /** - * Calculates the number of tuples in a split (block). - * Reads one block from HDFS. Exception during reading will - * filter upwards and handled in AnalyzerResource - */ - private long getNumberOfTuplesInBlock(ArrayList splits) throws Exception { - long tuples = -1; /* default - if we are not able to read data */ - ReadAccessor accessor; - - if (splits.isEmpty()) { - return 0; - } - - /* - * metadata information includes: file split's - * start, length and hosts (locations). 
- */ - FileSplit firstSplit = (FileSplit) splits.get(0); - byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(firstSplit); - inputData.setFragmentMetadata(fragmentMetadata); - inputData.setDataSource(firstSplit.getPath().toUri().getPath()); - accessor = ReadBridge.getFileAccessor(inputData); - - if (accessor.openForRead()) { - tuples = 0; - while (accessor.readNextObject() != null) { - tuples++; - } - - accessor.closeForRead(); - } - - return tuples; - } - - private ArrayList getSplits(Path path) throws IOException { - PxfInputFormat fformat = new PxfInputFormat(); - PxfInputFormat.setInputPaths(jobConf, path); - InputSplit[] splits = fformat.getSplits(jobConf, 1); - ArrayList result = new ArrayList(); - - // remove empty splits - if (splits != null) { - for (InputSplit split : splits) { - if (split.getLength() > 0) { - result.add(split); - } - } - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java deleted file mode 100644 index 4a43c5f..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.ReadAccessor; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.api.utilities.Plugin; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; - -/** - * Base class for enforcing the complete access of a file in one accessor. - * Since we are not accessing the file using the splittable API, but instead are - * using the "simple" stream API, it means that we cannot fetch different parts - * (splits) of the file in different segments. Instead each file access brings - * the complete file. And, if several segments would access the same file, then - * each one will return the whole file and we will observe in the query result, - * each record appearing number_of_segments times. To avoid this we will only - * have one segment (segment 0) working for this case - enforced with - * isWorkingSegment() method. Naturally this is the less recommended working - * mode since we are not making use of segment parallelism. HDFS accessors for - * a specific file type should inherit from this class only if the file they are - * reading does not support splitting: a protocol-buffer file, regular file, ... - */ -public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAccessor { - private Configuration conf = null; - protected InputStream inp = null; - private FileSplit fileSplit = null; - - /** - * Constructs a HdfsAtomicDataAccessor object. - * - * @param input all input parameters coming from the client - */ - public HdfsAtomicDataAccessor(InputData input) { - // 0. Hold the configuration data - super(input); - - // 1. 
Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files - conf = new Configuration(); - - fileSplit = HdfsUtilities.parseFragmentMetadata(inputData); - } - - /** - * Opens the file using the non-splittable API for HADOOP HDFS file access - * This means that instead of using a FileInputFormat for access, we use a - * Java stream. - * - * @return true for successful file open, false otherwise - */ - @Override - public boolean openForRead() throws Exception { - if (!isWorkingSegment()) { - return false; - } - - // input data stream - FileSystem fs = FileSystem.get(URI.create(inputData.getDataSource()), conf); // FileSystem.get actually returns an FSDataInputStream - inp = fs.open(new Path(inputData.getDataSource())); - - return (inp != null); - } - - /** - * Fetches one record from the file. - * - * @return a {@link OneRow} record as a Java object. Returns null if none. - */ - @Override - public OneRow readNextObject() throws IOException { - if (!isWorkingSegment()) { - return null; - } - - return new OneRow(null, new Object()); - } - - /** - * Closes the access stream when finished reading the file - */ - @Override - public void closeForRead() throws Exception { - if (!isWorkingSegment()) { - return; - } - - if (inp != null) { - inp.close(); - } - } - - /* - * Making sure that only the segment that got assigned the first data - * fragment will read the (whole) file. - */ - private boolean isWorkingSegment() { - return (fileSplit.getStart() == 0); - } - - @Override - public boolean isThreadSafe() { - return HdfsUtilities.isThreadSafe(inputData.getDataSource(), - inputData.getUserProperty("COMPRESSION_CODEC")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java deleted file mode 100644 index 1bf5aab..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsDataFragmenter.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.Fragment; -import com.pivotal.pxf.api.Fragmenter; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; -import com.pivotal.pxf.plugins.hdfs.utilities.PxfInputFormat; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; - -import java.io.IOException; -import java.util.List; - -/** - * Fragmenter class for HDFS data resources. - * - * Given an HDFS data source (a file, directory, or wild card pattern) divide - * the data into fragments and return a list of them along with a list of - * host:port locations for each. - */ -public class HdfsDataFragmenter extends Fragmenter { - private JobConf jobConf; - - /** - * Constructs an HdfsDataFragmenter object. - * - * @param md all input parameters coming from the client - */ - public HdfsDataFragmenter(InputData md) { - super(md); - - jobConf = new JobConf(new Configuration(), HdfsDataFragmenter.class); - } - - /** - * Gets the fragments for a data source URI that can appear as a file name, - * a directory name or a wildcard. Returns the data fragments in JSON - * format. 
- */ - @Override - public List getFragments() throws Exception { - String absoluteDataPath = HdfsUtilities.absoluteDataPath(inputData.getDataSource()); - InputSplit[] splits = getSplits(new Path(absoluteDataPath)); - - for (InputSplit split : splits != null ? splits : new InputSplit[] {}) { - FileSplit fsp = (FileSplit) split; - - /* - * HD-2547: If the file is empty, an empty split is returned: no - * locations and no length. - */ - if (fsp.getLength() <= 0) { - continue; - } - - String filepath = fsp.getPath().toUri().getPath(); - String[] hosts = fsp.getLocations(); - - /* - * metadata information includes: file split's start, length and - * hosts (locations). - */ - byte[] fragmentMetadata = HdfsUtilities.prepareFragmentMetadata(fsp); - Fragment fragment = new Fragment(filepath, hosts, fragmentMetadata); - fragments.add(fragment); - } - - return fragments; - } - - private InputSplit[] getSplits(Path path) throws IOException { - PxfInputFormat format = new PxfInputFormat(); - PxfInputFormat.setInputPaths(jobConf, path); - return format.getSplits(jobConf, 1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java deleted file mode 100644 index 744342d..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java +++ /dev/null @@ -1,146 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.ReadAccessor; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.api.utilities.Plugin; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.*; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.ListIterator; - -/** - * Accessor for accessing a splittable HDFS data sources. HDFS will divide the - * file into splits based on an internal decision (by default, the block size is - * also the split size). - * - * Accessors that require such base functionality should extend this class. - */ -public abstract class HdfsSplittableDataAccessor extends Plugin implements - ReadAccessor { - protected Configuration conf = null; - protected RecordReader reader = null; - protected InputFormat inputFormat = null; - protected ListIterator iter = null; - protected JobConf jobConf = null; - protected Object key, data; - - /** - * Constructs an HdfsSplittableDataAccessor - * - * @param input all input parameters coming from the client request - * @param inFormat the HDFS {@link InputFormat} the caller wants to use - */ - public HdfsSplittableDataAccessor(InputData input, - InputFormat inFormat) { - super(input); - inputFormat = inFormat; - - // 1. Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files - conf = new Configuration(); - - // 2. variable required for the splits iteration logic - jobConf = new JobConf(conf, HdfsSplittableDataAccessor.class); - } - - /** - * Fetches the requested fragment (file split) for the current client - * request, and sets a record reader for the job. 
- * - * @return true if succeeded, false if no more splits to be read - */ - @Override - public boolean openForRead() throws Exception { - LinkedList requestSplits = new LinkedList(); - FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData); - requestSplits.add(fileSplit); - - // Initialize record reader based on current split - iter = requestSplits.listIterator(0); - return getNextSplit(); - } - - /** - * Specialized accessors will override this method and implement their own - * recordReader. For example, a plain delimited text accessor may want to - * return a LineRecordReader. - * - * @param jobConf the hadoop jobconf to use for the selected InputFormat - * @param split the input split to be read by the accessor - * @return a recordreader to be used for reading the data records of the - * split - * @throws IOException if recordreader could not be created - */ - abstract protected Object getReader(JobConf jobConf, InputSplit split) - throws IOException; - - /** - * Sets the current split and initializes a RecordReader who feeds from the - * split - * - * @return true if there is a split to read - * @throws IOException if record reader could not be created - */ - @SuppressWarnings(value = "unchecked") - protected boolean getNextSplit() throws IOException { - if (!iter.hasNext()) { - return false; - } - - InputSplit currSplit = iter.next(); - reader = (RecordReader) getReader(jobConf, currSplit); - key = reader.createKey(); - data = reader.createValue(); - return true; - } - - /** - * Fetches one record from the file. The record is returned as a Java - * object. - */ - @Override - public OneRow readNextObject() throws IOException { - // if there is one more record in the current split - if (!reader.next(key, data)) { - // the current split is exhausted. 
try to move to the next split - if (getNextSplit()) { - // read the first record of the new split - if (!reader.next(key, data)) { - // make sure we return nulls - return null; - } - } else { - // make sure we return nulls - return null; - } - } - - /* - * if neither condition was met, it means we already read all the - * records in all the splits, and in this call record variable was not - * set, so we return null and thus we are signaling end of records - * sequence - */ - return new OneRow(key, data); - } - - /** - * When user finished reading the file, it closes the RecordReader - */ - @Override - public void closeForRead() throws Exception { - if (reader != null) { - reader.close(); - } - } - - @Override - public boolean isThreadSafe() { - return HdfsUtilities.isThreadSafe(inputData.getDataSource(), - inputData.getUserProperty("COMPRESSION_CODEC")); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java deleted file mode 100644 index 32002a1..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/LineBreakAccessor.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.WriteAccessor; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapred.*; - -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * A PXF Accessor for reading delimited plain text records. - */ -public class LineBreakAccessor extends HdfsSplittableDataAccessor implements - WriteAccessor { - private DataOutputStream dos; - private FSDataOutputStream fsdos; - private Configuration conf; - private FileSystem fs; - private Path file; - private static Log Log = LogFactory.getLog(LineBreakAccessor.class); - - /** - * Constructs a LineReaderAccessor. - * - * @param input all input parameters coming from the client request - */ - public LineBreakAccessor(InputData input) { - super(input, new TextInputFormat()); - ((TextInputFormat) inputFormat).configure(jobConf); - } - - @Override - protected Object getReader(JobConf jobConf, InputSplit split) - throws IOException { - return new ChunkRecordReader(jobConf, (FileSplit) split); - } - - /** - * Opens file for write. 
- */ - @Override - public boolean openForWrite() throws Exception { - - String fileName = inputData.getDataSource(); - String compressCodec = inputData.getUserProperty("COMPRESSION_CODEC"); - CompressionCodec codec = null; - - conf = new Configuration(); - fs = FileSystem.get(conf); - - // get compression codec - if (compressCodec != null) { - codec = HdfsUtilities.getCodec(conf, compressCodec); - String extension = codec.getDefaultExtension(); - fileName += extension; - } - - file = new Path(fileName); - - if (fs.exists(file)) { - throw new IOException("file " + file.toString() - + " already exists, can't write data"); - } - org.apache.hadoop.fs.Path parent = file.getParent(); - if (!fs.exists(parent)) { - fs.mkdirs(parent); - Log.debug("Created new dir " + parent.toString()); - } - - // create output stream - do not allow overwriting existing file - createOutputStream(file, codec); - - return true; - } - - /* - * Creates output stream from given file. If compression codec is provided, - * wrap it around stream. - */ - private void createOutputStream(Path file, CompressionCodec codec) - throws IOException { - fsdos = fs.create(file, false); - if (codec != null) { - dos = new DataOutputStream(codec.createOutputStream(fsdos)); - } else { - dos = fsdos; - } - - } - - /** - * Writes row into stream. - */ - @Override - public boolean writeNextObject(OneRow onerow) throws Exception { - dos.write((byte[]) onerow.getData()); - return true; - } - - /** - * Closes the output stream after done writing. - */ - @Override - public void closeForWrite() throws Exception { - if ((dos != null) && (fsdos != null)) { - Log.debug("Closing writing stream for path " + file); - dos.flush(); - /* - * From release 0.21.0 sync() is deprecated in favor of hflush(), - * which only guarantees that new readers will see all data written - * to that point, and hsync(), which makes a stronger guarantee that - * the operating system has flushed the data to disk (like POSIX - * fsync), although data may still be in the disk cache. - */ - fsdos.hsync(); - dos.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java deleted file mode 100644 index 13880b8..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/QuotedLineBreakAccessor.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.utilities.InputData; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -/** - * A (atomic) PXF Accessor for reading \n delimited files with quoted - * field delimiter, line delimiter, and quotes. This accessor supports - * multi-line records, that are read from a single source (non-parallel). - */ -public class QuotedLineBreakAccessor extends HdfsAtomicDataAccessor { - private BufferedReader reader; - - /** - * Constructs a QuotedLineBreakAccessor. 
- * - * @param input all input parameters coming from the client request - */ - public QuotedLineBreakAccessor(InputData input) { - super(input); - } - - @Override - public boolean openForRead() throws Exception { - if (!super.openForRead()) { - return false; - } - reader = new BufferedReader(new InputStreamReader(inp)); - return true; - } - - /** - * Fetches one record (maybe partial) from the file. The record is returned as a Java object. - */ - @Override - public OneRow readNextObject() throws IOException { - if (super.readNextObject() == null) /* check if working segment */ { - return null; - } - - String next_line = reader.readLine(); - if (next_line == null) /* EOF */ { - return null; - } - - return new OneRow(null, next_line); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java deleted file mode 100644 index 5f3f3dd..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/SequenceFileAccessor.java +++ /dev/null @@ -1,215 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.WriteAccessor; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.plugins.hdfs.utilities.HdfsUtilities; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.mapred.*; - -import java.io.IOException; -import java.util.EnumSet; - -/** - * A PXF Accessor for reading and writing Sequence File records - */ -public class SequenceFileAccessor extends HdfsSplittableDataAccessor implements - WriteAccessor { - - private Configuration conf; - private FileContext fc; - private Path file; - private CompressionCodec codec; - private CompressionType compressionType; - private SequenceFile.Writer writer; - private LongWritable defaultKey; // used when recordkey is not defined - - private static Log Log = LogFactory.getLog(SequenceFileAccessor.class);; - - /** - * Constructs a SequenceFileAccessor. 
- * - * @param input all input parameters coming from the client request - */ - public SequenceFileAccessor(InputData input) { - super(input, new SequenceFileInputFormat()); - } - - /** - * Overrides virtual method to create specialized record reader - */ - @Override - protected Object getReader(JobConf jobConf, InputSplit split) - throws IOException { - return new SequenceFileRecordReader(jobConf, (FileSplit) split); - } - - @Override - public boolean openForWrite() throws Exception { - FileSystem fs; - Path parent; - String fileName = inputData.getDataSource(); - conf = new Configuration(); - - getCompressionCodec(inputData); - fileName = updateFileExtension(fileName, codec); - - // construct the output stream - file = new Path(fileName); - fs = file.getFileSystem(conf); - fc = FileContext.getFileContext(); - defaultKey = new LongWritable(inputData.getSegmentId()); - - if (fs.exists(file)) { - throw new IOException("file " + file - + " already exists, can't write data"); - } - parent = file.getParent(); - if (!fs.exists(parent)) { - fs.mkdirs(parent); - Log.debug("Created new dir " + parent); - } - - writer = null; - return true; - } - - /** - * Compression: based on compression codec and compression type (default - * value RECORD). If there is no codec, compression type is ignored, and - * NONE is used. - * - * @param inputData - container where compression codec and type are held - */ - private void getCompressionCodec(InputData inputData) { - - String userCompressCodec = inputData.getUserProperty("COMPRESSION_CODEC"); - String userCompressType = inputData.getUserProperty("COMPRESSION_TYPE"); - String parsedCompressType = parseCompressionType(userCompressType); - - compressionType = SequenceFile.CompressionType.NONE; - codec = null; - if (userCompressCodec != null) { - codec = HdfsUtilities.getCodec(conf, userCompressCodec); - - try { - compressionType = CompressionType.valueOf(parsedCompressType); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - "Illegal value for compression type " + "'" - + parsedCompressType + "'"); - } - if (compressionType == null) { - throw new IllegalArgumentException( - "Compression type must be defined"); - } - - Log.debug("Compression ON: " + "compression codec: " - + userCompressCodec + ", compression type: " - + compressionType); - } - } - - /* - * Parses compression type for sequence file. If null, default to RECORD. - * Allowed values: RECORD, BLOCK. - */ - private String parseCompressionType(String compressType) { - final String COMPRESSION_TYPE_RECORD = "RECORD"; - final String COMPRESSION_TYPE_BLOCK = "BLOCK"; - final String COMPRESSION_TYPE_NONE = "NONE"; - - if (compressType == null) { - return COMPRESSION_TYPE_RECORD; - } - - if (compressType.equalsIgnoreCase(COMPRESSION_TYPE_NONE)) { - throw new IllegalArgumentException( - "Illegal compression type 'NONE'. 
" - + "For disabling compression remove COMPRESSION_CODEC parameter."); - } - - if (!compressType.equalsIgnoreCase(COMPRESSION_TYPE_RECORD) - && !compressType.equalsIgnoreCase(COMPRESSION_TYPE_BLOCK)) { - throw new IllegalArgumentException("Illegal compression type '" - + compressType + "'"); - } - - return compressType.toUpperCase(); - } - - /* - * Returns fileName with the codec's file extension appended - */ - private String updateFileExtension(String fileName, CompressionCodec codec) { - - if (codec != null) { - fileName += codec.getDefaultExtension(); - } - Log.debug("File name for write: " + fileName); - return fileName; - } - - @Override - public boolean writeNextObject(OneRow onerow) throws IOException { - Writable value = (Writable) onerow.getData(); - Writable key = (Writable) onerow.getKey(); - - // init writer on first approach here, based on onerow.getData type - // TODO: verify data is serializable. - if (writer == null) { - Class valueClass = value.getClass(); - Class keyClass = (key == null) ? LongWritable.class - : key.getClass(); - // create writer - do not allow overwriting existing file - writer = SequenceFile.createWriter(fc, conf, file, keyClass, - valueClass, compressionType, codec, - new SequenceFile.Metadata(), EnumSet.of(CreateFlag.CREATE)); - } - - try { - writer.append((key == null) ? defaultKey : key, value); - } catch (IOException e) { - Log.error("Failed to write data to file: " + e.getMessage()); - return false; - } - - return true; - } - - @Override - public void closeForWrite() throws Exception { - if (writer != null) { - writer.sync(); - /* - * From release 0.21.0 sync() is deprecated in favor of hflush(), - * which only guarantees that new readers will see all data written - * to that point, and hsync(), which makes a stronger guarantee that - * the operating system has flushed the data to disk (like POSIX - * fsync), although data may still be in the disk cache. - */ - writer.hsync(); - writer.close(); - } - } - - public CompressionType getCompressionType() { - return compressionType; - } - - public CompressionCodec getCodec() { - return codec; - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java deleted file mode 100644 index 7ace4c8..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/StringPassResolver.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.io.DataType; -import com.pivotal.pxf.api.OneField; -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.ReadResolver; -import com.pivotal.pxf.api.WriteResolver; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.api.utilities.Plugin; -import com.pivotal.pxf.plugins.hdfs.ChunkWritable; - -import java.util.LinkedList; -import java.util.List; - -import static com.pivotal.pxf.api.io.DataType.VARCHAR; - -/** - * StringPassResolver handles "deserialization" and serialization of - * String records. StringPassResolver implements IReadResolver and - * IWriteResolver interfaces. Returns strings as-is. 
- */ -public class StringPassResolver extends Plugin implements ReadResolver, WriteResolver { - // for write - private OneRow oneRow; - - /** - * Constructs a StringPassResolver. - * - * @param inputData input all input parameters coming from the client request - */ - public StringPassResolver(InputData inputData) { - super(inputData); - oneRow = new OneRow(); - this.inputData = inputData; - } - - /** - * Returns a list of the fields of one record. - * Each record field is represented by a {@link OneField} item. - * OneField item contains two fields: an integer representing the field type and a Java - * Object representing the field value. - */ - @Override - public List getFields(OneRow onerow) { - /* - * This call forces a whole text line into a single varchar field and replaces - * the proper field separation code can be found in previous revisions. The reasons - * for doing so as this point are: - * 1. performance - * 2. desire to not replicate text parsing logic from the backend into java - */ - List record = new LinkedList(); - Object data = onerow.getData(); - if (data instanceof ChunkWritable) { - record.add(new OneField(DataType.BYTEA.getOID(), ((ChunkWritable)data).box)); - } - else { - record.add(new OneField(VARCHAR.getOID(), data)); - } - return record; - } - - /** - * Creates a OneRow object from the singleton list. - */ - @Override - public OneRow setFields(List record) throws Exception { - if (((byte[]) record.get(0).val).length == 0) { - return null; - } - - oneRow.setData(record.get(0).val); - return oneRow; - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java deleted file mode 100644 index e9df907..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/WritableResolver.java +++ /dev/null @@ -1,220 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs; - -import com.pivotal.pxf.api.*; -import com.pivotal.pxf.api.io.DataType; -import com.pivotal.pxf.api.utilities.InputData; -import com.pivotal.pxf.api.utilities.Plugin; -import com.pivotal.pxf.plugins.hdfs.utilities.RecordkeyAdapter; -import com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException; -import com.pivotal.pxf.service.utilities.Utilities; - -import static com.pivotal.pxf.plugins.hdfs.utilities.DataSchemaException.MessageFmt.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; - -import java.lang.reflect.Array; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.LinkedList; -import java.util.List; - -import static com.pivotal.pxf.api.io.DataType.*; - -/** - * WritableResolver handles serialization and deserialization of records - * that were serialized using Hadoop's Writable serialization framework. - * - * A field named 'recordkey' is treated as a key of the given row, and not as - * part of the data schema. See {@link RecordkeyAdapter}. 
- */ -public class WritableResolver extends Plugin implements ReadResolver, WriteResolver { - private static final int RECORDKEY_UNDEFINED = -1; - private static final Log LOG = LogFactory.getLog(WritableResolver.class); - private RecordkeyAdapter recordkeyAdapter = new RecordkeyAdapter(); - private int recordkeyIndex; - // reflection fields - private Object userObject = null; - private Field[] fields = null; - - - /** - * Constructs a WritableResolver. - * - * @param input all input parameters coming from the client - * @throws Exception if schema file is missing, cannot be found in - * classpath or fails to instantiate - */ - public WritableResolver(InputData input) throws Exception { - super(input); - - String schemaName = inputData.getUserProperty("DATA-SCHEMA"); - - /** Testing that the schema name was supplied by the user - schema is an optional property. */ - if (schemaName == null) { - throw new DataSchemaException(SCHEMA_NOT_INDICATED, this.getClass().getName()); - } - - /** Testing that the schema resource exists. */ - if (!isSchemaOnClasspath(schemaName)) { - throw new DataSchemaException(SCHEMA_NOT_ON_CLASSPATH, schemaName); - } - - userObject = Utilities.createAnyInstance(schemaName); - fields = userObject.getClass().getDeclaredFields(); - recordkeyIndex = (inputData.getRecordkeyColumn() == null) - ? RECORDKEY_UNDEFINED - : inputData.getRecordkeyColumn().columnIndex(); - - // fields details: - if (LOG.isDebugEnabled()) { - for (int i = 0; i < fields.length; i++) { - Field field = fields[i]; - String javaType = field.getType().getName(); - boolean isPrivate = Modifier.isPrivate(field.getModifiers()); - - LOG.debug("Field #" + i + ", name: " + field.getName() + - " type: " + javaType + ", " + - (isArray(javaType) ? "Array" : "Primitive") + ", " + - (isPrivate ? "Private" : "accessible") + " field"); - } - } - } - - private boolean isArray(String javaType) { - return (javaType.startsWith("[") && !"[B".equals(javaType)); - } - - @Override - public List getFields(OneRow onerow) throws Exception { - userObject = onerow.getData(); - List record = new LinkedList(); - - int currentIdx = 0; - for (Field field : fields) { - if (currentIdx == recordkeyIndex) { - currentIdx += recordkeyAdapter.appendRecordkeyField(record, inputData, onerow); - } - - if (Modifier.isPrivate(field.getModifiers())) { - continue; - } - - currentIdx += populateRecord(record, field); - } - - return record; - } - - int setArrayField(List record, int dataType, Field reflectedField) throws IllegalAccessException { - Object array = reflectedField.get(userObject); - int length = Array.getLength(array); - for (int j = 0; j < length; j++) { - record.add(new OneField(dataType, Array.get(array, j))); - } - return length; - } - - /* - * Given a java Object type, convert it to the corresponding output field - * type. 
- */ - private DataType convertJavaToGPDBType(String type) { - if ("boolean".equals(type) || "[Z".equals(type)) { - return BOOLEAN; - } - if ("int".equals(type) || "[I".equals(type)) { - return INTEGER; - } - if ("double".equals(type) || "[D".equals(type)) { - return FLOAT8; - } - if ("java.lang.String".equals(type) || "[Ljava.lang.String;".equals(type)) { - return TEXT; - } - if ("float".equals(type) || "[F".equals(type)) { - return REAL; - } - if ("long".equals(type) || "[J".equals(type)) { - return BIGINT; - } - if ("[B".equals(type)) { - return BYTEA; - } - if ("short".equals(type) || "[S".equals(type)) { - return SMALLINT; - } - throw new UnsupportedTypeException("Type " + type + " is not supported by GPDBWritable"); - } - - int populateRecord(List record, Field field) throws BadRecordException { - String javaType = field.getType().getName(); - try { - DataType dataType = convertJavaToGPDBType(javaType); - if (isArray(javaType)) { - return setArrayField(record, dataType.getOID(), field); - } - record.add(new OneField(dataType.getOID(), field.get(userObject))); - return 1; - } catch (IllegalAccessException ex) { - throw new BadRecordException(ex); - } - } - - /** - * Sets customWritable fields and creates a OneRow object. - */ - @Override - public OneRow setFields(List record) throws Exception { - Writable key = null; - - int colIdx = 0; - for (Field field : fields) { - /* - * extract recordkey based on the column descriptor type - * and add to OneRow.key - */ - if (colIdx == recordkeyIndex) { - key = recordkeyAdapter.convertKeyValue(record.get(colIdx).val); - colIdx++; - } - - if (Modifier.isPrivate(field.getModifiers())) { - continue; - } - - String javaType = field.getType().getName(); - convertJavaToGPDBType(javaType); - if (isArray(javaType)) { - Object value = field.get(userObject); - int length = Array.getLength(value); - for (int j = 0; j < length; j++, colIdx++) { - Array.set(value, j, record.get(colIdx).val); - } - } else { - field.set(userObject, record.get(colIdx).val); - colIdx++; - } - } - - return new OneRow(key, userObject); - } - - /* - * Tests for the case schema resource is a file like avro_schema.avsc - * or for the case schema resource is a Java class. in which case we try to reflect the class name. - */ - private boolean isSchemaOnClasspath(String resource) { - if (this.getClass().getClassLoader().getResource("/" + resource) != null) { - return true; - } - - try { - Class.forName(resource); - return true; - } catch (ClassNotFoundException e) { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java deleted file mode 100644 index db0c5ea..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/DataSchemaException.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs.utilities; - -/** - * Thrown when there is a data schema problem detected by any plugin that - * requires a schema. - * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_ON_CLASSPATH} when the specified schema is missing from the CLASSPATH. - * {@link DataSchemaException.MessageFmt#SCHEMA_NOT_INDICATED} when a schema was required but was not specified in the pxf uri. 
- */ -public class DataSchemaException extends RuntimeException { - public static enum MessageFmt { - SCHEMA_NOT_INDICATED("%s requires a data schema to be specified in the "+ - "pxf uri, but none was found. Please supply it" + - "using the DATA-SCHEMA option "), - SCHEMA_NOT_ON_CLASSPATH("schema resource \"%s\" is not located on the classpath"); - - String format; - - MessageFmt(String format) { - this.format = format; - } - - public String getFormat() { - return format; - } - } - - private MessageFmt msgFormat; - - /** - * Constructs a DataSchemaException. - * - * @param msgFormat the message format - * @param msgArgs the message arguments - */ - public DataSchemaException(MessageFmt msgFormat, String... msgArgs) { - super(String.format(msgFormat.getFormat(), (Object[]) msgArgs)); - this.msgFormat = msgFormat; - } - - public MessageFmt getMsgFormat() { - return msgFormat; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java deleted file mode 100644 index c7fe103..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/HdfsUtilities.java +++ /dev/null @@ -1,231 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs.utilities; - -import com.pivotal.pxf.service.utilities.Utilities; -import com.pivotal.pxf.api.io.DataType; -import com.pivotal.pxf.api.OneField; -import com.pivotal.pxf.api.utilities.InputData; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.FsInput; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.BZip2Codec; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.SplittableCompressionCodec; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.util.ReflectionUtils; - -import java.io.*; -import java.util.List; - -/** - * HdfsUtilities class exposes helper methods for PXF classes. - */ -public class HdfsUtilities { - private static Log Log = LogFactory.getLog(HdfsUtilities.class); - private static Configuration config = new Configuration(); - private static CompressionCodecFactory factory = new CompressionCodecFactory( - config); - - /** - * Hdfs data sources are absolute data paths. Method ensures that dataSource - * begins with '/'. - * - * @param dataSource The HDFS path to a file or directory of interest. - * Retrieved from the client request. - * @return an absolute data path - */ - public static String absoluteDataPath(String dataSource) { - return (dataSource.charAt(0) == '/') ? 
dataSource : "/" + dataSource; - } - - /* - * Helper routine to get a compression codec class - */ - private static Class getCodecClass(Configuration conf, - String name) { - - Class codecClass; - try { - codecClass = conf.getClassByName(name).asSubclass( - CompressionCodec.class); - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException("Compression codec " + name - + " was not found.", e); - } - return codecClass; - } - - /** - * Helper routine to get compression codec through reflection. - * - * @param conf configuration used for reflection - * @param name codec name - * @return generated CompressionCodec - */ - public static CompressionCodec getCodec(Configuration conf, String name) { - return ReflectionUtils.newInstance(getCodecClass(conf, name), conf); - } - - /** - * Helper routine to get compression codec class by path (file suffix). - * - * @param path path of file to get codec for - * @return matching codec class for the path. null if no codec is needed. - */ - private static Class getCodecClassByPath(String path) { - - Class codecClass = null; - CompressionCodec codec = factory.getCodec(new Path(path)); - if (codec != null) { - codecClass = codec.getClass(); - } - Log.debug((codecClass == null ? "No codec" : "Codec " + codecClass) - + " was found for file " + path); - return codecClass; - } - - /** - * Returns true if the needed codec is splittable. If no codec is needed - * returns true as well. - * - * @param path path of the file to be read - * @return if the codec needed for reading the specified path is splittable. - */ - public static boolean isSplittableCodec(Path path) { - - final CompressionCodec codec = factory.getCodec(path); - if (null == codec) { - return true; - } - - return codec instanceof SplittableCompressionCodec; - } - - /** - * Checks if requests should be handle in a single thread or not. - * - * @param dataDir hdfs path to the data source - * @param compCodec the fully qualified name of the compression codec - * @return if the request can be run in multi-threaded mode. - */ - public static boolean isThreadSafe(String dataDir, String compCodec) { - - Class codecClass = (compCodec != null) ? HdfsUtilities.getCodecClass( - config, compCodec) : HdfsUtilities.getCodecClassByPath(dataDir); - /* bzip2 codec is not thread safe */ - return (codecClass == null || !BZip2Codec.class.isAssignableFrom(codecClass)); - } - - /** - * Prepares byte serialization of a file split information (start, length, - * hosts) using {@link ObjectOutputStream}. - * - * @param fsp file split to be serialized - * @return byte serialization of fsp - * @throws IOException if I/O errors occur while writing to the underlying - * stream - */ - public static byte[] prepareFragmentMetadata(FileSplit fsp) - throws IOException { - ByteArrayOutputStream byteArrayStream = new ByteArrayOutputStream(); - ObjectOutputStream objectStream = new ObjectOutputStream( - byteArrayStream); - objectStream.writeLong(fsp.getStart()); - objectStream.writeLong(fsp.getLength()); - objectStream.writeObject(fsp.getLocations()); - - return byteArrayStream.toByteArray(); - } - - /** - * Parses fragment metadata and return matching {@link FileSplit}. 
- * - * @param inputData request input data - * @return FileSplit with fragment metadata - */ - public static FileSplit parseFragmentMetadata(InputData inputData) { - try { - byte[] serializedLocation = inputData.getFragmentMetadata(); - if (serializedLocation == null) { - throw new IllegalArgumentException( - "Missing fragment location information"); - } - - ByteArrayInputStream bytesStream = new ByteArrayInputStream( - serializedLocation); - ObjectInputStream objectStream = new ObjectInputStream(bytesStream); - - long start = objectStream.readLong(); - long end = objectStream.readLong(); - - String[] hosts = (String[]) objectStream.readObject(); - - FileSplit fileSplit = new FileSplit(new Path( - inputData.getDataSource()), start, end, hosts); - - Log.debug("parsed file split: path " + inputData.getDataSource() - + ", start " + start + ", end " + end + ", hosts " - + ArrayUtils.toString(hosts)); - - return fileSplit; - - } catch (Exception e) { - throw new RuntimeException( - "Exception while reading expected fragment metadata", e); - } - } - - /** - * Accessing the Avro file through the "unsplittable" API just to get the - * schema. The splittable API (AvroInputFormat) which is the one we will be - * using to fetch the records, does not support getting the Avro schema yet. - * - * @param conf Hadoop configuration - * @param dataSource Avro file (i.e fileName.avro) path - * @return the Avro schema - * @throws IOException if I/O error occured while accessing Avro schema file - */ - public static Schema getAvroSchema(Configuration conf, String dataSource) - throws IOException { - FsInput inStream = new FsInput(new Path(dataSource), conf); - DatumReader dummyReader = new GenericDatumReader<>(); - DataFileReader dummyFileReader = new DataFileReader<>( - inStream, dummyReader); - Schema schema = dummyFileReader.getSchema(); - dummyFileReader.close(); - return schema; - } - - /** - * Returns string serialization of list of fields. Fields of binary type - * (BYTEA) are converted to octal representation to make sure they will be - * relayed properly to the DB. 
- * - * @param complexRecord list of fields to be stringified - * @param delimiter delimiter between fields - * @return string of serialized fields using delimiter - */ - public static String toString(List complexRecord, String delimiter) { - StringBuilder buff = new StringBuilder(); - String delim = ""; // first iteration has no delimiter - for (OneField complex : complexRecord) { - if (complex.type == DataType.BYTEA.getOID()) { - /** Serialize byte array as string */ - buff.append(delim); - Utilities.byteArrayToOctalString((byte[]) complex.val, buff); - } else { - buff.append(delim).append(complex.val); - } - delim = delimiter; - } - return buff.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java deleted file mode 100644 index 958495c..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/PxfInputFormat.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs.utilities; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.*; - -import java.io.IOException; - -/** - * PxfInputFormat is not intended to read a specific format, hence it implements - * a dummy getRecordReader Instead, its purpose is to apply - * FileInputFormat.getSplits from one point in PXF and get the splits which are - * valid for the actual InputFormats, since all of them we use inherit - * FileInputFormat but do not override getSplits. - */ -public class PxfInputFormat extends FileInputFormat { - - @Override - public RecordReader getRecordReader(InputSplit split, - JobConf conf, - Reporter reporter) throws IOException { - throw new UnsupportedOperationException("PxfInputFormat should not be used for reading data, but only for obtaining the splits of a file"); - } - - /* - * Return true if this file can be split. - */ - @Override - protected boolean isSplitable(FileSystem fs, Path filename) { - return HdfsUtilities.isSplittableCodec(filename); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java b/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java deleted file mode 100644 index a88ade0..0000000 --- a/pxf/pxf-hdfs/src/main/java/com/pivotal/pxf/plugins/hdfs/utilities/RecordkeyAdapter.java +++ /dev/null @@ -1,265 +0,0 @@ -package com.pivotal.pxf.plugins.hdfs.utilities; - -import com.pivotal.pxf.api.OneField; -import com.pivotal.pxf.api.OneRow; -import com.pivotal.pxf.api.utilities.ColumnDescriptor; -import com.pivotal.pxf.api.utilities.InputData; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.*; - -import java.util.List; - -/** - * Adapter used for adding a recordkey field to the records output {@code List}. - */ -public class RecordkeyAdapter { - private Log Log; - - /* - * We need to transform Record keys to java primitive types. 
- * Since the type of the key is the same throughout the file we do the type resolution - * in the first call (for the first record) and then use a "Java variation on Function pointer" - * to do the extraction for the rest of the records. - */ - private interface ValExtractor { - public Object get(Object key); - } - - private ValExtractor extractor = null; - - private interface ValConverter { - public Writable get(Object key); - } - - private ValConverter converter = null; - - /** - * Constructs a RecordkeyAdapter. - */ - public RecordkeyAdapter() { - Log = LogFactory.getLog(RecordkeyAdapter.class); - } - - /** - * Adds the recordkey to the end of the passed in recFields list. - *

- * This method also verifies cases in which record keys are not supported - * by the underlying source type, and therefore "illegally" requested. - * - * @param recFields existing list of record (non-key) fields and their values. - * @param input all input parameters coming from the client request - * @param onerow a row object which is used here in order to find out if - * the given type supports recordkeys or not. - * @return 0 if record key not needed, or 1 if record key was appended - * @throws NoSuchFieldException when the given record type does not support - * recordkeys - */ - public int appendRecordkeyField(List recFields, - InputData input, - OneRow onerow) throws NoSuchFieldException { - - /* - * user did not request the recordkey field in the - * "create external table" statement - */ - ColumnDescriptor recordkeyColumn = input.getRecordkeyColumn(); - if (recordkeyColumn == null) { - return 0; - } - - /* - * The recordkey was filled in the fileAccessor during execution of - * method readNextObject. The current accessor implementations are - * SequenceFileAccessor, LineBreakAccessor and AvroFileAccessor from - * HdfsSplittableDataAccessor and QuotedLineBreakAccessor from - * HdfsAtomicDataAccessor. For SequenceFileAccessor, LineBreakAccessor - * the recordkey is set, since it is returned by the - * SequenceFileRecordReader or LineRecordReader(for text file). But Avro - * files do not have keys, so the AvroRecordReader will not return a key - * and in this case recordkey will be null. If the user specified a - * recordkey attribute in the CREATE EXTERNAL TABLE statement and he - * reads from an AvroFile, we will throw an exception since the Avro - * file does not have keys In the future, additional implementations of - * FileAccessors will have to set recordkey during readNextObject(). - * Otherwise it is null by default and we will throw an exception here, - * that is if we get here... a careful user will not specify recordkey - * in the CREATE EXTERNAL statement and then we will leave this function - * one line above. - */ - Object recordkey = onerow.getKey(); - if (recordkey == null) { - throw new NoSuchFieldException("Value for field \"recordkey\" was requested but the queried HDFS resource type does not support key"); - } - - OneField oneField = new OneField(); - oneField.type = recordkeyColumn.columnTypeCode(); - oneField.val = extractVal(recordkey); - recFields.add(oneField); - return 1; - } - - /* - * Extracts a java primitive type value from the recordkey. If the key is a - * Writable implementation we extract the value as a Java primitive. 
If the - * key is already a Java primitive we returned it as is If it is an unknown - * type we throw an exception - */ - private Object extractVal(Object key) { - if (extractor == null) { - extractor = InitializeExtractor(key); - } - - return extractor.get(key); - } - - /* - * Initialize the extractor object based on the type of the recordkey - */ - private ValExtractor InitializeExtractor(Object key) { - if (key instanceof IntWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((IntWritable) key).get(); - } - }; - } else if (key instanceof ByteWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((ByteWritable) key).get(); - } - }; - } else if (key instanceof BooleanWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((BooleanWritable) key).get(); - } - }; - } else if (key instanceof DoubleWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((DoubleWritable) key).get(); - } - }; - } else if (key instanceof FloatWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((FloatWritable) key).get(); - } - }; - } else if (key instanceof LongWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((LongWritable) key).get(); - } - }; - } else if (key instanceof Text) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return (key).toString(); - } - }; - } else if (key instanceof VIntWritable) { - return new ValExtractor() { - @Override - public Object get(Object key) { - return ((VIntWritable) key).get(); - } - }; - } else { - return new ValExtractor() { - @Override - public Object get(Object key) { - throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName()); - } - }; - } - } - - /** - * Converts given key object to its matching Writable. - * Supported types: Integer, Byte, Boolean, Double, Float, Long, String. - * The type is only checked once based on the key, all consequent calls - * must be of the same type. 
- * - * @param key object to convert - * @return Writable object matching given key - */ - public Writable convertKeyValue(Object key) { - if (converter == null) { - converter = initializeConverter(key); - Log.debug("converter initialized for type " + key.getClass() + - " (key value: " + key + ")"); - } - - return converter.get(key); - } - - private ValConverter initializeConverter(Object key) { - - if (key instanceof Integer) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new IntWritable((Integer) key)); - } - }; - } else if (key instanceof Byte) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new ByteWritable((Byte) key)); - } - }; - } else if (key instanceof Boolean) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new BooleanWritable((Boolean) key)); - } - }; - } else if (key instanceof Double) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new DoubleWritable((Double) key)); - } - }; - } else if (key instanceof Float) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new FloatWritable((Float) key)); - } - }; - } else if (key instanceof Long) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new LongWritable((Long) key)); - } - }; - } else if (key instanceof String) { - return new ValConverter() { - @Override - public Writable get(Object key) { - return (new Text((String) key)); - } - }; - } else { - return new ValConverter() { - @Override - public Writable get(Object key) { - throw new UnsupportedOperationException("Unsupported recordkey data type " + key.getClass().getName()); - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f053e053/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java new file mode 100644 index 0000000..fab51ca --- /dev/null +++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/AvroFileAccessor.java @@ -0,0 +1,77 @@ +package org.apache.hawq.pxf.plugins.hdfs; + +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.utilities.InputData; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.*; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; + +import static org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities.getAvroSchema; + +/** + * A PXF Accessor for reading Avro File records + */ +public class AvroFileAccessor extends HdfsSplittableDataAccessor { + private AvroWrapper avroWrapper = null; + + /** + * Constructs a AvroFileAccessor that creates the job configuration and + * accesses the avro file to fetch the avro schema + * + * @param input all input parameters coming from the client + * @throws Exception if getting the avro schema fails + */ + public AvroFileAccessor(InputData input) throws Exception { + // 1. Call the base class + super(input, new AvroInputFormat()); + + // 2. Accessing the avro file through the "unsplittable" API just to get the schema. 
+ // The splittable API (AvroInputFormat), which is the one we will be using to fetch + // the records, does not support getting the avro schema yet. + Schema schema = getAvroSchema(conf, inputData.getDataSource()); + + // 3. Pass the schema to the AvroInputFormat + AvroJob.setInputSchema(jobConf, schema); + + // 4. The avroWrapper required for the iteration + avroWrapper = new AvroWrapper(); + } + + @Override + protected Object getReader(JobConf jobConf, InputSplit split) throws IOException { + return new AvroRecordReader(jobConf, (FileSplit) split); + } + + /** + * readNextObject + * The AVRO accessor is currently the only specialized accessor that + * overrides this method. This happens because of the special + * AvroRecordReader.next() semantics (use of the AvroWrapper), which + * prevent it from using the RecordReader's default implementation in + * SplittableFileAccessor. + */ + @Override + public OneRow readNextObject() throws IOException { + /* Reset datum to null, so that stale bytes from the previous row's datum are not carried over */ + avroWrapper.datum(null); + if (reader.next(avroWrapper, NullWritable.get())) { // There is one more record in the current split. + return new OneRow(null, avroWrapper.datum()); + } else if (getNextSplit()) { // The current split is exhausted. Try to move to the next split. + return reader.next(avroWrapper, NullWritable.get()) ? new OneRow(null, avroWrapper.datum()) : null; + } + + // If neither condition was met, we have already read all the records in all the splits and no + // record was set in this call, so we return null to signal the end of the records sequence - + // in this case avroWrapper.datum() will be null. + return null; + } +}
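The PxfInputFormat comment in the removed file above explains that the class exists only so that FileInputFormat.getSplits can be applied from one point in PXF; its getRecordReader deliberately throws. As a rough illustration (not part of this commit), the splits of a data source can be listed without ever reading a record. The path is hypothetical, and the import assumes the class lands in the refactored org.apache.hawq.pxf.plugins.hdfs.utilities package:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hawq.pxf.plugins.hdfs.utilities.PxfInputFormat; // assumed refactored package

    // Illustrative only: obtain the HDFS splits for a data source without reading any records.
    public class SplitListingSketch {
        public static void main(String[] args) throws Exception {
            JobConf jobConf = new JobConf();
            PxfInputFormat format = new PxfInputFormat();
            // Point the job at the data source (file, directory, or wildcard pattern) - hypothetical path.
            FileInputFormat.setInputPaths(jobConf, new Path("/data/example"));
            // PxfInputFormat does not override getSplits, so this is FileInputFormat.getSplits.
            InputSplit[] splits = format.getSplits(jobConf, 1);
            for (InputSplit split : splits) {
                System.out.println(split);
            }
        }
    }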
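The RecordkeyAdapter comments above describe resolving the record key type once, on the first record, and then reusing a "Java variation on a function pointer" for every following record. A stripped-down sketch of that pattern, independent of the PXF classes (the interface, class, and method names here are illustrative, and modern lambdas are used where the original code uses anonymous classes):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    // Illustrative only: resolve the extraction strategy once, then reuse it for every record.
    public class KeyExtractorSketch {

        private interface ValExtractor {
            Object get(Object key);
        }

        private ValExtractor extractor = null;

        public Object extractVal(Object key) {
            if (extractor == null) {                 // first record: resolve the key type once
                extractor = initializeExtractor(key);
            }
            return extractor.get(key);               // later records: no further instanceof checks
        }

        private ValExtractor initializeExtractor(Object key) {
            if (key instanceof IntWritable) {
                return k -> ((IntWritable) k).get();
            } else if (key instanceof LongWritable) {
                return k -> ((LongWritable) k).get();
            } else if (key instanceof Text) {
                return Object::toString;
            }
            throw new UnsupportedOperationException(
                    "Unsupported recordkey data type " + key.getClass().getName());
        }
    }

Because the key type is the same for the whole file, the instanceof dispatch runs only on the first record; the same idea drives both extractVal and convertKeyValue in the adapter above.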
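The readNextObject() implementation in the new AvroFileAccessor follows a "drain the current split, then advance to the next split" control flow. A generic, Avro-free sketch of the same flow, using plain iterators as stand-ins for the PXF record reader and split list (nothing below comes from the commit itself):

    import java.util.Iterator;

    // Illustrative only: the "read from the current split, otherwise advance" pattern.
    public class SplitIterationSketch<T> {
        private Iterator<T> currentSplit;                // stand-in for the current record reader
        private final Iterator<Iterator<T>> splits;      // stand-in for the remaining splits

        public SplitIterationSketch(Iterator<Iterator<T>> splits) {
            this.splits = splits;
            this.currentSplit = splits.hasNext() ? splits.next() : null;
        }

        /** Returns the next record, or null once every split is exhausted. */
        public T readNextObject() {
            if (currentSplit != null && currentSplit.hasNext()) {
                return currentSplit.next();              // one more record in the current split
            }
            while (splits.hasNext()) {                   // current split exhausted: try the next one
                currentSplit = splits.next();
                if (currentSplit.hasNext()) {
                    return currentSplit.next();
                }
            }
            return null;                                 // all splits drained: signal end of records
        }
    }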