Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 12161 invoked from network); 4 Feb 2010 00:59:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Feb 2010 00:59:32 -0000 Received: (qmail 9420 invoked by uid 500); 4 Feb 2010 00:59:32 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 9373 invoked by uid 500); 4 Feb 2010 00:59:32 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 9364 invoked by uid 500); 4 Feb 2010 00:59:32 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 9361 invoked by uid 99); 4 Feb 2010 00:59:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2010 00:59:32 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2010 00:59:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E992323888E7; Thu, 4 Feb 2010 00:59:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r906314 - in /hadoop/pig/branches/load-store-redesign: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/ Date: Thu, 04 Feb 2010 00:59:10 -0000 To: pig-commits@incubator.apache.org From: pradeepkth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100204005910.E992323888E7@eris.apache.org> Author: pradeepkth Date: Thu Feb 4 00:59:10 2010 New Revision: 906314 URL: http://svn.apache.org/viewvc?rev=906314&view=rev Log: PIG-1090: additional patch to handle calling setStoreSchema of StoreMetadata in local mode (pradeepkth) Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=906314&r1=906313&r2=906314&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Feb 4 00:59:10 2010 @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigWarning; @@ -282,6 +283,18 @@ for(Job job : succJobs){ List sts = jcc.getStores(job); for (POStore st: sts) { + // Currently (as of Feb 3 2010), hadoop's local mode does not + // call cleanupJob on OutputCommitter (see https://issues.apache.org/jira/browse/MAPREDUCE-1447) + // So to workaround that bug, we are calling setStoreSchema on + // StoreFunc's which implement StoreMetadata here + /**********************************************************/ + // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE MAPREDUCE-1447 + // IS FIXED - TestStore.testSetStoreSchema() should fail at + // that time and removing this code should fix it. + /**********************************************************/ + if(pc.getExecType() == ExecType.LOCAL) { + storeSchema(job, st); + } if (!st.isTmpStore()) { succeededStores.add(st.getSFile()); finalStores++; @@ -443,6 +456,19 @@ } /** + * @param job + * @param st + * @throws IOException + */ + private void storeSchema(Job job, POStore st) throws IOException { + JobContext jc = new JobContext(job.getJobConf(), + new org.apache.hadoop.mapreduce.JobID()); + JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st); + PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration()); + } + + + /** * An exception handler class to handle exceptions thrown by the job controller thread * Its a local class. This is the only mechanism to catch unhandled thread exceptions * Unhandled exceptions in threads are handled by the VM if the handler is not registered Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=906314&r1=906313&r2=906314&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Feb 4 00:59:10 2010 @@ -136,7 +136,7 @@ return contextCopy; } - private JobContext setUpContext(JobContext context, + static JobContext setUpContext(JobContext context, POStore store) throws IOException { // make a copy of the context so that the actions after this call // do not end up updating the same context @@ -153,7 +153,7 @@ return contextCopy; } - private void storeCleanup(POStore store, Configuration conf) + static void storeCleanup(POStore store, Configuration conf) throws IOException { StoreFunc storeFunc = store.getStoreFunc(); if (storeFunc instanceof StoreMetadata) { Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=906314&r1=906313&r2=906314&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Thu Feb 4 00:59:10 2010 @@ -17,7 +17,6 @@ */ package org.apache.pig.test; -import java.io.File; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -25,17 +24,31 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; -import java.util.Set; -import java.util.Map.Entry; import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.StoreFunc; +import org.apache.pig.StoreMetadata; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; @@ -44,12 +57,6 @@ import org.apache.pig.data.DefaultTuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; -import org.apache.pig.pen.physicalOperators.POCounter; -import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.impl.logicalLayer.LOStore; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -57,8 +64,10 @@ import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.pen.physicalOperators.POCounter; import org.apache.pig.test.utils.GenRandomData; import org.apache.pig.test.utils.TestHelper; +import org.apache.pig.tools.pigstats.PigStats; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -292,6 +301,110 @@ checkStorePath("/tmp/foo/../././","/tmp"); } + @Test + public void testSetStoreSchema() throws Exception { + PigServer ps = null; + String storeSchemaOutputFile = outputFileName + "_storeSchema_test"; + try { + ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL}; + String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; + + String script = "a = load '"+ inputFileName + "';" + + "store a into '" + outputFileName + "' using " + + DummyStore.class.getName() + "();"; + + for (ExecType execType : modes) { + if(execType == ExecType.MAPREDUCE) { + ps = new PigServer(ExecType.MAPREDUCE, + cluster.getProperties()); + Util.deleteFile(ps.getPigContext(), inputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile); + } else { + ps = new PigServer(ExecType.LOCAL); + Util.deleteFile(ps.getPigContext(), inputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile); + } + ps.setBatchOn(); + Util.createInputFile(ps.getPigContext(), + inputFileName, inputData); + Util.registerMultiLineQuery(ps, script); + ps.executeBatch(); + assertEquals( + "Checking if file indicating that storeSchema was " + + "called exists in " + execType + " mode", true, + Util.exists(ps.getPigContext(), storeSchemaOutputFile)); + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Exception encountered - hence failing:" + e); + } finally { + Util.deleteFile(ps.getPigContext(), inputFileName); + Util.deleteFile(ps.getPigContext(), outputFileName); + Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile); + } + } + + public static class DummyStore implements StoreFunc, StoreMetadata{ + + @Override + public void checkSchema(ResourceSchema s) throws IOException { + + } + + @Override + public OutputFormat getOutputFormat() throws IOException { + // we don't really write in the test - so this is just to keep + // Pig/hadoop happy + return new TextOutputFormat(); + } + + @Override + public void prepareToWrite(RecordWriter writer) throws IOException { + + } + + @Override + public void putNext(Tuple t) throws IOException { + // we don't really write anything out + } + + @Override + public String relToAbsPathForStoreLocation(String location, Path curDir) + throws IOException { + return location; + } + + @Override + public void setStoreFuncUDFContextSignature(String signature) { + + } + + @Override + public void setStoreLocation(String location, Job job) + throws IOException { + FileOutputFormat.setOutputPath(job, new Path(location)); + } + + @Override + public void storeSchema(ResourceSchema schema, String location, + Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + // create a file to test that this method got called - if it gets called + // multiple times, the create will throw an Exception + fs.create( + new Path(conf.get("mapred.output.dir") + "_storeSchema_test"), + false); + } + + @Override + public void storeStatistics(ResourceStatistics stats, String location, + Configuration conf) throws IOException { + } + + } + private void checkStorePath(String orig, String expected) throws Exception { checkStorePath(orig, expected, false); } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=906314&r1=906313&r2=906314&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Thu Feb 4 00:59:10 2010 @@ -308,6 +308,14 @@ fs.delete(new Path(fileName), true); } + static public boolean exists(PigContext pigContext, String fileName) + throws IOException { + Configuration conf = ConfigurationUtil.toConfiguration( + pigContext.getProperties()); + FileSystem fs = FileSystem.get(conf); + return fs.exists(new Path(fileName)); + } + /** * Helper function to check if the result of a Pig Query is in line with * expected results.