pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pradeep...@apache.org
Subject svn commit: r906314 - in /hadoop/pig/branches/load-store-redesign: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/
Date Thu, 04 Feb 2010 00:59:10 GMT
Author: pradeepkth
Date: Thu Feb  4 00:59:10 2010
New Revision: 906314

URL: http://svn.apache.org/viewvc?rev=906314&view=rev
Log:
PIG-1090: additional patch to handle calling setStoreSchema of StoreMetadata in local mode
(pradeepkth)

Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Feb  4 00:59:10 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -282,6 +283,18 @@
             for(Job job : succJobs){
                 List<POStore> sts = jcc.getStores(job);
                 for (POStore st: sts) {
+                    // Currently (as of Feb 3 2010), hadoop's local mode does not
+                    // call cleanupJob on OutputCommitter (see https://issues.apache.org/jira/browse/MAPREDUCE-1447)
+                    // So to workaround that bug, we are calling setStoreSchema on
+                    // StoreFunc's which implement StoreMetadata here
+                    /**********************************************************/
+                    // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE MAPREDUCE-1447
+                    // IS FIXED - TestStore.testSetStoreSchema() should fail at
+                    // that time and removing this code should fix it.
+                    /**********************************************************/
+                    if(pc.getExecType() == ExecType.LOCAL) {
+                        storeSchema(job, st);
+                    }
                     if (!st.isTmpStore()) {
                         succeededStores.add(st.getSFile());
                         finalStores++;
@@ -443,6 +456,19 @@
     }
     
     /**
+     * @param job
+     * @param st
+     * @throws IOException 
+     */
+    private void storeSchema(Job job, POStore st) throws IOException {
+        JobContext jc = new JobContext(job.getJobConf(), 
+                new org.apache.hadoop.mapreduce.JobID());
+        JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
+        PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
+    }
+
+    
+    /**
      * An exception handler class to handle exceptions thrown by the job controller thread
      * Its a local class. This is the only mechanism to catch unhandled thread exceptions
      * Unhandled exceptions in threads are handled by the VM if the handler is not registered

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Feb  4 00:59:10 2010
@@ -136,7 +136,7 @@
         return contextCopy;   
     }
     
-    private JobContext setUpContext(JobContext context, 
+    static JobContext setUpContext(JobContext context, 
             POStore store) throws IOException {
         // make a copy of the context so that the actions after this call
         // do not end up updating the same context
@@ -153,7 +153,7 @@
         return contextCopy;   
     }
 
-    private void storeCleanup(POStore store, Configuration conf)
+    static void storeCleanup(POStore store, Configuration conf)
             throws IOException {
         StoreFunc storeFunc = store.getStoreFunc();
         if (storeFunc instanceof StoreMetadata) {

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Thu Feb  4 00:59:10 2010
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.test;
 
-import java.io.File;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -25,17 +24,31 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
-import java.util.Map.Entry;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
@@ -44,12 +57,6 @@
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.pen.physicalOperators.POCounter;
-import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -57,8 +64,10 @@
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.physicalOperators.POCounter;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -292,6 +301,110 @@
         checkStorePath("/tmp/foo/../././","/tmp");
     }
 
+    @Test
+    public void testSetStoreSchema() throws Exception {
+        PigServer ps = null;
+        String storeSchemaOutputFile = outputFileName + "_storeSchema_test";
+        try {
+            ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL};
+            String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+            
+            String script = "a = load '"+ inputFileName + "';" +
+            		"store a into '" + outputFileName + "' using " + 
+            		DummyStore.class.getName() + "();";
+            
+            for (ExecType execType : modes) {
+                if(execType == ExecType.MAPREDUCE) {
+                    ps = new PigServer(ExecType.MAPREDUCE, 
+                            cluster.getProperties());
+                    Util.deleteFile(ps.getPigContext(), inputFileName);
+                    Util.deleteFile(ps.getPigContext(), outputFileName);
+                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+                } else {
+                    ps = new PigServer(ExecType.LOCAL);
+                    Util.deleteFile(ps.getPigContext(), inputFileName);
+                    Util.deleteFile(ps.getPigContext(), outputFileName);
+                    Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+                }
+                ps.setBatchOn();
+                Util.createInputFile(ps.getPigContext(), 
+                        inputFileName, inputData);
+                Util.registerMultiLineQuery(ps, script);
+                ps.executeBatch();
+                assertEquals(
+                        "Checking if file indicating that storeSchema was " +
+                        "called exists in " + execType + " mode", true, 
+                        Util.exists(ps.getPigContext(), storeSchemaOutputFile));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Exception encountered - hence failing:" + e);
+        } finally {
+            Util.deleteFile(ps.getPigContext(), inputFileName);
+            Util.deleteFile(ps.getPigContext(), outputFileName);
+            Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+        }
+    }
+    
+    public static class DummyStore implements StoreFunc, StoreMetadata{
+
+        @Override
+        public void checkSchema(ResourceSchema s) throws IOException {
+            
+        }
+
+        @Override
+        public OutputFormat getOutputFormat() throws IOException {
+            // we don't really write in the test - so this is just to keep
+            // Pig/hadoop happy
+            return new TextOutputFormat<Long, Text>();
+        }
+
+        @Override
+        public void prepareToWrite(RecordWriter writer) throws IOException {
+            
+        }
+
+        @Override
+        public void putNext(Tuple t) throws IOException {
+            // we don't really write anything out            
+        }
+
+        @Override
+        public String relToAbsPathForStoreLocation(String location, Path curDir)
+                throws IOException {
+            return location;
+        }
+
+        @Override
+        public void setStoreFuncUDFContextSignature(String signature) {
+            
+        }
+
+        @Override
+        public void setStoreLocation(String location, Job job)
+                throws IOException {
+            FileOutputFormat.setOutputPath(job, new Path(location));
+        }
+
+        @Override
+        public void storeSchema(ResourceSchema schema, String location,
+                Configuration conf) throws IOException {
+            FileSystem fs = FileSystem.get(conf);
+            // create a file to test that this method got called - if it gets called
+            // multiple times, the create will throw an Exception
+            fs.create(
+                    new Path(conf.get("mapred.output.dir") + "_storeSchema_test"),
+                    false);
+        }
+
+        @Override
+        public void storeStatistics(ResourceStatistics stats, String location,
+                Configuration conf) throws IOException {
+        }
+        
+    }
+    
     private void checkStorePath(String orig, String expected) throws Exception {
         checkStorePath(orig, expected, false);
     }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Thu Feb  4 00:59:10 2010
@@ -308,6 +308,14 @@
         fs.delete(new Path(fileName), true);
     }
     
+    static public boolean exists(PigContext pigContext, String fileName) 
+    throws IOException {
+        Configuration conf = ConfigurationUtil.toConfiguration(
+                pigContext.getProperties());
+        FileSystem fs = FileSystem.get(conf);
+        return fs.exists(new Path(fileName));
+    }
+    
     /**
     * Helper function to check if the result of a Pig Query is in line with 
     * expected results.



Mime
View raw message