pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aniket...@apache.org
Subject svn commit: r1564450 - in /pig/branches/branch-0.12: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java src/org/apache/pig/impl/io/SequenceFileInterStorage.java src/org/apache/pig/impl/util/Utils.java
Date Tue, 04 Feb 2014 19:58:09 GMT
Author: aniket486
Date: Tue Feb  4 19:58:08 2014
New Revision: 1564450

URL: http://svn.apache.org/r1564450
Log:
PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)

Modified:
    pig/branches/branch-0.12/CHANGES.txt
    pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
    pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java

Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Tue Feb  4 19:58:08 2014
@@ -32,6 +32,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3741: Utils.setTmpFileCompressionOnConf can cause side effect for SequenceFileInterStorage (aniket486)
+
 PIG-3641: Split "otherwise" producing incorrect output when combined with ColumnPruning (knoguchi)
 
 PIG-3677: ConfigurationUtil.getLocalFSProperties can return an inconsistent property set (rohini)

Modified: pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Feb  4 19:58:08 2014
@@ -576,6 +576,10 @@ public class JobControlCompiler{
             LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
             LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
 
+            // tmp file compression setups
+            // PIG-3741 This must be done before setStoreLocation on POStores
+            Utils.setTmpFileCompressionOnConf(pigContext, conf);
+
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
@@ -816,9 +820,6 @@ public class JobControlCompiler{
                 conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
             }
 
-            // tmp file compression setups
-            Utils.setTmpFileCompressionOnConf(pigContext, conf);
-
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/io/SequenceFileInterStorage.java Tue Feb  4 19:58:08 2014
@@ -205,6 +205,8 @@ StoreFuncInterface, LoadMetadata {
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        Utils.setMapredCompressionCodecProps(conf);
         FileOutputFormat.setOutputPath(job, new Path(location));
     }
 

Modified: pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java?rev=1564450&r1=1564449&r2=1564450&view=diff
==============================================================================
--- pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/branch-0.12/src/org/apache/pig/impl/util/Utils.java Tue Feb  4 19:58:08 2014
@@ -354,7 +354,20 @@ public class Utils {
         }
     }
 
+    public static void setMapredCompressionCodecProps(Configuration conf) {
+        String codec = conf.get(
+                PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
+        if ("".equals(codec) && conf.get("mapred.output.compression.codec") != null) {
+            conf.setBoolean("mapred.output.compress", true);
+        } else if(TEMPFILE_STORAGE.SEQFILE.ensureCodecSupported(codec)) {
+            conf.setBoolean("mapred.output.compress", true);
+            conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+        }
+        // no codec specified
+    }
+
     public static void setTmpFileCompressionOnConf(PigContext pigContext, Configuration conf) throws IOException{
+        // PIG-3741 This is also called for non-intermediate jobs, do not set any mapred properties here
         if (pigContext == null) {
             return;
         }
@@ -365,7 +378,6 @@ public class Utils {
         case INTER:
             break;
         case SEQFILE:
-            conf.setBoolean("mapred.output.compress", true);
             conf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "seqfile");
             if("".equals(codec)) {
                 // codec is not specified, ensure  is set
@@ -374,7 +386,7 @@ public class Utils {
                     throw new IOException("mapred.output.compression.codec is not set");
                 }
             } else if(storage.ensureCodecSupported(codec)) {
-                conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+                // do nothing
             } else {
                 throw new IOException("Invalid temporary file compression codec [" + codec + "]. " +
                         "Expected compression codecs for " + storage.getStorageClass().getName() + " are " + storage.supportedCodecsToString() + ".");



Mime
View raw message