incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [15/27] git commit: Adding compression switch on CsvBlurDriver.
Date Fri, 09 Aug 2013 17:24:20 GMT
Adding compression switch on CsvBlurDriver.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e7a43e8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e7a43e8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e7a43e8f

Branch: refs/heads/0.2.0-newtypesystem
Commit: e7a43e8f9250630e04e02fbdf4f49242ce580f94
Parents: e08f138
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Aug 8 08:37:30 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Aug 8 08:37:30 2013 -0400

----------------------------------------------------------------------
 .../blur/mapreduce/lib/CsvBlurDriver.java       | 53 +++++++++++++++++---
 .../blur/mapreduce/lib/CsvBlurDriverTest.java   | 30 ++++++++++-
 2 files changed, 75 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e7a43e8f/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
index 7ff8c94..aef7c3e 100644
--- a/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
+++ b/blur-mapred/src/main/java/org/apache/blur/mapreduce/lib/CsvBlurDriver.java
@@ -39,6 +39,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -53,15 +58,30 @@ import org.apache.hadoop.util.GenericOptionsParser;
 @SuppressWarnings("static-access")
 public class CsvBlurDriver {
 
+  public static final String MAPRED_COMPRESS_MAP_OUTPUT = "mapred.compress.map.output";
+  public static final String MAPRED_MAP_OUTPUT_COMPRESSION_CODEC = "mapred.map.output.compression.codec";
+
+  enum COMPRESSION {
+    SNAPPY(SnappyCodec.class), GZIP(GzipCodec.class), BZIP(BZip2Codec.class), DEFAULT(DefaultCodec.class);
+
+    private final String className;
+
+    private COMPRESSION(Class<? extends CompressionCodec> clazz) {
+      className = clazz.getName();
+    }
+
+    public String getClassName() {
+      return className;
+    }
+  }
+
   interface ControllerPool {
     Iface getClient(String controllerConnectionStr);
   }
 
   public static void main(String... args) throws Exception {
-
     Configuration configuration = new Configuration();
     String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
-
     Job job = setupJob(configuration, new ControllerPool() {
       @Override
       public Iface getClient(String controllerConnectionStr) {
@@ -93,6 +113,21 @@ public class CsvBlurDriver {
     job.setJarByClass(CsvBlurDriver.class);
     job.setMapperClass(CsvBlurMapper.class);
 
+    if (cmd.hasOption("p")) {
+      job.getConfiguration().set(MAPRED_COMPRESS_MAP_OUTPUT, "true");
+      String codecStr = cmd.getOptionValue("p");
+      COMPRESSION compression;
+      try {
+        compression = COMPRESSION.valueOf(codecStr.trim().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        compression = null;
+      }
+      if (compression == null) {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, codecStr.trim());
+      } else {
+        job.getConfiguration().set(MAPRED_MAP_OUTPUT_COMPRESSION_CODEC, compression.getClassName());
+      }
+    }
     if (cmd.hasOption("a")) {
       CsvBlurMapper.setAutoGenerateRecordIdAsHashOfData(job, true);
     }
@@ -104,7 +139,7 @@ public class CsvBlurDriver {
     } else {
       job.setInputFormatClass(TextInputFormat.class);
     }
-    
+
     if (cmd.hasOption("C")) {
       if (cmd.hasOption("S")) {
         String[] optionValues = cmd.getOptionValues("C");
@@ -259,9 +294,15 @@ public class CsvBlurDriver {
         .withArgName("minimum maximum")
         .hasArgs(2)
         .withDescription(
-            "Enables a combine file input to help deal with many small files as the input.
Provide " +
-            "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum
of " +
-            "2.5GB: (1000000000 2500000000)").create("C"));
+            "Enables a combine file input to help deal with many small files as the input.
Provide "
+                + "the minimum and maximum size per mapper.  For a minimum of 1GB and a maximum
of "
+                + "2.5GB: (1000000000 2500000000)").create("C"));
+    options.addOption(OptionBuilder
+        .withArgName("codec")
+        .hasArgs(1)
+        .withDescription(
+            "Sets the compression codec for the map compress output setting. (SNAPPY,GZIP,BZIP,DEFAULT,
or classname)")
+        .create("p"));
 
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = null;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e7a43e8f/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
----------------------------------------------------------------------
diff --git a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
index c294875..c3710f9 100644
--- a/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
+++ b/blur-mapred/src/test/java/org/apache/blur/mapreduce/lib/CsvBlurDriverTest.java
@@ -15,6 +15,7 @@ import org.apache.blur.mapreduce.lib.CsvBlurDriver.ControllerPool;
 import org.apache.blur.thrift.generated.Blur.Iface;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
 
@@ -55,7 +56,7 @@ public class CsvBlurDriverTest {
     Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
     assertEquals(2, familyAndColumnNameMap.size());
   }
-  
+
   @Test
   public void testCsvBlurDriverTest2() throws Exception {
     Configuration configurationSetup = new Configuration();
@@ -66,7 +67,30 @@ public class CsvBlurDriverTest {
       }
     };
     Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010",
"-d", "family1", "col1",
-        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i", "file:///tmp/test2", "-S", "-C", "1000000","2000000");
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000");
+    assertNotNull(job);
+    Configuration configuration = job.getConfiguration();
+    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
+    assertEquals(tableDescriptor.getName(), "table1");
+    Collection<String> inputs = configuration.getStringCollection("mapred.input.dir");
+    assertEquals(2, inputs.size());
+    Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
+    assertEquals(2, familyAndColumnNameMap.size());
+  }
+
+  @Test
+  public void testCsvBlurDriverTest3() throws Exception {
+    Configuration configurationSetup = new Configuration();
+    ControllerPool controllerPool = new CsvBlurDriver.ControllerPool() {
+      @Override
+      public Iface getClient(String controllerConnectionStr) {
+        return getMockIface();
+      }
+    };
+    Job job = CsvBlurDriver.setupJob(configurationSetup, controllerPool, "-c", "host:40010",
"-d", "family1", "col1",
+        "col2", "-d", "family2", "col3", "col4", "-t", "table1", "-i", "file:///tmp/test1",
"-i", "file:///tmp/test2",
+        "-S", "-C", "1000000", "2000000", "-p", "SNAPPY");
     assertNotNull(job);
     Configuration configuration = job.getConfiguration();
     TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(configuration);
@@ -75,6 +99,8 @@ public class CsvBlurDriverTest {
     assertEquals(2, inputs.size());
     Map<String, List<String>> familyAndColumnNameMap = CsvBlurMapper.getFamilyAndColumnNameMap(configuration);
     assertEquals(2, familyAndColumnNameMap.size());
+    assertEquals("true", configuration.get(CsvBlurDriver.MAPRED_COMPRESS_MAP_OUTPUT));
+    assertEquals(SnappyCodec.class.getName(), configuration.get(CsvBlurDriver.MAPRED_MAP_OUTPUT_COMPRESSION_CODEC));
   }
 
   protected Iface getMockIface() {


Mime
View raw message