pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1145447 - in /pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
Date Tue, 12 Jul 2011 04:09:35 GMT
Author: daijy
Date: Tue Jul 12 04:09:34 2011
New Revision: 1145447

URL: http://svn.apache.org/viewvc?rev=1145447&view=rev
Log:
PIG-2130: Piggybank:MultiStorage is not compressing output files

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1145447&r1=1145446&r2=1145447&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jul 12 04:09:34 2011
@@ -66,6 +66,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
 
 BUG FIXES
 
+PIG-2130: Piggybank:MultiStorage is not compressing output files (vivekp via daijy)
+
 PIG-2147: Support nested tags for XMLLoader (vivekp via daijy)
 
 PIG-1890: Fix piggybank unit test TestAvroStorage (kengoodhope via daijy)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1145447&r1=1145446&r2=1145447&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Tue Jul 12 04:09:34 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -35,8 +36,7 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.ResourceSchema;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -68,6 +68,10 @@ import org.apache.pig.impl.util.StorageU
  * then its imperative that all tuples for a particular group will go exactly to
  * 1 reducer. So in the above case for e.g. there will be only 1 file each under
  * 'a1' and 'a2' directories.
+ * 
+ * If the output is compressed, then the sub directories and the output files will
+ * carry the codec's extension. For example, in the above case, if bz2 is used, one file
+ * will look like: /my/home/output.bz2/a1.bz2/a1-0000.bz2
  */
 public class MultiStorage extends StoreFunc {
 
@@ -237,13 +241,29 @@ public class MultiStorage extends StoreF
         private DataOutputStream createOutputStream(String fieldValue) throws IOException
{
           Configuration conf = ctx.getConfiguration();
           TaskID taskId = ctx.getTaskAttemptID().getTaskID();
-          Path path = new Path(fieldValue, fieldValue + '-' 
-                  + NumberFormat.getInstance().format(taskId.getId()));
+          
+          // Check whether compression is enabled; if so, get the codec's extension and add it to the path
+          boolean isCompressed = getCompressOutput(ctx);
+          CompressionCodec codec = null;
+          String extension = "";
+          if (isCompressed) {
+             Class<? extends CompressionCodec> codecClass = 
+                getOutputCompressorClass(ctx, GzipCodec.class);
+             codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ctx.getConfiguration());
+             extension = codec.getDefaultExtension();
+          }
+          
+          Path path = new Path(fieldValue+extension, fieldValue + '-'
+                + NumberFormat.getInstance().format(taskId.getId())+extension);
           Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
           Path file = new Path(workOutputPath, path);
           FileSystem fs = file.getFileSystem(conf);                
           FSDataOutputStream fileOut = fs.create(file, false);
-          return fileOut;
+          
+          if (isCompressed)
+             return new DataOutputStream(codec.createOutputStream(fileOut));
+          else
+             return fileOut;
         }
           
       };



Mime
View raw message