pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aniket...@apache.org
Subject svn commit: r1564091 - in /pig/trunk: ./ conf/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/io/ test/org/a...
Date Mon, 03 Feb 2014 21:57:55 GMT
Author: aniket486
Date: Mon Feb  3 21:57:55 2014
New Revision: 1564091

URL: http://svn.apache.org/r1564091
Log:
PIG-2672: Optimize the use of DistributedCache (aniket486)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/TestPigServer.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Feb  3 21:57:55 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-2672: Optimize the use of DistributedCache (aniket486)
+
 PIG-3238: Pig current releases lack a UDF Stuff(). This UDF deletes a specified length of characters
  and inserts another set of characters at a specified starting point (nezihyigitbasi via daijy)
 

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Mon Feb  3 21:57:55 2014
@@ -247,3 +247,11 @@ pig.location.check.strict=false
 # When enabled, jobs won't create empty part files if no output is written. In this case
 # PigOutputFormat will be wrapped with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat.
 # pig.output.lazy=true
+
+# Set this option to turn on additional jar caching for the user
+# pig.user.cache.enabled=true
+
+# This option defines location where additional jars are cached for the user.
+# Additional jar will be cached under PIG_USER_CACHE_LOCATION/${user.name}/.pigcache
+# and will be re-used across the jobs run by the user if the jar has not changed.
+# pig.user.cache.location=/tmp

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Mon Feb  3 21:57:55 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
@@ -190,7 +191,7 @@ public class TestAvroStorage {
     public static void setup() throws ExecException, IOException {
         pigServerLocal = new PigServer(ExecType.LOCAL);
         String TMP_DIR = System.getProperty("user.dir") + "/build/test/tmp/";
-        pigServerLocal.getPigContext().getProperties().setProperty("pig.temp.dir", TMP_DIR);
+        pigServerLocal.getPigContext().getProperties().setProperty(PigConfiguration.PIG_TEMP_DIR, TMP_DIR);
         outbasedir = FileLocalizer.getTemporaryPath(pigServerLocal.getPigContext()).toString() + "/TestAvroStorage/";
         deleteDirectory(new File(outbasedir));
     }

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Feb  3 21:57:55 2014
@@ -148,12 +148,11 @@ public class PigConfiguration {
      * Controls the max threshold size to convert jobs to run in local mode
      */
     public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
-    
+
     /**
      * This parameter enables/disables fetching. By default it is turned on.
      */
     public static final String OPT_FETCH = "opt.fetch";
-    
 
     /**
      * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
@@ -161,5 +160,21 @@ public class PigConfiguration {
      */
     public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
 
+    /**
+     * Location where pig stores temporary files for job setup
+     */
+    public static final String PIG_TEMP_DIR = "pig.temp.dir";
+
+    /**
+     * This key is turn on the user level cache
+     */
+    public static final String PIG_USER_CACHE_ENABLED = "pig.user.cache.enabled";
+
+    /**
+     * Location where additional jars are cached for the user
+     * Additional jar will be cached under PIG_USER_CACHE_LOCATION/${user.name}/.pigcache
+     * and will be re-used across the jobs run by the user if the jar has not changed
+     */
+    public static final String PIG_USER_CACHE_LOCATION = "pig.user.cache.location";
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb  3 21:57:55 2014
@@ -38,6 +38,8 @@ import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +65,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -1589,6 +1592,57 @@ public class JobControlCompiler{
         pigContext.skipJars.add(url.getPath());
     }
 
+    private static Path getCacheStagingDir(Configuration conf) throws IOException {
+        String pigTempDir = conf.get(PigConfiguration.PIG_USER_CACHE_LOCATION,
+                conf.get(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
+        String currentUser = System.getProperty("user.name");
+        Path stagingDir = new Path(pigTempDir + "/" + currentUser + "/", ".pigcache");
+        FileSystem fs = FileSystem.get(conf);
+        fs.mkdirs(stagingDir);
+        fs.setPermission(stagingDir, FileLocalizer.OWNER_ONLY_PERMS);
+        return stagingDir;
+    }
+
+    private static Path getFromCache(PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+        try {
+            Path stagingDir = getCacheStagingDir(conf);
+            String filename = FilenameUtils.getName(url.getPath());
+
+            String checksum = DigestUtils.shaHex(url.openStream());
+            FileSystem fs = FileSystem.get(conf);
+            Path cacheDir = new Path(stagingDir, checksum);
+            FileStatus [] statuses = fs.listStatus(cacheDir);
+            if (statuses != null) {
+                for (FileStatus stat : statuses) {
+                    Path jarPath = stat.getPath();
+                    if(jarPath.getName().equals(filename)) {
+                        log.info("Found " + url + " in jar cache at "+ stagingDir);
+                        long curTime = System.currentTimeMillis();
+                        fs.setTimes(jarPath, -1, curTime);
+                        return jarPath;
+                    }
+                }
+            }
+            log.info("Url "+ url + " was not found in jarcache at "+ stagingDir);
+            // attempt to copy to cache else return null
+            fs.mkdirs(cacheDir, FileLocalizer.OWNER_ONLY_PERMS);
+            Path cacheFile = new Path(cacheDir, filename);
+            OutputStream os = FileSystem.create(fs, cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
+            try {
+                IOUtils.copyBytes(url.openStream(), os, 4096, true);
+            } finally {
+                os.close();
+            }
+            return cacheFile;
+
+        } catch (IOException ioe) {
+            log.info("Unable to retrieve jar from jar cache ", ioe);
+            return null;
+        }
+    }
+
     /**
      * copy the file to hdfs in a temporary path
      * @param pigContext the pig context
@@ -1602,9 +1656,15 @@ public class JobControlCompiler{
             Configuration conf,
             URL url) throws IOException {
 
-        String path = url.getPath();
-        int slash = path.lastIndexOf("/");
-        String suffix = slash == -1 ? path : path.substring(slash+1);
+        boolean cacheEnabled =
+                conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, false);
+        if (cacheEnabled) {
+            Path pathOnDfs = getFromCache(pigContext, conf, url);
+            if(pathOnDfs != null) {
+                return pathOnDfs;
+            }
+        }
+        String suffix = FilenameUtils.getName(url.getPath());
 
         Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
         FileSystem fs = dst.getFileSystem(conf);

Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Mon Feb  3 21:57:55 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -67,7 +68,7 @@ public class FileLocalizer {
     static public final int STYLE_UNIX = 0;
     static public final int STYLE_WINDOWS = 1;
 
-    private static FsPermission rootPerm = new FsPermission(FsAction.ALL, FsAction.NONE,
+    public static FsPermission OWNER_ONLY_PERMS = new FsPermission(FsAction.ALL, FsAction.NONE,
             FsAction.NONE); // rwx------
 
     public static class DataStorageInputStreamIterator extends InputStream {
@@ -461,7 +462,7 @@ public class FileLocalizer {
             throws DataStorageException {
 
         if (relativeRoot.get() == null) {
-            String tdir= pigContext.getProperties().getProperty("pig.temp.dir", "/tmp");
+            String tdir= pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp");
             ContainerDescriptor relative = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
             relativeRoot.set(relative);
             try {
@@ -479,7 +480,7 @@ public class FileLocalizer {
     private static void createRelativeRoot(ContainerDescriptor relativeRoot) throws IOException {
         relativeRoot.create();
         if (relativeRoot instanceof HDirectory) {
-            ((HDirectory) relativeRoot).setPermission(rootPerm);
+            ((HDirectory) relativeRoot).setPermission(OWNER_ONLY_PERMS);
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon Feb  3 21:57:55 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
@@ -70,7 +71,7 @@ public class TestMultiQueryLocal {
         context.getProperties().setProperty("opt.multiquery", ""+true);
         myPig = new PigServer(context);
         myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false");
-        myPig.getPigContext().getProperties().setProperty("pig.temp.dir", "build/test/tmp/");
+        myPig.getPigContext().getProperties().setProperty(PigConfiguration.PIG_TEMP_DIR, "build/test/tmp/");
         TMP_DIR = FileLocalizer.getTemporaryPath(myPig.getPigContext()).toUri().getPath();
         deleteOutputFiles();
     }

Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1564091&r1=1564090&r2=1564091&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Mon Feb  3 21:57:55 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -777,7 +778,7 @@ public class TestPigServer {
         File propertyFile = new File(tempDir, "pig.properties");
         propertyFile.deleteOnExit();
         PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
-        out.println("pig.temp.dir=/tmp/test");
+        out.println(PigConfiguration.PIG_TEMP_DIR + "=/tmp/test");
         out.close();
         Properties properties = PropertiesUtil.loadDefaultProperties();
         PigContext pigContext=new PigContext(ExecType.LOCAL, properties);



Mime
View raw message