pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1604668 - in /pig/trunk: ./ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/util/ src/org/apache/pig/scripting/ test/ test/e2e/pig/tests/
Date Sun, 22 Jun 2014 23:35:44 GMT
Author: daijy
Date: Sun Jun 22 23:35:43 2014
New Revision: 1604668

URL: http://svn.apache.org/r1604668
Log:
PIG-3478: Make StreamingUDF work for Hadoop 2

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/excluded-tests-23

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun 22 23:35:43 2014
@@ -40,6 +40,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3478: Make StreamingUDF work for Hadoop 2 (lbendig via daijy)
+
 PIG-4032: BloomFilter fails with s3 path in Hadoop 2.4 (cheolsoo)
 
 PIG-4018: Schema validation fails with UNION ONSCHEMA (daijy)

Modified: pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java Sun Jun 22 23:35:43 2014
@@ -39,7 +39,6 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -173,7 +172,10 @@ public class StreamingUDF extends EvalFu
         String[] command = new String[10];
         Configuration conf = UDFContext.getUDFContext().getJobConf();
 
-        String jarPath = conf.get("mapred.jar");
+        String jarPath = conf.get("mapreduce.job.jar");
+        if (jarPath == null) {
+            jarPath = conf.get("mapred.jar");
+        }
         String jobDir;
         if (jarPath != null) {
             jobDir = new File(jarPath).getParent();
@@ -204,14 +206,66 @@ public class StreamingUDF extends EvalFu
                 "." : 
                 filePath.substring(0, lastSeparator - 1);
         command[UDF_NAME] = funcName;
-        command[PATH_TO_FILE_CACHE] = "\"" + jobDir + filePath.substring(0, lastSeparator) + "\"";
+        String fileCachePath = jobDir + filePath.substring(0, lastSeparator);
+        command[PATH_TO_FILE_CACHE] = "\"" + fileCachePath + "\"";
         command[STD_OUT_OUTPUT_PATH] = outFileName;
         command[STD_ERR_OUTPUT_PATH] = errOutFileName;
         command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
         command[IS_ILLUSTRATE] = isIllustrate;
+        
+        ensureUserFileAvailable(command, fileCachePath);
+        
         return command;
     }
 
+    /**
+     * Need to make sure the user's file is available. If jar hasn't been
+     * exploded, just copy the udf file to its path relative to the controller
+     * file and update file cache path appropriately.
+     */
+    private void ensureUserFileAvailable(String[] command, String fileCachePath)
+            throws ExecException, IOException {
+
+        File userUdfFile = new File(fileCachePath + command[UDF_FILE_NAME] + getUserFileExtension());
+        if (!userUdfFile.exists()) {
+            String absolutePath = filePath.startsWith("/") ? filePath : File.separator + filePath;
+            String controllerDir = new File(command[PATH_TO_CONTROLLER_FILE]).getParent();
+            String userUdfPath = controllerDir + absolutePath + getUserFileExtension();
+            userUdfFile = new File(userUdfPath);
+            userUdfFile.deleteOnExit();
+            userUdfFile.getParentFile().mkdirs();
+            if (userUdfFile.exists()) {
+                userUdfFile.delete();
+                if (!userUdfFile.createNewFile()) {
+                    throw new IOException("Unable to create file: " + userUdfFile.getAbsolutePath());
+                }
+            }
+            InputStream udfFileStream = this.getClass().getResourceAsStream(
+                    absolutePath + getUserFileExtension());
+            command[PATH_TO_FILE_CACHE] = "\"" + userUdfFile.getParentFile().getAbsolutePath()
+                    + "\"";
+
+            try {
+                FileUtils.copyInputStreamToFile(udfFileStream, userUdfFile);
+            }
+            catch (Exception e) {
+                throw new ExecException("Unable to copy user udf file: " + userUdfFile.getName(), e);
+            }
+            finally {
+                udfFileStream.close();
+            }
+        }
+    }
+
+    private String getUserFileExtension() throws ExecException {
+        if (isPython()) {
+            return ".py";
+        }
+        else {
+            throw new ExecException("Unrecognized streamingUDF language: " + language);
+        }
+    }
+    
     private void createInputHandlers() throws ExecException, FrontendException {
         PigStreamingUDF serializer = new PigStreamingUDF();
         this.inputHandler = new StreamingUDFInputHandler(serializer);
@@ -257,13 +311,12 @@ public class StreamingUDF extends EvalFu
      * @throws IOException
      */
     private String getControllerPath(String jarPath) throws IOException {
-        if (language.toLowerCase().equals("python")) {
+        if (isPython()) {
             String controllerPath = jarPath + PYTHON_CONTROLLER_JAR_PATH;
             File controller = new File(controllerPath);
             if (!controller.exists()) {
                 File controllerFile = File.createTempFile("controller", ".py");
-                InputStream pythonControllerStream = Launcher.class.getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
-                try {
+                InputStream pythonControllerStream = this.getClass().getResourceAsStream(PYTHON_CONTROLLER_JAR_PATH);
               try {
                     FileUtils.copyInputStreamToFile(pythonControllerStream, controllerFile);
                 } finally {
                     pythonControllerStream.close();
@@ -271,7 +324,7 @@ public class StreamingUDF extends EvalFu
                 controllerFile.deleteOnExit();
                 File pigUtilFile = new File(controllerFile.getParent() + "/pig_util.py");
                 pigUtilFile.deleteOnExit();
-                InputStream pythonUtilStream = Launcher.class.getResourceAsStream(PYTHON_PIG_UTIL_PATH);
+                InputStream pythonUtilStream = this.getClass().getResourceAsStream(PYTHON_PIG_UTIL_PATH);
                 try {
                     FileUtils.copyInputStreamToFile(pythonUtilStream, pigUtilFile);
                 } finally {
@@ -285,6 +338,10 @@ public class StreamingUDF extends EvalFu
         }
     }
 
+    private boolean isPython() {
+        return language.toLowerCase().equals("python");
+    }
+    
     /**
      * Returns a list of file names (relative to root of pig jar) of files that need to be
      * included in the jar shipped to the cluster.
@@ -465,7 +522,7 @@ public class StreamingUDF extends EvalFu
                     stderr = null;
                 }
             } catch (IOException e) {
-                log.debug("Process Ended");
+                log.debug("Process Ended", e);
             } catch (Exception e) {
                 log.error("standard error problem", e);
             }

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Sun Jun 22 23:35:43 2014
@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.StreamingUDF;
@@ -193,8 +192,8 @@ public class JarManager {
                 
                 if (clazz.getSimpleName().equals("StreamingUDF")) {
                     for (String fileName : StreamingUDF.getResourcesForJar()) {
-                        InputStream in = Launcher.class.getResourceAsStream(fileName);
-                        addStream(jarFile, fileName, in, contents);
+                        InputStream in = JarManager.class.getResourceAsStream(fileName);
+                        addStream(jarFile, fileName.substring(1), in, contents);
                     }
                 }
             }

Modified: pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Sun Jun 22 23:35:43 2014
@@ -60,13 +60,38 @@ public class ScriptingOutputCapturer {
         this.execType = execType;
     }
 
-    public String getStandardOutputRootWriteLocation() {
+    public String getStandardOutputRootWriteLocation() throws IOException {
         Configuration conf = UDFContext.getUDFContext().getJobConf();
         
         String jobId = conf.get("mapred.job.id");
         String taskId = conf.get("mapred.task.id");
+        String hadoopLogDir = System.getProperty("hadoop.log.dir");
+        if (hadoopLogDir == null) {
+            hadoopLogDir = conf.get("hadoop.log.dir");
+        }
+        
+        String tmpDir = conf.get("hadoop.tmp.dir");
+        boolean fallbackToTmp = (hadoopLogDir == null);
+        if (!fallbackToTmp) {
+            try {
+                if (!(new File(hadoopLogDir).canWrite())) {
+                    fallbackToTmp = true;
+                }
+            }
+            catch (SecurityException e) {
+                fallbackToTmp = true;
+            }
+            finally {
+                if (fallbackToTmp)
+                    log.warn(String.format("Insufficient permission to write into %s. Change path to: %s", hadoopLogDir, tmpDir));
+            }
+        }
+        if (fallbackToTmp) {
+            hadoopLogDir = tmpDir;
+        }
         log.debug("JobId: " + jobId);
         log.debug("TaskId: " + taskId);
+        log.debug("hadoopLogDir: " + hadoopLogDir);
 
         if (execType.isLocal()) {
             String logDir = System.getProperty("pig.udf.scripting.log.dir");
@@ -74,14 +99,13 @@ public class ScriptingOutputCapturer {
                 logDir = ".";
             return logDir + "/" + (taskId == null ? "" : (taskId + "_"));
         } else {
-            String taskLogDir = getTaskLogDir(jobId, taskId);
+            String taskLogDir = getTaskLogDir(jobId, taskId, hadoopLogDir);
             return taskLogDir + "/";
         }
     }
 
-    public String getTaskLogDir(String jobId, String taskId) {
+    public String getTaskLogDir(String jobId, String taskId, String hadoopLogDir) throws IOException {
         String taskLogDir;
-        String hadoopLogDir = System.getProperty("hadoop.log.dir");
         String defaultUserLogDir = hadoopLogDir + File.separator + "userlogs";
 
         if ( new File(defaultUserLogDir + File.separator + jobId).exists() ) {
@@ -92,6 +116,11 @@ public class ScriptingOutputCapturer {
             taskLogDir = defaultUserLogDir;
         } else {
             taskLogDir = hadoopLogDir + File.separator + "udfOutput";
+            File dir = new File(taskLogDir);
+            dir.mkdirs();
+            if (!dir.exists()) {
+                throw new IOException("Could not create directory: " + taskLogDir);
+            }
         }
         return taskLogDir;
     }

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Sun Jun 22 23:35:43 2014
@@ -3806,7 +3806,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test integer square
                     'num' => 1,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
@@ -3820,7 +3819,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test string concat and referencing function without a namespace
                     'num' => 2,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age, gpa);
@@ -3834,7 +3832,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test long and float square, plus two references to the same UDF with different schemas
                     'num' => 3,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'floatpostprocess' => 1,
                     'delimiter' => '    ',
                     'pig' => q\
@@ -3850,7 +3847,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test bytearray
                     'num' => 4,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age, gpa);
@@ -3864,7 +3860,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test complex types
                     'num' => 5,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studentcomplextab10k' using PigStorage() as (m:[], t:(name:chararray, age:int, gpa:double), b:{t:(name:chararray, age:int, gpa:double)});
@@ -3879,7 +3874,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test null input and output
                     'num' => 6,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studentnulltab10k' using PigStorage() as (name, age:int, gpa:double);
@@ -3893,7 +3887,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test functions that call other functions and include other files
                     'num' => 7,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
@@ -3907,7 +3900,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test that functions with same names resolve correctly across name spaces
                     'num' => 8,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 register ':SCRIPTHOMEPATH:/cpython/morepythonudfs.py' using streaming_python as morefuncs;
@@ -3922,7 +3914,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test that functions with same names resolve correctly across name spaces
                     'num' => 9,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
@@ -3938,7 +3929,6 @@ store c into ':OUTPATH:';\,
                     {
                     # test that functions with same names resolve correctly across name spaces
                     'num' => 10,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/allscalar10k' as (name:chararray, age:int, gpa:double, instate:boolean);
@@ -3952,7 +3942,6 @@ store b into ':OUTPATH:';\,
                     {
                     # test that functions with same names resolve correctly across name spaces
                     'num' => 11,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
@@ -3966,7 +3955,6 @@ store b into ':OUTPATH:';\,
                     {
                     # udf which returns an array
                     'num' => 12,
-                    'ignore23' => 'Streaming UDF does not work for Hadoop 2. See PIG-3478',
                     'pig' => q\
 register ':SCRIPTHOMEPATH:/cpython/scriptingudf.py' using streaming_python as myfuncs;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray);

Modified: pig/trunk/test/excluded-tests-23
URL: http://svn.apache.org/viewvc/pig/trunk/test/excluded-tests-23?rev=1604668&r1=1604667&r2=1604668&view=diff
==============================================================================
--- pig/trunk/test/excluded-tests-23 (original)
+++ pig/trunk/test/excluded-tests-23 Sun Jun 22 23:35:43 2014
@@ -1,2 +1 @@
 **/TestHBaseStorage.java
-**/TestStreamingUDF.java



Mime
View raw message