From: gates@apache.org
To: commits@pig.apache.org
Subject: svn commit: r1387846 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java test/org/apache/pig/test/TestStore.java test/org/apache/pig/test/Util.java
Date: Thu, 20 Sep 2012 03:07:57 -0000
Message-Id: <20120920030758.39C9023888CD@eris.apache.org>

Author: gates
Date: Thu Sep 20 03:07:57 2012
New Revision: 1387846

URL: http://svn.apache.org/viewvc?rev=1387846&view=rev
Log:
PIG-2712 Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    pig/trunk/test/org/apache/pig/test/TestStore.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 20 03:07:57 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2712: Pig does not call OutputCommitter.abortJob() on the underlying OutputFormat (rohini via gates)
+
 PIG-2918: Avoid Spillable bag overhead where possible (dvryaboy)
 
 PIG-2900: Streaming should provide conf settings in the environment (dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Sep 20 03:07:57 2012
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ResourceSchema;
@@ -143,7 +144,7 @@ public class PigOutputCommitter extends
             }
         }
     }
-    
+
     @Override
     public void cleanupJob(JobContext context) throws IOException {
         // call clean up on all map and reduce committers
@@ -166,7 +167,7 @@ public class PigOutputCommitter extends
 
         }
     }
-    
+
     // This method only be called in 20.203+/0.23
     public void commitJob(JobContext context) throws IOException {
         // call commitJob on all map and reduce committers
@@ -204,6 +205,42 @@ public class PigOutputCommitter extends
             }
         }
     }
+
+    // This method only be called in 20.203+/0.23
+    public void abortJob(JobContext context, State state) throws IOException {
+        // call abortJob on all map and reduce committers
+        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+            if (mapCommitter.first!=null) {
+                JobContext updatedContext = setUpContext(context,
+                        mapCommitter.second);
+                try {
+                    // Use reflection, 20.2 does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(mapCommitter.first, updatedContext, state);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                storeCleanup(mapCommitter.second, updatedContext.getConfiguration());
+            }
+        }
+        for (Pair<OutputCommitter, POStore> reduceCommitter :
+            reduceOutputCommitters) {
+            if (reduceCommitter.first!=null) {
+                JobContext updatedContext = setUpContext(context,
+                        reduceCommitter.second);
+                try {
+                    // Use reflection, 20.2 does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(reduceCommitter.first, updatedContext, state);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+                storeCleanup(reduceCommitter.second, updatedContext.getConfiguration());
+            }
+        }
+    }
 
     @Override

Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Thu Sep 20 03:07:57 2012
@@ -23,16 +23,24 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
@@ -45,6 +53,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -96,14 +105,15 @@ public class TestStore extends junit.fra
     private static final String FAIL_UDF_NAME = "org.apache.pig.test.TestStore\\$FailUDF";
     private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts";
-    
+    private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName();
+
     @Override
     @Before
     public void setUp() throws Exception {
         pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pc = pig.getPigContext();
-        inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt";
-        outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
+        inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt";
+        outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
         DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
     }
@@ -111,16 +121,15 @@ public class TestStore extends junit.fra
     @Override
     @After
     public void tearDown() throws Exception {
-        Util.deleteFile(cluster, inputFileName);
-        Util.deleteFile(cluster, outputFileName);
-        new File(outputFileName).delete();
+        Util.deleteDirectory(new File(TESTDIR));
+        Util.deleteFile(cluster, TESTDIR);
     }
 
     private void storeAndCopyLocally(DataBag inpDB) throws Exception {
         setUpInputFileOnCluster(inpDB);
         String script = "a = load '" + inputFileName + "'; " +
                 "store a into '" + outputFileName + "' using PigStorage('\t');" +
-                "fs -ls /tmp";
+                "fs -ls " + TESTDIR;
         pig.setBatchOn();
         Util.registerMultiLineQuery(pig, script);
         pig.executeBatch();
@@ -373,7 +382,15 @@ public class TestStore extends junit.fra
     @Test
     public void testSetStoreSchema() throws Exception {
         PigServer ps = null;
-        String storeSchemaOutputFile = outputFileName + "_storeSchema_test";
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE);
         try {
             ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
             String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
@@ -383,37 +400,38 @@ public class TestStore extends junit.fra
                     DUMMY_STORE_CLASS_NAME + "();";
 
             for (ExecType execType : modes) {
+                FileLocalizer.setInitialized(false);
                 if(execType == ExecType.MAPREDUCE) {
                     ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-                    Util.deleteFile(ps.getPigContext(), inputFileName);
-                    Util.deleteFile(ps.getPigContext(), outputFileName);
-                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
                 } else {
                     Properties props = new Properties();
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
-                    Util.deleteFile(ps.getPigContext(), inputFileName);
-                    Util.deleteFile(ps.getPigContext(), outputFileName);
-                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+                    if (Util.isHadoop1_0()) {
+                        // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+                        // OutputCommitter) is fixed only in 0.23.1
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE);
+                    }
                 }
                 ps.setBatchOn();
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
                 Util.registerMultiLineQuery(ps, script);
                 ps.executeBatch();
-                assertEquals(
-                        "Checking if file indicating that storeSchema was " +
-                        "called exists in " + execType + " mode", true,
-                        Util.exists(ps.getPigContext(), storeSchemaOutputFile));
+                for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+                    String condition = entry.getValue() ? "" : "not";
+                    assertEquals("Checking if file " + entry.getKey() +
+                            " does " + condition + " exists in " + execType +
+                            " mode", (boolean) entry.getValue(),
+                            Util.exists(ps.getPigContext(), entry.getKey()));
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName);
-            Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
         }
     }
@@ -439,10 +457,7 @@ public class TestStore extends junit.fra
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
                 }
-                Util.deleteFile(ps.getPigContext(), inputFileName);
-                Util.deleteFile(ps.getPigContext(), outputFileName);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
 
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
@@ -460,11 +475,6 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile);
         }
     }
@@ -472,15 +482,31 @@
     @Test
     public void testCleanupOnFailureMultiStore() throws Exception {
         PigServer ps = null;
-        String outputFileName1 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
-        String outputFileName2 = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
-        String cleanupSuccessFile1 = outputFileName1 + "_cleanupOnFailure_succeeded1";
-        String cleanupFailFile1 = outputFileName1 + "_cleanupOnFailure_failed1";
-        String cleanupSuccessFile2 = outputFileName2 + "_cleanupOnFailure_succeeded2";
-        String cleanupFailFile2 = outputFileName2 + "_cleanupOnFailure_failed2";
+        String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+        String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
+
+        Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
         try {
-            ExecType[] modes = new ExecType[] { /*ExecType.LOCAL, */ExecType.MAPREDUCE};
+            ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
             String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
 
             // though the second store should
@@ -493,6 +519,7 @@ public class TestStore extends junit.fra
                     DUMMY_STORE_CLASS_NAME + "('false', '2');";
 
             for (ExecType execType : modes) {
+                FileLocalizer.setInitialized(false);
                 if(execType == ExecType.MAPREDUCE) {
                     ps = new PigServer(ExecType.MAPREDUCE,
                             cluster.getProperties());
@@ -500,50 +527,38 @@ public class TestStore extends junit.fra
                     Properties props = new Properties();
                     props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
                     ps = new PigServer(ExecType.LOCAL, props);
+                    // LocalJobRunner does not call abortTask
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+                    if (Util.isHadoop1_0()) {
+                        // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce
+                        // OutputCommitter) is fixed only in 0.23.1
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+                        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+                    }
                 }
-                Util.deleteFile(ps.getPigContext(), inputFileName);
-                Util.deleteFile(ps.getPigContext(), outputFileName1);
-                Util.deleteFile(ps.getPigContext(), outputFileName2);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
-                Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
-                Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
 
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
                 Util.registerMultiLineQuery(ps, script);
                 ps.executeBatch();
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure failed " +
-                        " does not exists in " + execType + " mode", false,
-                        Util.exists(ps.getPigContext(), cleanupFailFile1));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure failed " +
-                        " does not exists in " + execType + " mode", false,
-                        Util.exists(ps.getPigContext(), cleanupFailFile2));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure was " +
-                        "successfully called exists in " + execType + " mode", true,
-                        Util.exists(ps.getPigContext(), cleanupSuccessFile1));
-                assertEquals(
-                        "Checking if file indicating that cleanupOnFailure was " +
-                        "successfully called exists in " + execType + " mode", true,
-                        Util.exists(ps.getPigContext(), cleanupSuccessFile2));
+                for (Entry<String, Boolean> entry : filesToVerify.entrySet()) {
+                    String condition = entry.getValue() ? "" : "not";
+                    assertEquals("Checking if file " + entry.getKey() +
+                            " does " + condition + " exists in " + execType +
+                            " mode", (boolean) entry.getValue(),
+                            Util.exists(ps.getPigContext(), entry.getKey()));
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail("Exception encountered - hence failing:" + e);
-        } finally {
-            Util.deleteFile(ps.getPigContext(), inputFileName);
-            Util.deleteFile(ps.getPigContext(), outputFileName1);
-            Util.deleteFile(ps.getPigContext(), outputFileName2);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile1);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile1);
-            Util.deleteFile(ps.getPigContext(), cleanupFailFile2);
-            Util.deleteFile(ps.getPigContext(), cleanupSuccessFile2);
         }
     }
-    
+
     // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs"
     // property is set to true
     // The test covers multi store and single store case in local and mapreduce mode
@@ -552,8 +567,7 @@
     @Test
     public void testSuccessFileCreation1() throws Exception {
         PigServer ps = null;
-        String[] files = new String[] { inputFileName,
-                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
+
         try {
             ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
             String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -589,7 +603,7 @@
                 ps.getPigContext().getProperties().setProperty(
                         MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
                         Boolean.toString(isPropertySet));
-                cleanupFiles(ps, files);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
 
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
@@ -606,7 +620,7 @@
                 }
             }
         } finally {
-            cleanupFiles(ps, files);
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
         }
     }
@@ -618,8 +632,6 @@
     @Test
     public void testSuccessFileCreation2() throws Exception {
         PigServer ps = null;
-        String[] files = new String[] { inputFileName,
-                outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"};
         try {
             ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE};
             String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"};
@@ -663,7 +675,7 @@
                 ps.getPigContext().getProperties().setProperty(
                         MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
                         Boolean.toString(isPropertySet));
-                cleanupFiles(ps, files);
+                Util.deleteFile(ps.getPigContext(), TESTDIR);
 
                 ps.setBatchOn();
                 Util.createInputFile(ps.getPigContext(), inputFileName, inputData);
@@ -687,7 +699,7 @@
                 }
             }
         } finally {
-            cleanupFiles(ps, files);
+            Util.deleteFile(ps.getPigContext(), TESTDIR);
         }
     }
@@ -700,13 +712,7 @@
         }
     }
 
-    private void cleanupFiles(PigServer ps, String... files) throws IOException {
-        for(String file:files) {
-            Util.deleteFile(ps.getPigContext(), file);
-        }
-    }
-    
+    
     public static class DummyStore extends PigStorage implements StoreMetadata{
 
         private boolean failInPutNext = false;
@@ -734,6 +740,12 @@
             super.putNext(t);
         }
 
+        @SuppressWarnings("rawtypes")
+        @Override
+        public OutputFormat getOutputFormat() {
+            return new DummyOutputFormat(outputFileSuffix);
+        }
+
         @Override
         public void storeSchema(ResourceSchema schema, String location,
                 Job job) throws IOException {
@@ -814,4 +826,113 @@
             Assert.assertEquals(expected, p);
         }
     }
+
+    static class DummyOutputFormat extends PigTextOutputFormat {
+
+        private String outputFileSuffix;
+
+        public DummyOutputFormat(String outputFileSuffix) {
+            super((byte) '\t');
+            this.outputFileSuffix = outputFileSuffix;
+        }
+
+        @Override
+        public synchronized OutputCommitter getOutputCommitter(
+                TaskAttemptContext context) throws IOException {
+            return new DummyOutputCommitter(outputFileSuffix,
+                    super.getOutputCommitter(context));
+        }
+
+        @Override
+        public Path getDefaultWorkFile(TaskAttemptContext context,
+                String extension) throws IOException {
+            FileOutputCommitter committer =
+                (FileOutputCommitter) super.getOutputCommitter(context);
+            return new Path(committer.getWorkPath(), getUniqueFile(context,
+                    "part", extension));
+        }
+
+    }
+
+    static class DummyOutputCommitter extends OutputCommitter {
+
+        static String FILE_SETUPJOB_CALLED = "/tmp/TestStore/_setupJob_called";
+        static String FILE_SETUPTASK_CALLED = "/tmp/TestStore/_setupTask_called";
+        static String FILE_COMMITTASK_CALLED = "/tmp/TestStore/_commitTask_called";
+        static String FILE_ABORTTASK_CALLED = "/tmp/TestStore/_abortTask_called";
+        static String FILE_CLEANUPJOB_CALLED = "/tmp/TestStore/_cleanupJob_called";
+        static String FILE_COMMITJOB_CALLED = "/tmp/TestStore/_commitJob_called";
+        static String FILE_ABORTJOB_CALLED = "/tmp/TestStore/_abortJob_called";
+
+        private String outputFileSuffix;
+        private OutputCommitter baseCommitter;
+
+        public DummyOutputCommitter(String outputFileSuffix,
+                OutputCommitter baseCommitter) throws IOException {
+            this.outputFileSuffix = outputFileSuffix;
+            this.baseCommitter = baseCommitter;
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) throws IOException {
+            baseCommitter.setupJob(jobContext);
+            createFile(jobContext, FILE_SETUPJOB_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.setupTask(taskContext);
+            createFile(taskContext, FILE_SETUPTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+                throws IOException {
+            return true;
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.commitTask(taskContext);
+            createFile(taskContext, FILE_COMMITTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext taskContext)
+                throws IOException {
+            baseCommitter.abortTask(taskContext);
+            createFile(taskContext, FILE_ABORTTASK_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void cleanupJob(JobContext jobContext) throws IOException {
+            baseCommitter.cleanupJob(jobContext);
+            createFile(jobContext, FILE_CLEANUPJOB_CALLED
+                    + outputFileSuffix);
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            baseCommitter.commitJob(jobContext);
+            createFile(jobContext, FILE_COMMITJOB_CALLED + outputFileSuffix);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, State state)
+                throws IOException {
+            baseCommitter.abortJob(jobContext, state);
+            createFile(jobContext, FILE_ABORTJOB_CALLED + outputFileSuffix);
+        }
+
+        public void createFile(JobContext jobContext, String fileName)
+                throws IOException {
+            Configuration conf = jobContext.getConfiguration();
+            FileSystem fs = FileSystem.get(conf);
+            fs.mkdirs(new Path(fileName).getParent());
+            FSDataOutputStream out = fs.create(new Path(fileName), true);
+            out.close();
+        }
+
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1387846&r1=1387845&r2=1387846&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Sep 20 03:07:57 2012
@@ -575,6 +575,10 @@ public class Util {
     }
 
     static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
+        File parent = new File(localFileName).getParentFile();
+        if (!parent.exists()) {
+            parent.mkdirs();
+        }
         PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
 
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
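
The heart of this patch is the reflective call to abortJob(JobContext, State): OutputCommitter in Hadoop 0.20.2 does not declare that method, so PigOutputCommitter cannot reference it directly without breaking compilation against older Hadoop releases. The sketch below isolates that pattern outside of Pig. It is illustrative only, not part of the commit: the ReflectiveAbort class and its names are hypothetical, and it assumes a Hadoop 0.20.203+/0.23 classpath (where JobStatus.State exists).

import java.io.IOException;
import java.lang.reflect.Method;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;

// Hypothetical standalone helper mirroring the reflective dispatch used in
// PigOutputCommitter.abortJob() above.
public class ReflectiveAbort {

    public static void abortJob(OutputCommitter committer, JobContext context,
            State state) throws IOException {
        try {
            // getMethod() searches the class and its superclasses, so an
            // abortJob() inherited from a 0.20.203+/0.23 OutputCommitter
            // base class is found as well.
            Method m = committer.getClass().getMethod("abortJob",
                    JobContext.class, State.class);
            // Guard against committer implementations declared in
            // non-public classes.
            m.setAccessible(true);
            m.invoke(committer, context, state);
        } catch (Exception e) {
            // NoSuchMethodException (pre-0.20.203 Hadoop) and
            // InvocationTargetException both surface as IOException,
            // matching the patch's error handling.
            throw new IOException(e);
        }
    }
}

Note the asymmetry the patch's "// This method only be called in 20.203+/0.23" comments point at: on 0.20.2 the MapReduce framework never invokes abortJob() at all, so in practice the reflective lookup only runs on releases where the method exists, and any reflection failure is treated as a hard IOException rather than being silently skipped.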