Author: knoguchi
Date: Thu Oct 29 21:28:39 2015
New Revision: 1711365
URL: http://svn.apache.org/viewvc?rev=1711365&view=rev
Log:
PIG-3251 Bzip2TextInputFormat requires double the memory of maximum record size (knoguchi)
Added:
pig/trunk/src/org/apache/pig/impl/io/compress/
pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/builtin/TextLoader.java
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/TestBZip.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 29 21:28:39 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-3251: Bzip2TextInputFormat requires double the memory of maximum record size (knoguchi)
+
PIG-4704: Customizable Error Handling for Storers in Pig (siddhimehta via daijy)
PIG-4717: Update Apache HTTPD LogParser to latest version (nielsbasjes via daijy)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Oct 29 21:28:39 2015
@@ -377,6 +377,14 @@
#
# pig.auto.local.input.maxbytes=100000000
+
+#
+# Should hadoop's BZip2Codec be used for bzip2 input? (for PigStorage and TextLoader)
+# Only available for hadoop 2.X and later; ignored otherwise. (Default: true)
+#
+# pig.bzip.use.hadoop.inputformat=true
+
+
############################################################################
#
# Security Features
@@ -638,4 +646,5 @@ hcat.bin=/usr/local/hcat/bin/hcat
# loading once for sampling and loading again for partitioning.
# Used to avoid hitting external non-filesystem datasources like HBase and Accumulo twice.
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
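
The new flag can also be supplied programmatically rather than through pig.properties. A minimal sketch (not part of this commit; the class name, paths, and local exec type are invented for illustration), assuming the property name added above:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class BzipInputFormatToggle {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // false falls back to Pig's own Bzip2TextInputFormat (pre-PIG-3251 behavior);
            // the default, true, lets hadoop's TextInputFormat handle bzip2 input.
            props.setProperty("pig.bzip.use.hadoop.inputformat", "false");

            PigServer pig = new PigServer(ExecType.LOCAL, props);
            pig.registerQuery("a = load 'input.txt.bz2' using PigStorage();");
            pig.store("a", "output");  // hypothetical input/output paths
        }
    }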
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Thu Oct 29 21:28:39 2015
@@ -357,6 +357,12 @@ public class PigConfiguration {
*/
public static final String PIG_DATETIME_DEFAULT_TIMEZONE = "pig.datetime.default.tz";
+ /**
+ * Use hadoop's TextInputFormat for reading bzip2 input instead of Pig's Bzip2TextInputFormat.
+ * True by default (only valid for hadoop 0.23/2.X)
+ */
+ public static final String PIG_BZIP_USE_HADOOP_INPUTFORMAT = "pig.bzip.use.hadoop.inputformat";
+
// Pig on Tez runtime settings
/**
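
A sketch (not from the commit; the class name is invented) of how a consumer reads the new constant from a Hadoop Configuration, with the documented default of true applied when the key is unset:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.PigConfiguration;

    public class ReadBzipFlag {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Key not set here, so the documented default (true) is returned.
            boolean useHadoopInputFormat = conf.getBoolean(
                    PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT, true);
            System.out.println("use hadoop inputformat: " + useHadoopInputFormat);
        }
    }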
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Thu Oct 29 21:28:39 2015
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -61,6 +62,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
+import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.streaming.StreamingCommand;
@@ -228,6 +230,7 @@ public class PigServer {
this.filter = new BlackAndWhitelistFilter(this);
+ addHadoopProperties();
addJarsFromProperties();
markPredeployedJarsFromProperties();
@@ -240,6 +243,25 @@ public class PigServer {
}
+ private void addHadoopProperties() throws ExecException {
+ // For BZip input on hadoop 0.23/2.X
+ // with PIG_BZIP_USE_HADOOP_INPUTFORMAT turned on,
+ // PigTextInputFormat depends on hadoop's TextInputFormat
+ // for handling bzip2 input. One problem is that it only recognizes
+ // '.bz2' as an extension and not '.bz'.
+ // Adding a custom BZip2 codec that returns '.bz' as its extension
+ // for backward compatibility.
+ String codecs =
+ pigContext.getProperties().getProperty("io.compression.codecs");
+
+ if( codecs != null
+ && codecs.contains(BZip2Codec.class.getCanonicalName() ) ) {
+ pigContext.getProperties().setProperty("io.compression.codecs",
+ codecs + ","
+ + BZip2CodecWithExtensionBZ.class.getCanonicalName() );
+ }
+ }
+
private void addJarsFromProperties() throws ExecException {
//add jars from properties to extraJars
String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");
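
The effect of addHadoopProperties() on io.compression.codecs can be seen in isolation. A small sketch mirroring the logic above (not part of the commit; the starting codec list and class name are hypothetical):

    import java.util.Properties;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;

    public class CodecListDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Hypothetical starting value for illustration.
            props.setProperty("io.compression.codecs",
                    "org.apache.hadoop.io.compress.DefaultCodec,"
                    + BZip2Codec.class.getCanonicalName());

            String codecs = props.getProperty("io.compression.codecs");
            if (codecs != null
                    && codecs.contains(BZip2Codec.class.getCanonicalName())) {
                // Same append as addHadoopProperties(): the '.bz' codec goes last.
                props.setProperty("io.compression.codecs",
                        codecs + "," + BZip2CodecWithExtensionBZ.class.getCanonicalName());
            }
            System.out.println(props.getProperty("io.compression.codecs"));
        }
    }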
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Oct 29 21:28:39 2015
@@ -54,6 +54,7 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.OverwritableStoreFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -67,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -159,6 +161,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
private static final String TAG_SOURCE_PATH = "tagPath";
private Path sourcePath = null;
+ // Determines whether to use Pig's own Bzip2TextInputFormat or
+ // to simply rely on hadoop for handling bzip2 input
+ private boolean bzipinput_usehadoops;
+
private Options populateValidOptions() {
Options validOptions = new Options();
validOptions.addOption("schema", false, "Loads / Stores the schema of the relation
using a hidden JSON file.");
@@ -419,9 +425,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
@Override
public InputFormat getInputFormat() {
- if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+ if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+ && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+ mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
+ mLog.info("Using PigTextInputFormat");
return new PigTextInputFormat();
}
}
@@ -439,6 +448,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
throws IOException {
loadLocation = location;
FileInputFormat.setInputPaths(job, location);
+ bzipinput_usehadoops = job.getConfiguration().getBoolean(
+ PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+ true );
}
@Override
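
How PigStorage now picks an InputFormat can be exercised directly. A minimal local sketch (not part of the commit; the class name and path are invented) that forces the pre-PIG-3251 behavior:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.builtin.PigStorage;

    public class InputFormatSelection {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            // Ask for Pig's own bzip2 handling explicitly.
            job.getConfiguration().setBoolean(
                    PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT, false);

            PigStorage storage = new PigStorage();
            storage.setLocation("/data/logs.bz2", job);  // hypothetical path
            // With the flag off (or on a non-YARN hadoop) this prints
            // Bzip2TextInputFormat; otherwise PigTextInputFormat.
            System.out.println(storage.getInputFormat().getClass().getSimpleName());
        }
    }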
Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Thu Oct 29 21:28:39 2015
@@ -22,6 +22,8 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -29,11 +31,13 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -51,6 +55,12 @@ public class TextLoader extends LoadFunc
protected RecordReader in = null;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
private String loadLocation;
+ protected final Log mLog = LogFactory.getLog(getClass());
+
+ // Determines whether to use Pig's own Bzip2TextInputFormat or
+ // to simply rely on hadoop for handling bzip2 input
+ private boolean bzipinput_usehadoops;
+
@Override
public Tuple getNext() throws IOException {
@@ -248,9 +258,13 @@ public class TextLoader extends LoadFunc
@Override
public InputFormat getInputFormat() {
- if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+ if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
+ && !HadoopShims.isHadoopYARN()
+ && !bzipinput_usehadoops ) {
+ mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
+ mLog.info("Using PigTextInputFormat");
return new PigTextInputFormat();
}
}
@@ -269,5 +283,8 @@ public class TextLoader extends LoadFunc
public void setLocation(String location, Job job) throws IOException {
loadLocation = location;
FileInputFormat.setInputPaths(job, location);
+ bzipinput_usehadoops = job.getConfiguration().getBoolean(
+ PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT,
+ true );
}
}
Added: pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java?rev=1711365&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java (added)
+++ pig/trunk/src/org/apache/pig/impl/io/compress/BZip2CodecWithExtensionBZ.java Thu Oct 29 21:28:39 2015
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io.compress;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+
+public class BZip2CodecWithExtensionBZ extends BZip2Codec {
+ /**
+ * For historical reasons, Pig supports both .bz and .bz2 as bzip2 extensions.
+ *
+ * @return the additional bzip2 file extension '.bz'
+ */
+ public String getDefaultExtension() {
+ return ".bz";
+ }
+}
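
Hadoop's CompressionCodecFactory resolves codecs by file extension, which is why registering this codec lets '.bz' files decompress through the bzip2 path. A sketch of that lookup (not part of the commit; the class name and file path are invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;

    public class BzExtensionLookup {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Append the new codec to the configured list, as PigServer does above.
            conf.set("io.compression.codecs",
                    conf.get("io.compression.codecs", "")
                    + "," + BZip2CodecWithExtensionBZ.class.getCanonicalName());

            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            // Without the extra codec only '.bz2' would match; with it, '.bz' does too.
            CompressionCodec codec = factory.getCodec(new Path("/data/part-00000.bz"));
            System.out.println(codec == null ? "no codec" : codec.getClass().getName());
        }
    }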
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Oct 29 21:28:39 2015
@@ -3508,7 +3508,9 @@ store b into ':OUTPATH:';\,
'tests' => [
{
# test reading and writing out files with .bz2 extension
+ # relying on Hadoop's BZip2Codec (for 0.23/2.X and later)
'num' => 1,
+ 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
'pig' => q\
a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
store a into ':OUTPATH:.intermediate.bz2';
@@ -3518,7 +3520,33 @@ store b into ':OUTPATH:';\,
},
{
# test reading and writing with .bz extension
+ # relying on Hadoop's BZip2Codec (for 0.23/2.X and later)
'num' => 2,
+ 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
+ 'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz';
+b = load ':OUTPATH:.intermediate.bz';
+store b into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
+ {
+ # test reading and writing out files with .bz2 extension
+ # using Bzip2TextInputFormat.
+ 'num' => 3,
+ 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
+ 'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz2';
+b = load ':OUTPATH:.intermediate.bz2';
+store b into ':OUTPATH:';\,
+ 'notmq' => 1,
+ },
+ {
+ # test reading and writing with .bz extension
+ # using Bzip2TextInputFormat.
+ 'num' => 4,
+ 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
'pig' => q\
a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
store a into ':OUTPATH:.intermediate.bz';
Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1711365&r1=1711364&r2=1711365&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Thu Oct 29 21:28:39 2015
@@ -28,6 +28,7 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -54,11 +56,34 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+@RunWith(Parameterized.class)
public class TestBZip {
private static Properties properties;
private static MiniGenericCluster cluster;
+ @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
+ public static Iterable<Object[]> data() {
+ if ( HadoopShims.isHadoopYARN() ) {
+ return Arrays.asList(new Object[][] {
+ { false },
+ { true }
+ });
+ } else {
+ return Arrays.asList(new Object[][] {
+ { false }
+ });
+ }
+ }
+
+ public TestBZip (Boolean useBzipFromHadoop) {
+ properties = cluster.getProperties();
+ properties.setProperty("pig.bzip.use.hadoop.inputformat", useBzipFromHadoop.toString());
+ }
+
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -458,6 +483,7 @@ public class TestBZip {
props.put(entry.getKey(), entry.getValue());
}
props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, Integer.toString(splitSize));
+ props.setProperty("pig.noSplitCombination", "true");
PigServer pig = new PigServer(cluster.getExecType(), props);
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
fs.delete(new Path(outputFile), true);
@@ -471,7 +497,7 @@ public class TestBZip {
numPartFiles++;
}
}
- assertEquals(true, numPartFiles > 0);
+ assertEquals(true, numPartFiles > 1);
// verify record count to verify we read bzip data correctly
Util.registerMultiLineQuery(pig, script);
@@ -487,26 +513,32 @@ public class TestBZip {
"1\t2\r3\t4"
};
- String inputFileName = "input.txt";
- Util.createInputFile(cluster, inputFileName, inputData);
-
- PigServer pig = new PigServer(cluster.getExecType(), properties);
-
- pig.setBatchOn();
- pig.registerQuery("a = load '" + inputFileName + "';");
- pig.registerQuery("store a into 'output.bz2';");
- pig.registerQuery("store a into 'output';");
- pig.executeBatch();
+ try {
+ String inputFileName = "input.txt";
+ Util.createInputFile(cluster, inputFileName, inputData);
- FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
- pig.getPigContext().getProperties()));
- FileStatus[] outputFiles = fs.listStatus(new Path("output"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
- outputFiles = fs.listStatus(new Path("output.bz2"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
+ pig.setBatchOn();
+ pig.registerQuery("a = load '" + inputFileName + "';");
+ pig.registerQuery("store a into 'output.bz2';");
+ pig.registerQuery("store a into 'output';");
+ pig.executeBatch();
+
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FileStatus[] outputFiles = fs.listStatus(new Path("output"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+ } finally {
+ Util.deleteFile(cluster, "input.txt");
+ Util.deleteFile(cluster, "output.bz2");
+ Util.deleteFile(cluster, "output");
+ }
}
@Test
@@ -518,26 +550,32 @@ public class TestBZip {
String inputFileName = "input2.txt";
Util.createInputFile(cluster, inputFileName, inputData);
- PigServer pig = new PigServer(cluster.getExecType(), properties);
- PigContext pigContext = pig.getPigContext();
- pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
- pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec"
);
-
- pig.setBatchOn();
- pig.registerQuery("a = load '" + inputFileName + "';");
- pig.registerQuery("store a into 'output2.bz2';");
- pig.registerQuery("store a into 'output2';");
- pig.executeBatch();
-
- FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
- pig.getPigContext().getProperties()));
- FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
-
- outputFiles = fs.listStatus(new Path("output2.bz2"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
+ try {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
+ PigContext pigContext = pig.getPigContext();
+ pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
+ pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
+
+ pig.setBatchOn();
+ pig.registerQuery("a = load '" + inputFileName + "';");
+ pig.registerQuery("store a into 'output2.bz2';");
+ pig.registerQuery("store a into 'output2';");
+ pig.executeBatch();
+
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output2.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+ } finally {
+ Util.deleteFile(cluster,"input2.txt");
+ Util.deleteFile(cluster,"output2.bz2");
+ Util.deleteFile(cluster,"output2");
+ }
}
/**
@@ -661,20 +699,26 @@ public class TestBZip {
pw.println(inputScript);
pw.close();
- PigServer pig = new PigServer(cluster.getExecType(), properties);
+ try {
+ PigServer pig = new PigServer(cluster.getExecType(), properties);
- FileInputStream fis = new FileInputStream(inputScriptName);
- pig.registerScript(fis);
+ FileInputStream fis = new FileInputStream(inputScriptName);
+ pig.registerScript(fis);
- FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
- pig.getPigContext().getProperties()));
- FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
-
- outputFiles = fs.listStatus(new Path("output3.bz2"),
- Util.getSuccessMarkerPathFilter());
- assertTrue(outputFiles[0].getLen() > 0);
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+
+ outputFiles = fs.listStatus(new Path("output3.bz2"),
+ Util.getSuccessMarkerPathFilter());
+ assertTrue(outputFiles[0].getLen() > 0);
+ } finally {
+ Util.deleteFile(cluster, "input3.txt");
+ Util.deleteFile(cluster, "output3.bz2");
+ Util.deleteFile(cluster, "output3");
+ }
}
}