pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From knogu...@apache.org
Subject svn commit: r1711365 - in /pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/impl/io/compress/ test/e2e/pig/tests/ test/org/apache/pig/test/
Date Thu, 29 Oct 2015 21:28:39 GMT
Author: knoguchi
Date: Thu Oct 29 21:28:39 2015
New Revision: 1711365

URL: http://svn.apache.org/viewvc?rev=1711365&view=rev
Log:
PIG-3251 Bzip2TextInputFormat requires double the memory of maximum record size (knoguchi)

Added:
    pig/trunk/src/org/apache/pig/impl/io/compress/
    pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/builtin/TextLoader.java
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/TestBZip.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 29 21:28:39 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-3251: Bzip2TextInputFormat requires double the memory of maximum record size (knoguchi)
+
 PIG-4704: Customizable Error Handling for Storers in Pig (siddhimehta via daijy)
 
 PIG-4717: Update Apache HTTPD LogParser to latest version (nielsbasjes via daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Oct 29 21:28:39 2015
@@ -377,6 +377,14 @@
 #
 # pig.auto.local.input.maxbytes=100000000
 
+
+#
+# Should use hadoop's BZipCodec for bzip2 input? (for PigStorage and TextLoader)
+# Only available for hadoop 2.X and after and ignored for others.(Default: true)
+#
+# pig.bzip.use.hadoop.inputformat=true
+
+
 ############################################################################
 #
 # Security Features
@@ -638,4 +646,5 @@ hcat.bin=/usr/local/hcat/bin/hcat
 # loading once for sampling and loading again for partitioning.
 # Used to avoid hitting external non-filesystem datasources like HBase and Accumulo twice.
      
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Oct 29 21:28:39 2015
@@ -357,6 +357,12 @@ public class PigConfiguration {
      */
     public static final String PIG_DATETIME_DEFAULT_TIMEZONE = "pig.datetime.default.tz";
 
+    /**
+     * Using hadoop's TextInputFormat for reading bzip input instead of using Pig's Bzip2TextInputFormat. True by default
+     * (only valid for 0.23/2.X)
+     */
+    public static final String PIG_BZIP_USE_HADOOP_INPUTFORMAT = "pig.bzip.use.hadoop.inputformat";
+
 
     // Pig on Tez runtime settings
     /**

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Thu Oct 29 21:28:39 2015
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -61,6 +62,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
+import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.streaming.StreamingCommand;
@@ -228,6 +230,7 @@ public class PigServer {
 
         this.filter = new BlackAndWhitelistFilter(this);
 
+        addHadoopProperties();
         addJarsFromProperties();
         markPredeployedJarsFromProperties();
 
@@ -240,6 +243,25 @@ public class PigServer {
 
     }
 
+    private void addHadoopProperties() throws ExecException {
+        // For BZip input on hadoop 0.23/2.X
+        // with PIG_BZIP_USE_HADOOP_INPUTFORMAT turned on,
+        // PigTextInputFormat depends on hadoop's TextInputFormat
+        // for handling bzip2 input. One problem is it only recognize 'bz2'
+        // as extension and not 'bz'.
+        // Adding custom BZip2 codec that returns 'bz' as extension
+        // for backward compatibility.
+        String codecs =
+            pigContext.getProperties().getProperty("io.compression.codecs");
+
+        if( codecs != null
+            && codecs.contains(BZip2Codec.class.getCanonicalName() ) ) {
+            pigContext.getProperties().setProperty("io.compression.codecs",
+                codecs + ","
+                + BZip2CodecWithExtensionBZ.class.getCanonicalName() );
+        }
+    }
+
     private void addJarsFromProperties() throws ExecException {
         //add jars from properties to extraJars
         String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Oct 29 21:28:39 2015
@@ -54,6 +54,7 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.OverwritableStoreFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -67,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -159,6 +161,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
     private static final String TAG_SOURCE_PATH = "tagPath";
     private Path sourcePath = null;
 
+    // it determines whether to depend on pig's own Bzip2TextInputFormat or
+    // to simply depend on hadoop for handling bzip2 inputs
+    private boolean bzipinput_usehadoops ;
+
     private Options populateValidOptions() {
         Options validOptions = new Options();
         validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
@@ -419,9 +425,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
 
     @Override
     public InputFormat getInputFormat() {
-        if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+        if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+            mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {
+            mLog.info("Using PigTextInputFormat");
             return new PigTextInputFormat();
         }
     }
@@ -439,6 +448,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
     throws IOException {
         loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
+        bzipinput_usehadoops = job.getConfiguration().getBoolean(
+                                  PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+                                  true );
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Thu Oct 29 21:28:39 2015
@@ -22,6 +22,8 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -29,11 +31,13 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -51,6 +55,12 @@ public class TextLoader extends LoadFunc
     protected RecordReader in = null;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private String loadLocation;
+    protected final Log mLog = LogFactory.getLog(getClass());
+
+    // it determines whether to depend on pig's own Bzip2TextInputFormat or
+    // to simply depend on hadoop for handling bzip2 inputs
+    private boolean bzipinput_usehadoops ;
+
 
     @Override
     public Tuple getNext() throws IOException {
@@ -248,9 +258,13 @@ public class TextLoader extends LoadFunc
 
     @Override
     public InputFormat getInputFormat() {
-        if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+        if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+           && !HadoopShims.isHadoopYARN()
+           && !bzipinput_usehadoops ) {
+            mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {
+            mLog.info("Using PigTextInputFormat");
             return new PigTextInputFormat();
         }
     }
@@ -269,5 +283,8 @@ public class TextLoader extends LoadFunc
     public void setLocation(String location, Job job) throws IOException {
         loadLocation = location;
         FileInputFormat.setInputPaths(job, location);
+        bzipinput_usehadoops = job.getConfiguration().getBoolean(
+                                  PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+                                  true );
     }
 }

Added: pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java?rev=1711365&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java (added)
+++ pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java Thu Oct 29 21:28:39 2015
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io.compress;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+public class BZip2CodecWithExtensionBZ extends BZip2Codec {
+  /**
+  * For historical reasons, Pig supports .bz and .bz2 for bzip2 extension
+  *
+  * @return A String telling the additional bzip2 file extension 'bz'
+  */
+  public String getDefaultExtension() {
+      return ".bz";
+  }
+}

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Oct 29 21:28:39 2015
@@ -3508,7 +3508,9 @@ store b into ':OUTPATH:';\,
             'tests' => [
                     {
                     # test reading and writing out files with .bz2 extension
+                    # relying on Hadoop's bzipcodec (for 0.23/2.X and after)
                     'num' => 1,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
 store a into ':OUTPATH:.intermediate.bz2';
@@ -3518,7 +3520,33 @@ store b into ':OUTPATH:';\,
                     },
                     {
                     # test reading and writing with .bz extension
+                    # relying on Hadoop's bzipcodec (for 0.23/2.X and after)
                     'num' => 2,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz';
+b = load ':OUTPATH:.intermediate.bz';
+store b into ':OUTPATH:';\,
+                    'notmq' => 1,
+                    },
+                    {
+                    # test reading and writing out files with .bz2 extension
+                    # using Bzip2TextInputFormat.
+                    'num' => 3,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz2';
+b = load ':OUTPATH:.intermediate.bz2';
+store b into ':OUTPATH:';\,
+                    'notmq' => 1,
+                    },
+                    {
+                    # test reading and writing with .bz extension
+                    # using Bzip2TextInputFormat.
+                    'num' => 4,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
 store a into ':OUTPATH:.intermediate.bz';

Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Thu Oct 29 21:28:39 2015
@@ -28,6 +28,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Properties;
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -54,11 +56,34 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class TestBZip {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
+    public static Iterable<Object[]> data() {
+        if ( HadoopShims.isHadoopYARN() ) {
+            return Arrays.asList(new Object[][] {
+                { false  },
+                { true   }
+            });
+        } else {
+            return Arrays.asList(new Object[][] {
+                { false }
+            });
+        }
+    }
+
+    public TestBZip (Boolean useBzipFromHadoop) {
+        properties = cluster.getProperties();
+        properties.setProperty("pig.bzip.use.hadoop.inputformat", useBzipFromHadoop.toString());
+    }
+
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
 
@@ -458,6 +483,7 @@ public class TestBZip {
             props.put(entry.getKey(), entry.getValue());
         }
         props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, Integer.toString(splitSize));
+        props.setProperty("pig.noSplitCombination", "true");
         PigServer pig = new PigServer(cluster.getExecType(), props);
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
         fs.delete(new Path(outputFile), true);
@@ -471,7 +497,7 @@ public class TestBZip {
                 numPartFiles++;
             }
         }
-        assertEquals(true, numPartFiles > 0);
+        assertEquals(true, numPartFiles > 1);
 
         // verify record count to verify we read bzip data correctly
         Util.registerMultiLineQuery(pig, script);
@@ -487,26 +513,32 @@ public class TestBZip {
                 "1\t2\r3\t4"
         };
 
-        String inputFileName = "input.txt";
-        Util.createInputFile(cluster, inputFileName, inputData);
-
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
-
-        pig.setBatchOn();
-        pig.registerQuery("a = load '" +  inputFileName + "';");
-        pig.registerQuery("store a into 'output.bz2';");
-        pig.registerQuery("store a into 'output';");
-        pig.executeBatch();
+        try {
+            String inputFileName = "input.txt";
+            Util.createInputFile(cluster, inputFileName, inputData);
 
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        outputFiles = fs.listStatus(new Path("output.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            pig.setBatchOn();
+            pig.registerQuery("a = load '" +  inputFileName + "';");
+            pig.registerQuery("store a into 'output.bz2';");
+            pig.registerQuery("store a into 'output';");
+            pig.executeBatch();
+
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster, "input.txt");
+            Util.deleteFile(cluster, "output.bz2");
+            Util.deleteFile(cluster, "output");
+        }
     }
 
     @Test
@@ -518,26 +550,32 @@ public class TestBZip {
         String inputFileName = "input2.txt";
         Util.createInputFile(cluster, inputFileName, inputData);
 
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
-        PigContext pigContext = pig.getPigContext();
-        pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
-        pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
-
-        pig.setBatchOn();
-        pig.registerQuery("a = load '" +  inputFileName + "';");
-        pig.registerQuery("store a into 'output2.bz2';");
-        pig.registerQuery("store a into 'output2';");
-        pig.executeBatch();
-
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
-
-        outputFiles = fs.listStatus(new Path("output2.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+        try {
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
+            PigContext pigContext = pig.getPigContext();
+            pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
+            pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
+
+            pig.setBatchOn();
+            pig.registerQuery("a = load '" +  inputFileName + "';");
+            pig.registerQuery("store a into 'output2.bz2';");
+            pig.registerQuery("store a into 'output2';");
+            pig.executeBatch();
+
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output2.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster,"input2.txt");
+            Util.deleteFile(cluster,"output2.bz2");
+            Util.deleteFile(cluster,"output2");
+        }
     }
 
     /**
@@ -661,20 +699,26 @@ public class TestBZip {
         pw.println(inputScript);
         pw.close();
 
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
+        try {
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        FileInputStream fis = new FileInputStream(inputScriptName);
-        pig.registerScript(fis);
+            FileInputStream fis = new FileInputStream(inputScriptName);
+            pig.registerScript(fis);
 
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
-
-        outputFiles = fs.listStatus(new Path("output3.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output3.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster, "input3.txt");
+            Util.deleteFile(cluster, "output3.bz2");
+            Util.deleteFile(cluster, "output3");
+        }
     }
 
 }



Mime
View raw message