Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 79590 invoked from network); 4 Jun 2010 00:45:51 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 Jun 2010 00:45:51 -0000 Received: (qmail 79842 invoked by uid 500); 4 Jun 2010 00:45:51 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 79807 invoked by uid 500); 4 Jun 2010 00:45:50 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 79800 invoked by uid 500); 4 Jun 2010 00:45:50 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 79797 invoked by uid 99); 4 Jun 2010 00:45:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jun 2010 00:45:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jun 2010 00:45:46 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2A3F923889D2; Fri, 4 Jun 2010 00:45:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r951229 - in /hadoop/pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java test/org/apache/pig/test/TestStore.java Date: Fri, 04 Jun 2010 00:45:25 -0000 To: pig-commits@incubator.apache.org From: pradeepkth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100604004525.2A3F923889D2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: pradeepkth Date: Fri Jun 4 00:45:24 2010 New Revision: 951229 URL: http://svn.apache.org/viewvc?rev=951229&view=rev Log: PIG-1433: pig should create success file if mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth) Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=951229&r1=951228&r2=951229&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Jun 4 00:45:24 2010 @@ -70,6 +70,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1433: pig should create success file if +mapreduce.fileoutputcommitter.marksuccessfuljobs is true (pradeepkth) + PIG-1347: Clear up output directory for a failed job (daijy) PIG-1419: Remove "user.name" from JobConf (daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=951229&r1=951228&r2=951229&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Jun 4 00:45:24 2010 @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobClient; @@ -79,6 +80,11 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; private Map failureMap; + public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; + + public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = + "mapreduce.fileoutputcommitter.marksuccessfuljobs"; + /** * Get the exception that caused a failure on the backend for a * store location (if any). @@ -309,6 +315,9 @@ public class MapReduceLauncher extends L } if (!st.isTmpStore()) { succeededStores.add(st); + // create an "_SUCCESS" file in output location if + // output location is a filesystem dir + createSuccessFile(job, st); finalStores++; if (st.isMultiStore()) { String counterName = PigStatsUtil.getMultiStoreCounterName(st); @@ -509,6 +518,24 @@ public class MapReduceLauncher extends L PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration()); } + private boolean shouldMarkOutputDir(Job job) { + return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + false); + } + + private void createSuccessFile(Job job, POStore store) throws IOException { + if(shouldMarkOutputDir(job)) { + FileSystem fs = FileSystem.get(job.getJobConf()); + Path outputPath = new Path(store.getSFile().getFileName()); + if(fs.exists(outputPath)){ + // create a file in the folder to mark it + Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); + if(!fs.exists(filePath)) { + fs.create(filePath).close(); + } + } + } + } /** * An exception handler class to handle exceptions thrown by the job controller thread Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=951229&r1=951228&r2=951229&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Fri Jun 4 00:45:24 2010 @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.EvalFunc; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; @@ -55,6 +56,7 @@ import org.apache.pig.data.DefaultBagFac import org.apache.pig.data.DefaultTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.LOStore; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -89,7 +91,14 @@ public class TestStore extends junit.fra String inputFileName; String outputFileName; - + + private static final String DUMMY_STORE_CLASS_NAME + = "org.apache.pig.test.TestStore\\$DummyStore"; + + private static final String FAIL_UDF_NAME + = "org.apache.pig.test.TestStore\\$FailUDF"; + private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; + @Override @Before public void setUp() throws Exception { @@ -558,8 +567,168 @@ public class TestStore extends junit.fra } } - private static final String DUMMY_STORE_CLASS_NAME - = "org.apache.pig.test.TestStore\\$DummyStore"; + // Test that "_SUCCESS" file is created when "mapreduce.fileoutputcommitter.marksuccessfuljobs" + // property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. + @Test + public void testSuccessFileCreation1() throws Exception { + PigServer ps = null; + String[] files = new String[] { inputFileName, + outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"}; + try { + ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName + "_1';" ; + + for (ExecType execType : modes) { + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? multiStoreScript : + singleStoreScript); + // since we will be switching between map red and local modes + // we will need to make sure filelocalizer is reset before each + // run. + FileLocalizer.setInitialized(false); + if(execType == ExecType.MAPREDUCE) { + ps = new PigServer(ExecType.MAPREDUCE, + cluster.getProperties()); + } else { + Properties props = new Properties(); + props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); + ps = new PigServer(ExecType.LOCAL, props); + } + ps.getPigContext().getProperties().setProperty( + MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + Boolean.toString(isPropertySet)); + cleanupFiles(ps, files); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + execType + " mode", isPropertySet, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } + } finally { + cleanupFiles(ps, files); + } + } + + // Test _SUCCESS file is NOT created when job fails and when + // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true + // The test covers multi store and single store case in local and mapreduce mode + // The test also checks that "_SUCCESS" file is NOT created when the property + // is not set to true in all the modes. + @Test + public void testSuccessFileCreation2() throws Exception { + PigServer ps = null; + String[] files = new String[] { inputFileName, + outputFileName + "_1", outputFileName + "_2", outputFileName + "_3"}; + try { + ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; + String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; + System.err.println("XXX: " + TestStore.FailUDF.class.getName()); + String multiStoreScript = "a = load '"+ inputFileName + "';" + + "b = filter a by $0 == 'hello';" + + "b = foreach b generate " + FAIL_UDF_NAME + "($0);" + + "c = filter a by $0 == 'hi';" + + "d = filter a by $0 == 'bye';" + + "store b into '" + outputFileName + "_1';" + + "store c into '" + outputFileName + "_2';" + + "store d into '" + outputFileName + "_3';"; + + String singleStoreScript = "a = load '"+ inputFileName + "';" + + "b = foreach a generate " + FAIL_UDF_NAME + "($0);" + + "store b into '" + outputFileName + "_1';" ; + + for (ExecType execType : modes) { + for(boolean isPropertySet: new boolean[] { true, false}) { + for(boolean isMultiStore: new boolean[] { true, false}) { + String script = (isMultiStore ? multiStoreScript : + singleStoreScript); + // since we will be switching between map red and local modes + // we will need to make sure filelocalizer is reset before each + // run. + FileLocalizer.setInitialized(false); + if(execType == ExecType.MAPREDUCE) { + // since the job is guaranteed to fail, let's set + // number of retries to 1. + Properties props = cluster.getProperties(); + props.setProperty(MAP_MAX_ATTEMPTS, "1"); + ps = new PigServer(ExecType.MAPREDUCE, props); + } else { + Properties props = new Properties(); + props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); + // since the job is guaranteed to fail, let's set + // number of retries to 1. + props.setProperty(MAP_MAX_ATTEMPTS, "1"); + ps = new PigServer(ExecType.LOCAL, props); + } + ps.getPigContext().getProperties().setProperty( + MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + Boolean.toString(isPropertySet)); + cleanupFiles(ps, files); + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + try { + ps.executeBatch(); + } catch(IOException ioe) { + if(!ioe.getMessage().equals("FailUDFException")) { + // an unexpected exception + throw ioe; + } + } + for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { + String sucFile = outputFileName + "_" + i + "/" + + MapReduceLauncher.SUCCEEDED_FILE_NAME; + assertEquals("Checking if _SUCCESS file exists in " + + execType + " mode", false, + Util.exists(ps.getPigContext(), sucFile)); + } + } + } + } + } finally { + cleanupFiles(ps, files); + } + } + + // A UDF which always throws an Exception so that the job can fail + public static class FailUDF extends EvalFunc { + + @Override + public String exec(Tuple input) throws IOException { + throw new IOException("FailUDFException"); + } + + } + private void cleanupFiles(PigServer ps, String... files) throws IOException { + for(String file:files) { + Util.deleteFile(ps.getPigContext(), file); + } + } + public static class DummyStore extends PigStorage implements StoreMetadata{ @@ -622,7 +791,6 @@ public class TestStore extends junit.fra @Override public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException { - // TODO Auto-generated method stub } }