crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-632: Added support for compressed CSVSource files.
Date Tue, 31 Jan 2017 01:21:29 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 2e4729404 -> 049fb499b


CRUNCH-632: Added support for compressed CSVSource files.

CRUNCH-632: Wrote simple test showing it now working on compressed CSV file.

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/049fb499
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/049fb499
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/049fb499

Branch: refs/heads/master
Commit: 049fb499b7f2e45385f45d954138e56eecb83064
Parents: 2e47294
Author: Micah Whitacre <mkwhit@gmail.com>
Authored: Wed Jan 11 20:51:26 2017 -0600
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jan 30 17:16:26 2017 -0800

----------------------------------------------------------------------
 .../crunch/io/text/csv/CSVFileSourceIT.java     | 40 +++++++++++++++++++-
 .../io/text/csv/CSVFileReaderFactory.java       | 20 +++++++++-
 .../crunch/io/text/csv/CSVInputFormat.java      | 25 ++++++++++--
 .../crunch/io/text/csv/CSVLineReader.java       |  6 +--
 .../crunch/io/text/csv/CSVRecordReader.java     | 31 ++++++++++-----
 5 files changed, 104 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
index b1c247f..4f6a5bf 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -19,9 +19,15 @@ package org.apache.crunch.io.text.csv;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Collection;
 
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pipeline;
@@ -29,12 +35,15 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
 import org.junit.Rule;
 import org.junit.Test;
 
 public class CSVFileSourceIT {
   @Rule
-  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  public TemporaryPath tmpDir = TemporaryPaths.create();
 
   @Test
   public void testVanillaCSV() throws Exception {
@@ -52,6 +61,35 @@ public class CSVFileSourceIT {
   }
 
   @Test
+  public void testVanillaCSV_Compressed() throws Exception {
+    final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(tmpDir.getDefaultConfiguration());
+    CompressionCodec deflateCodec = codecFactory.getCodecByName(DeflateCodec.class.getName());
+
+    File compressedFile = tmpDir.getFile("vanilla." + deflateCodec.getDefaultExtension());
+
+    InputStream in = CSVFileSourceIT.class.getClassLoader().getResourceAsStream("vanilla.csv");
+    OutputStream out = deflateCodec.createOutputStream(new FileOutputStream(compressedFile));
+    try {
+      IOUtils.copy(in, out);
+    }finally {
+      in.close();
+      out.close();
+    }
+
+    final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(compressedFile.getPath())));
+
+    final Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertTrue(csvLinesList.contains(expectedFileContents[i]));
+    }
+  }
+
+  @Test
   public void testVanillaCSVWithAdditionalActions() throws Exception {
     final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
index be7bdcc..4418166 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
@@ -18,14 +18,19 @@
 package org.apache.crunch.io.text.csv;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Iterator;
 
 import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Iterators;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +47,8 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> {
   private final char escapeChar;
   private final int maximumRecordSize;
 
+  private CompressionCodecFactory compressionCodecFactory;
+
   /**
    * Creates a new {@code CSVFileReaderFactory} instance with default
    * configuration
@@ -88,9 +95,10 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> {
 
   @Override
   public Iterator<String> read(final FileSystem fs, final Path path) {
-    FSDataInputStream is;
+    InputStream is;
     try {
-      is = fs.open(path);
+      CompressionCodec codec = getCompressionCodec(path, fs.getConf());
+      is = codec == null ? fs.open(path): codec.createInputStream(fs.open(path));
       return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar,
           maximumRecordSize);
     } catch (final IOException e) {
@@ -98,4 +106,12 @@ public class CSVFileReaderFactory implements FileReaderFactory<String> {
       return Iterators.emptyIterator();
     }
   }
+
+  private CompressionCodec getCompressionCodec(Path path, Configuration configuration){
+    if(compressionCodecFactory == null){
+      compressionCodecFactory = new CompressionCodecFactory(configuration);
+    }
+
+    return compressionCodecFactory.getCodec(path);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
index 2dc5e13..95cbeab 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -93,11 +95,22 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
     final List<InputSplit> splits = new ArrayList<InputSplit>();
     final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
     FSDataInputStream inputStream = null;
+
+    Configuration config = job.getConfiguration();
+    CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(config);
+
     try {
       for (final Path path : paths) {
-        FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
-        inputStream = fileSystem.open(path);
-        splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
+        FileSystem fileSystem = path.getFileSystem(config);
+        CompressionCodec codec = compressionCodecFactory.getCodec(path);
+        if(codec == null) {
+          //if file is not compressed then split it up.
+          inputStream = fileSystem.open(path);
+          splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
+        }else{
+          //compressed file so no splitting it
+          splits.add(new FileSplit(path,0, Long.MAX_VALUE, new String[0]));
+        }
       }
       return splits;
     } finally {
@@ -199,4 +212,10 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implemen
 
     return splitsList;
   }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return codec == null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
index 79af67d..c46e443 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
@@ -125,7 +125,7 @@ public class CSVLineReader {
    *          Used to specify a custom open quote character
    * @param closeQuoteChar
    *          Used to specify a custom close quote character
-   * @param escape
+   * @param escapeChar
    *          Used to specify a custom escape character
    * @param maximumRecordSize
    *          The maximum acceptable size of one CSV record. Beyond this limit,
@@ -205,8 +205,8 @@ public class CSVLineReader {
         // wrong.
         if (totalBytesConsumed > maximumRecordSize || totalBytesConsumed > Integer.MAX_VALUE) {
           final String record = stringBuilder.toString();
-          LOGGER.error("Possibly malformed file encountered. First line of record: "
-              + record.substring(0, record.indexOf('\n')));
+          LOGGER.error("Possibly malformed file encountered. First line of record: {}",
+               record.substring(0, record.indexOf('\n')));
           throw new IOException("Possibly malformed file encountered. Check log statements for more information");
         }
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/049fb499/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
index 192a018..54fc905 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
@@ -18,12 +18,15 @@
 package org.apache.crunch.io.text.csv;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,7 +47,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
   private LongWritable key = null;
   private Text value = null;
 
-  private FSDataInputStream fileIn;
+  private InputStream fileIn;
   private CSVLineReader csvLineReader;
   private final char openQuote;
   private final char closeQuote;
@@ -117,15 +120,25 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
     this.pos = start;
 
     final Path file = split.getPath();
-    LOGGER.info("Initializing processing of split for file: " + file);
-    LOGGER.info("File size is: " + file.getFileSystem(job).getFileStatus(file).getLen());
-    LOGGER.info("Split starts at: " + start);
-    LOGGER.info("Split will end at: " + end);
+
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(context.getConfiguration());
+    CompressionCodec compressionCodec = codecFactory.getCodec(file);
+
+    LOGGER.info("Initializing processing of split for file: {}", file);
+    LOGGER.info("File size is: {}", file.getFileSystem(job).getFileStatus(file).getLen());
+    LOGGER.info("Split starts at: {}", start);
+    LOGGER.info("Split will end at: {}", end);
+    LOGGER.info("File is compressed: {}", (compressionCodec != null));
 
     // Open the file, seek to the start of the split
     // then wrap it in a CSVLineReader
-    fileIn = file.getFileSystem(job).open(file);
-    fileIn.seek(start);
+    if(compressionCodec == null) {
+      FSDataInputStream in = file.getFileSystem(job).open(file);
+      in.seek(start);
+      fileIn = in;
+    }else{
+      fileIn = compressionCodec.createInputStream(file.getFileSystem(job).open(file));
+    }
     csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote,
         this.closeQuote, this.escape, this.maximumRecordSize);
   }
@@ -150,7 +163,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
     if (pos >= end) {
       key = null;
       value = null;
-      LOGGER.info("End of split reached, ending processing. Total records read for this split: " + totalRecordsRead);
+      LOGGER.info("End of split reached, ending processing. Total records read for this split: {}", totalRecordsRead);
       close();
       return false;
     }
@@ -158,7 +171,7 @@ public class CSVRecordReader extends RecordReader<LongWritable, Text> {
     final int newSize = csvLineReader.readCSVLine(value);
 
     if (newSize == 0) {
-      LOGGER.info("End of file reached. Ending processing. Total records read for this split: " + totalRecordsRead);
+      LOGGER.info("End of file reached. Ending processing. Total records read for this split: {}", totalRecordsRead);
       return false;
     }
 


Mime
View raw message