Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EA3BE200C1B for ; Tue, 31 Jan 2017 02:21:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E8CBE160B60; Tue, 31 Jan 2017 01:21:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E5C7F160B4D for ; Tue, 31 Jan 2017 02:21:30 +0100 (CET) Received: (qmail 35848 invoked by uid 500); 31 Jan 2017 01:21:30 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 35838 invoked by uid 99); 31 Jan 2017 01:21:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jan 2017 01:21:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D32A9DFC61; Tue, 31 Jan 2017 01:21:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <5cce944f7fd54aa9a3faba263c3f1cc6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: crunch git commit: CRUNCH-632: Added support for compressed CSVSource files. Date: Tue, 31 Jan 2017 01:21:29 +0000 (UTC) archived-at: Tue, 31 Jan 2017 01:21:32 -0000 Repository: crunch Updated Branches: refs/heads/master 2e4729404 -> 049fb499b CRUNCH-632: Added support for compressed CSVSource files. CRUNCH-632: Wrote simple test showing it now working on compressed CSV file. Signed-off-by: Josh Wills Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/049fb499 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/049fb499 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/049fb499 Branch: refs/heads/master Commit: 049fb499b7f2e45385f45d954138e56eecb83064 Parents: 2e47294 Author: Micah Whitacre Authored: Wed Jan 11 20:51:26 2017 -0600 Committer: Josh Wills Committed: Mon Jan 30 17:16:26 2017 -0800 ---------------------------------------------------------------------- .../crunch/io/text/csv/CSVFileSourceIT.java | 40 +++++++++++++++++++- .../io/text/csv/CSVFileReaderFactory.java | 20 +++++++++- .../crunch/io/text/csv/CSVInputFormat.java | 25 ++++++++++-- .../crunch/io/text/csv/CSVLineReader.java | 6 +-- .../crunch/io/text/csv/CSVRecordReader.java | 31 ++++++++++----- 5 files changed, 104 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java index b1c247f..4f6a5bf 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java @@ -19,9 +19,15 @@ package org.apache.crunch.io.text.csv; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Collection; +import org.apache.commons.compress.utils.IOUtils; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pipeline; @@ -29,12 +35,15 @@ import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.DeflateCodec; import org.junit.Rule; import org.junit.Test; public class CSVFileSourceIT { @Rule - public transient TemporaryPath tmpDir = TemporaryPaths.create(); + public TemporaryPath tmpDir = TemporaryPaths.create(); @Test public void testVanillaCSV() throws Exception { @@ -52,6 +61,35 @@ public class CSVFileSourceIT { } @Test + public void testVanillaCSV_Compressed() throws Exception { + final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" }; + + CompressionCodecFactory codecFactory = new CompressionCodecFactory(tmpDir.getDefaultConfiguration()); + CompressionCodec deflateCodec = codecFactory.getCodecByName(DeflateCodec.class.getName()); + + File compressedFile = tmpDir.getFile("vanilla." + deflateCodec.getDefaultExtension()); + + InputStream in = CSVFileSourceIT.class.getClassLoader().getResourceAsStream("vanilla.csv"); + OutputStream out = deflateCodec.createOutputStream(new FileOutputStream(compressedFile)); + try { + IOUtils.copy(in, out); + }finally { + in.close(); + out.close(); + } + + final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv"); + final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration()); + final PCollection csvLines = pipeline.read(new CSVFileSource(new Path(compressedFile.getPath()))); + + final Collection csvLinesList = csvLines.asCollection().getValue(); + + for (int i = 0; i < expectedFileContents.length; i++) { + assertTrue(csvLinesList.contains(expectedFileContents[i])); + } + } + + @Test public void testVanillaCSVWithAdditionalActions() throws Exception { final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" }; http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java index be7bdcc..4418166 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java @@ -18,14 +18,19 @@ package org.apache.crunch.io.text.csv; import java.io.IOException; +import java.io.InputStream; import java.util.Iterator; import org.apache.crunch.io.FileReaderFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.google.common.collect.Iterators; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +47,8 @@ public class CSVFileReaderFactory implements FileReaderFactory { private final char escapeChar; private final int maximumRecordSize; + private CompressionCodecFactory compressionCodecFactory; + /** * Creates a new {@code CSVFileReaderFactory} instance with default * configuration @@ -88,9 +95,10 @@ public class CSVFileReaderFactory implements FileReaderFactory { @Override public Iterator read(final FileSystem fs, final Path path) { - FSDataInputStream is; + InputStream is; try { - is = fs.open(path); + CompressionCodec codec = getCompressionCodec(path, fs.getConf()); + is = codec == null ? fs.open(path): codec.createInputStream(fs.open(path)); return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar, maximumRecordSize); } catch (final IOException e) { @@ -98,4 +106,12 @@ public class CSVFileReaderFactory implements FileReaderFactory { return Iterators.emptyIterator(); } } + + private CompressionCodec getCompressionCodec(Path path, Configuration configuration){ + if(compressionCodecFactory == null){ + compressionCodecFactory = new CompressionCodecFactory(configuration); + } + + return compressionCodecFactory.getCodec(path); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java index 2dc5e13..95cbeab 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; @@ -93,11 +95,22 @@ public class CSVInputFormat extends FileInputFormat implemen final List splits = new ArrayList(); final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0])); FSDataInputStream inputStream = null; + + Configuration config = job.getConfiguration(); + CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(config); + try { for (final Path path : paths) { - FileSystem fileSystem = path.getFileSystem(job.getConfiguration()); - inputStream = fileSystem.open(path); - splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream)); + FileSystem fileSystem = path.getFileSystem(config); + CompressionCodec codec = compressionCodecFactory.getCodec(path); + if(codec == null) { + //if file is not compressed then split it up. + inputStream = fileSystem.open(path); + splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream)); + }else{ + //compressed file so no splitting it + splits.add(new FileSplit(path,0, Long.MAX_VALUE, new String[0])); + } } return splits; } finally { @@ -199,4 +212,10 @@ public class CSVInputFormat extends FileInputFormat implemen return splitsList; } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); + return codec == null; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java index 79af67d..c46e443 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java @@ -125,7 +125,7 @@ public class CSVLineReader { * Used to specify a custom open quote character * @param closeQuoteChar * Used to specify a custom close quote character - * @param escape + * @param escapeChar * Used to specify a custom escape character * @param maximumRecordSize * The maximum acceptable size of one CSV record. Beyond this limit, @@ -205,8 +205,8 @@ public class CSVLineReader { // wrong. if (totalBytesConsumed > maximumRecordSize || totalBytesConsumed > Integer.MAX_VALUE) { final String record = stringBuilder.toString(); - LOGGER.error("Possibly malformed file encountered. First line of record: " - + record.substring(0, record.indexOf('\n'))); + LOGGER.error("Possibly malformed file encountered. First line of record: {}", + record.substring(0, record.indexOf('\n'))); throw new IOException("Possibly malformed file encountered. Check log statements for more information"); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java index 192a018..54fc905 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java @@ -18,12 +18,15 @@ package org.apache.crunch.io.text.csv; import java.io.IOException; +import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -44,7 +47,7 @@ public class CSVRecordReader extends RecordReader { private LongWritable key = null; private Text value = null; - private FSDataInputStream fileIn; + private InputStream fileIn; private CSVLineReader csvLineReader; private final char openQuote; private final char closeQuote; @@ -117,15 +120,25 @@ public class CSVRecordReader extends RecordReader { this.pos = start; final Path file = split.getPath(); - LOGGER.info("Initializing processing of split for file: " + file); - LOGGER.info("File size is: " + file.getFileSystem(job).getFileStatus(file).getLen()); - LOGGER.info("Split starts at: " + start); - LOGGER.info("Split will end at: " + end); + + CompressionCodecFactory codecFactory = new CompressionCodecFactory(context.getConfiguration()); + CompressionCodec compressionCodec = codecFactory.getCodec(file); + + LOGGER.info("Initializing processing of split for file: {}", file); + LOGGER.info("File size is: {}", file.getFileSystem(job).getFileStatus(file).getLen()); + LOGGER.info("Split starts at: {}", start); + LOGGER.info("Split will end at: {}", end); + LOGGER.info("File is compressed: {}", (compressionCodec != null)); // Open the file, seek to the start of the split // then wrap it in a CSVLineReader - fileIn = file.getFileSystem(job).open(file); - fileIn.seek(start); + if(compressionCodec == null) { + FSDataInputStream in = file.getFileSystem(job).open(file); + in.seek(start); + fileIn = in; + }else{ + fileIn = compressionCodec.createInputStream(file.getFileSystem(job).open(file)); + } csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote, this.closeQuote, this.escape, this.maximumRecordSize); } @@ -150,7 +163,7 @@ public class CSVRecordReader extends RecordReader { if (pos >= end) { key = null; value = null; - LOGGER.info("End of split reached, ending processing. Total records read for this split: " + totalRecordsRead); + LOGGER.info("End of split reached, ending processing. Total records read for this split: {}", totalRecordsRead); close(); return false; } @@ -158,7 +171,7 @@ public class CSVRecordReader extends RecordReader { final int newSize = csvLineReader.readCSVLine(value); if (newSize == 0) { - LOGGER.info("End of file reached. Ending processing. Total records read for this split: " + totalRecordsRead); + LOGGER.info("End of file reached. Ending processing. Total records read for this split: {}", totalRecordsRead); return false; }