pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1599356 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ test/org/apache/pig/tez/
Date Mon, 02 Jun 2014 21:12:44 GMT
Author: daijy
Date: Mon Jun  2 21:12:43 2014
New Revision: 1599356

URL: http://svn.apache.org/r1599356
Log:
PIG-3978: Container reuse does not work across PigServer

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
    pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun  2 21:12:43 2014
@@ -30,6 +30,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3978: Container reuse does not across PigServer (daijy)
+
 PIG-3974: E2E test data generation fails in cluster mode (lbendig via cheolsoo)
 
 PIG-3969: Javascript UDF fails if no output schema is defined (lbendig via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jun  2 21:12:43 2014
@@ -149,8 +149,6 @@ public class TezCompiler extends PhyPlan
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
-    private TezResourceManager tezResourceManager;
-
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -160,16 +158,15 @@ public class TezCompiler extends PhyPlan
 
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
-    public TezCompiler(PhysicalPlan plan, PigContext pigContext, TezResourceManager tezResourceManager)
+    public TezCompiler(PhysicalPlan plan, PigContext pigContext)
             throws TezCompilerException {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.plan = plan;
         this.pigContext = pigContext;
-        this.tezResourceManager = tezResourceManager;
 
         pigProperties = pigContext.getProperties();
         splitsSeen = Maps.newHashMap();
-        tezPlan = new TezOperPlan(tezResourceManager);
+        tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
         udfFinder = new UDFFinder();
         List<PhysicalOperator> roots = plan.getRoots();
@@ -196,7 +193,7 @@ public class TezCompiler extends PhyPlan
 
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
-        TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext, tezResourceManager);
+        TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
         TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan);
         tezPlanContainer.add(node);
         tezPlanContainer.split(node);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Jun  2 21:12:43 2014
@@ -65,8 +65,10 @@ public class TezLauncher extends Launche
 
         Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
 
-        TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf);
+        TezResourceManager tezResourceManager = TezResourceManager.getInstance();
+        tezResourceManager.init(pc, conf);
 
+        stagingDir.getFileSystem(conf).mkdirs(stagingDir);
         log.info("Tez staging directory is " + stagingDir.toString());
         conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
 
@@ -77,7 +79,7 @@ public class TezLauncher extends Launche
         PigStats.start(tezStats);
 
         TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
-        TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager);
+        TezPlanContainer tezPlanContainer = compile(php, pc);
 
         TezOperPlan tezPlan;
 
@@ -205,7 +207,7 @@ public class TezLauncher extends Launche
             String format, boolean verbose) throws PlanException,
             VisitorException, IOException {
         log.debug("Entering TezLauncher.explain");
-        TezPlanContainer tezPlanContainer = compile(php, pc, null);
+        TezPlanContainer tezPlanContainer = compile(php, pc);
 
         if (format.equals("text")) {
             TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
@@ -217,9 +219,9 @@ public class TezLauncher extends Launche
         }
     }
 
-    public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager)
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
             throws PlanException, IOException, VisitorException {
-        TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
+        TezCompiler comp = new TezCompiler(php, pc);
         TezOperPlan tezPlan = comp.compile();
 
         NoopFilterRemover filter = new NoopFilterRemover(tezPlan);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Mon Jun  2 21:12:43 2014
@@ -42,12 +42,9 @@ public class TezOperPlan extends Operato
 
     private static final long serialVersionUID = 1L;
 
-    private TezResourceManager tezResourceManager;
-
     private Map<String, Path> extraResources = new HashMap<String, Path>();
 
-    public TezOperPlan(TezResourceManager tezResourceManager) {
-        this.tezResourceManager = tezResourceManager;
+    public TezOperPlan() {
     }
 
     @Override
@@ -70,7 +67,7 @@ public class TezOperPlan extends Operato
         String resourceName = resourcePath.getName();
 
         if (!extraResources.containsKey(resourceName)) {
-            Path remoteFsPath = tezResourceManager.addTezResource(url);
+            Path remoteFsPath = TezResourceManager.getInstance().addTezResource(url);
             extraResources.put(resourceName, remoteFsPath);
         }
     }
@@ -78,7 +75,7 @@ public class TezOperPlan extends Operato
     // Add extra plan-specific local resources already present in the remote FS
     public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
         if (!extraResources.containsKey(resourceName)) {
-            tezResourceManager.addTezResource(resourceName, remoteFsPath);
+            TezResourceManager.getInstance().addTezResource(resourceName, remoteFsPath);
             extraResources.put(resourceName, remoteFsPath);
         }
     }
@@ -93,7 +90,7 @@ public class TezOperPlan extends Operato
         addShipResources(streamVisitor.getShipFiles());
         addCacheResources(streamVisitor.getCacheFiles());
 
-        return tezResourceManager.getTezResources(extraResources.keySet());
+        return TezResourceManager.getInstance().getTezResources(extraResources.keySet());
     }
 
     // In the statement "SHIP('/home/foo')" we'll map the resource name foo to

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Jun  2 21:12:43 2014
@@ -38,12 +38,10 @@ import org.apache.pig.impl.util.JarManag
 
 public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
     private static final long serialVersionUID = 1L;
-    private TezResourceManager tezResourceManager;
     private PigContext pigContext;
 
-    public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) {
+    public TezPlanContainer(PigContext pigContext) {
         this.pigContext = pigContext;
-        this.tezResourceManager = tezResourceManager;
     }
 
     // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
@@ -52,7 +50,7 @@ public class TezPlanContainer extends Op
     public Map<String, LocalResource> getLocalResources() throws Exception {
         Set<URL> jarLists = new HashSet<URL>();
 
-        jarLists.add(tezResourceManager.getBootStrapJar());
+        jarLists.add(new File(TezResourceManager.getInstance().getBootStrapJar()).toURI().toURL());
 
         // In MR Pig the extra jars and script jars get put in Distributed Cache, but
         // in Tez we'll add them as local resources.
@@ -101,7 +99,7 @@ public class TezPlanContainer extends Op
         //     }
         // }
 
-        return tezResourceManager.addTezResources(jarLists);
+        return TezResourceManager.getInstance().addTezResources(jarLists);
     }
 
     public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
@@ -157,7 +155,7 @@ public class TezPlanContainer extends Op
         if (operToSegment != null) {
             for (TezOperator succ : succs) {
                 tezOperPlan.disconnect(operToSegment, succ);
-                TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager);
+                TezOperPlan newOperPlan = new TezOperPlan();
                 List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
                 if (getSuccessors(planNode)!=null) {
                     containerSuccs.addAll(getSuccessors(planNode));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Mon Jun  2 21:12:43 2014
@@ -35,46 +35,60 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.JarManager;
 
 public class TezResourceManager {
+    private static TezResourceManager instance = null;
+    private boolean inited = false;
     private Path stagingDir;
     private PigContext pigContext;
     private Configuration conf;
-    private URL bootStrapJar;
+    private String bootStrapJar;
     private FileSystem remoteFs;
     public Map<String, Path> resources = new HashMap<String, Path>();
 
-    public URL getBootStrapJar() {
+    public String getBootStrapJar() {
         return bootStrapJar;
     }
 
-    public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
-        resources.clear();
-        this.stagingDir = stagingDir;
-        this.pigContext = pigContext;
-        this.conf = conf;
-        String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
-        this.bootStrapJar = new File(jar).toURI().toURL();
-        remoteFs = FileSystem.get(conf);
-        addBootStrapJar();
+    static public TezResourceManager getInstance() {
+        if (instance==null) {
+            instance = new TezResourceManager();
+        }
+        return instance;
+    }
+  
+    public void init(PigContext pigContext, Configuration conf) throws IOException {
+        if (!inited) {
+            this.stagingDir = FileLocalizer.getTemporaryPath(pigContext, "-tez-resource");;
+            this.pigContext = pigContext;
+            this.conf = conf;
+            String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
+            this.bootStrapJar = new File(jar).getName().toString();
+            remoteFs = FileSystem.get(conf);
+            addBootStrapJar();
+            inited = true;
+        }
     }
 
     // Add files from the source FS as local resources. The resource name will
     // be the same as the file name.
     public Path addTezResource(URL url) throws IOException {
-        Path resourcePath = new Path(url.getFile());
-        String resourceName = resourcePath.getName();
-
-        if (resources.containsKey(resourceName)) {
-            return resources.get(resourceName);
+        synchronized(this) {
+            Path resourcePath = new Path(url.getFile());
+            String resourceName = resourcePath.getName();
+    
+            if (resources.containsKey(resourceName)) {
+                return resources.get(resourceName);
+            }
+    
+            // Ship the resource to the staging directory on the remote FS
+            Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
+            remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
+            resources.put(resourceName, remoteFsPath);
+            return remoteFsPath;
         }
-
-        // Ship the resource to the staging directory on the remote FS
-        Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
-        remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
-        resources.put(resourceName, remoteFsPath);
-        return remoteFsPath;
     }
 
     // Add files already present in the remote FS as local resources. Allow the
@@ -97,20 +111,22 @@ public class TezResourceManager {
     }
 
     public void addBootStrapJar() throws IOException {
-        if (resources.containsKey(bootStrapJar)) {
-            return;
+        synchronized(this) {
+            if (resources.containsKey(bootStrapJar)) {
+                return;
+            }
+    
+            FileSystem remoteFs = FileSystem.get(conf);
+            File jobJar = File.createTempFile("Job", ".jar");
+            jobJar.deleteOnExit();
+            FileOutputStream fos = new FileOutputStream(jobJar);
+            JarManager.createBootStrapJar(fos, pigContext);
+    
+            // Ship the job.jar to the staging directory on the remote FS
+            Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, bootStrapJar));
+            remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
+            resources.put(bootStrapJar, remoteJarPath);
         }
-
-        FileSystem remoteFs = FileSystem.get(conf);
-        File jobJar = File.createTempFile("Job", ".jar");
-        jobJar.deleteOnExit();
-        FileOutputStream fos = new FileOutputStream(jobJar);
-        JarManager.createBootStrapJar(fos, pigContext);
-
-        // Ship the job.jar to the staging directory on the remote FS
-        Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName()));
-        remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
-        resources.put(new Path(bootStrapJar.getFile()).getName(), remoteJarPath);
     }
 
     public Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {

Modified: pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java Mon Jun  2 21:12:43 2014
@@ -44,7 +44,7 @@ public class TestSecondarySortTez extend
     public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query)
             throws Exception, VisitorException {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        TezCompiler comp = new TezCompiler(pp, pc, null);
+        TezCompiler comp = new TezCompiler(pp, pc);
         TezOperPlan tezPlan = comp.compile();
         boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Mon Jun  2 21:12:43 2014
@@ -551,7 +551,7 @@ public class TestTezCompiler {
     private void run(String query, String expectedFile) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
         TezLauncher launcher = new TezLauncher();
-        TezPlanContainer tezPlanContainer = launcher.compile(pp, pc, null);
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pc);
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);

Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java Mon Jun  2 21:12:43 2014
@@ -169,7 +169,7 @@ public class TestTezJobControlCompiler {
 
     private Pair<TezOperPlan, DAG> compile(String query) throws Exception {
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        TezCompiler comp = new TezCompiler(pp, pc, null);
+        TezCompiler comp = new TezCompiler(pp, pc);
         TezOperPlan tezPlan = comp.compile();
         TezJobControlCompiler jobComp = new TezJobControlCompiler(pc, new Configuration());
         DAG dag = jobComp.buildDAG(tezPlan, new HashMap<String, LocalResource>());



Mime
View raw message