Return-Path: X-Original-To: apmail-pig-commits-archive@www.apache.org Delivered-To: apmail-pig-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E040104D4 for ; Mon, 2 Jun 2014 21:13:11 +0000 (UTC) Received: (qmail 70963 invoked by uid 500); 2 Jun 2014 21:13:11 -0000 Delivered-To: apmail-pig-commits-archive@pig.apache.org Received: (qmail 70929 invoked by uid 500); 2 Jun 2014 21:13:11 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 70921 invoked by uid 99); 2 Jun 2014 21:13:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jun 2014 21:13:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jun 2014 21:13:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id CE307238890B; Mon, 2 Jun 2014 21:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1599356 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ test/org/apache/pig/tez/ Date: Mon, 02 Jun 2014 21:12:44 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140602211244.CE307238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: daijy Date: Mon Jun 2 21:12:43 2014 New Revision: 1599356 URL: http://svn.apache.org/r1599356 Log: PIG-3978: Container reuse does not across PigServer Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Jun 2 21:12:43 2014 @@ -30,6 +30,8 @@ OPTIMIZATIONS BUG FIXES +PIG-3978: Container reuse does not across PigServer (daijy) + PIG-3974: E2E test data generation fails in cluster mode (lbendig via cheolsoo) PIG-3969: Javascript UDF fails if no output schema is defined (lbendig via cheolsoo) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jun 2 21:12:43 2014 @@ -149,8 +149,6 @@ public class TezCompiler extends PhyPlan private Map phyToTezOpMap; - private TezResourceManager tezResourceManager; - public static final String USER_COMPARATOR_MARKER = "user.comparator.func:"; public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold"; public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation"; @@ -160,16 +158,15 @@ public class TezCompiler extends PhyPlan private POLocalRearrangeTezFactory localRearrangeFactory; - public TezCompiler(PhysicalPlan plan, PigContext pigContext, TezResourceManager tezResourceManager) + public TezCompiler(PhysicalPlan plan, PigContext pigContext) throws TezCompilerException { super(plan, new DepthFirstWalker(plan)); this.plan = plan; this.pigContext = pigContext; - this.tezResourceManager = tezResourceManager; pigProperties = pigContext.getProperties(); splitsSeen = Maps.newHashMap(); - tezPlan = new TezOperPlan(tezResourceManager); + tezPlan = new TezOperPlan(); nig = NodeIdGenerator.getGenerator(); udfFinder = new UDFFinder(); List roots = plan.getRoots(); @@ -196,7 +193,7 @@ public class TezCompiler extends PhyPlan // Segment a single DAG into a DAG graph public TezPlanContainer getPlanContainer() throws PlanException { - TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext, tezResourceManager); + TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext); TezPlanContainerNode node = new TezPlanContainerNode(OperatorKey.genOpKey(scope), tezPlan); tezPlanContainer.add(node); tezPlanContainer.split(node); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Jun 2 21:12:43 2014 @@ -65,8 +65,10 @@ public class TezLauncher extends Launche Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez"); - TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf); + TezResourceManager tezResourceManager = TezResourceManager.getInstance(); + tezResourceManager.init(pc, conf); + stagingDir.getFileSystem(conf).mkdirs(stagingDir); log.info("Tez staging directory is " + stagingDir.toString()); conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString()); @@ -77,7 +79,7 @@ public class TezLauncher extends Launche PigStats.start(tezStats); TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf); - TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager); + TezPlanContainer tezPlanContainer = compile(php, pc); TezOperPlan tezPlan; @@ -205,7 +207,7 @@ public class TezLauncher extends Launche String format, boolean verbose) throws PlanException, VisitorException, IOException { log.debug("Entering TezLauncher.explain"); - TezPlanContainer tezPlanContainer = compile(php, pc, null); + TezPlanContainer tezPlanContainer = compile(php, pc); if (format.equals("text")) { TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer); @@ -217,9 +219,9 @@ public class TezLauncher extends Launche } } - public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager) + public TezPlanContainer compile(PhysicalPlan php, PigContext pc) throws PlanException, IOException, VisitorException { - TezCompiler comp = new TezCompiler(php, pc, tezResourceManager); + TezCompiler comp = new TezCompiler(php, pc); TezOperPlan tezPlan = comp.compile(); NoopFilterRemover filter = new NoopFilterRemover(tezPlan); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Mon Jun 2 21:12:43 2014 @@ -42,12 +42,9 @@ public class TezOperPlan extends Operato private static final long serialVersionUID = 1L; - private TezResourceManager tezResourceManager; - private Map extraResources = new HashMap(); - public TezOperPlan(TezResourceManager tezResourceManager) { - this.tezResourceManager = tezResourceManager; + public TezOperPlan() { } @Override @@ -70,7 +67,7 @@ public class TezOperPlan extends Operato String resourceName = resourcePath.getName(); if (!extraResources.containsKey(resourceName)) { - Path remoteFsPath = tezResourceManager.addTezResource(url); + Path remoteFsPath = TezResourceManager.getInstance().addTezResource(url); extraResources.put(resourceName, remoteFsPath); } } @@ -78,7 +75,7 @@ public class TezOperPlan extends Operato // Add extra plan-specific local resources already present in the remote FS public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException { if (!extraResources.containsKey(resourceName)) { - tezResourceManager.addTezResource(resourceName, remoteFsPath); + TezResourceManager.getInstance().addTezResource(resourceName, remoteFsPath); extraResources.put(resourceName, remoteFsPath); } } @@ -93,7 +90,7 @@ public class TezOperPlan extends Operato addShipResources(streamVisitor.getShipFiles()); addCacheResources(streamVisitor.getCacheFiles()); - return tezResourceManager.getTezResources(extraResources.keySet()); + return TezResourceManager.getInstance().getTezResources(extraResources.keySet()); } // In the statement "SHIP('/home/foo')" we'll map the resource name foo to Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Jun 2 21:12:43 2014 @@ -38,12 +38,10 @@ import org.apache.pig.impl.util.JarManag public class TezPlanContainer extends OperatorPlan { private static final long serialVersionUID = 1L; - private TezResourceManager tezResourceManager; private PigContext pigContext; - public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) { + public TezPlanContainer(PigContext pigContext) { this.pigContext = pigContext; - this.tezResourceManager = tezResourceManager; } // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer @@ -52,7 +50,7 @@ public class TezPlanContainer extends Op public Map getLocalResources() throws Exception { Set jarLists = new HashSet(); - jarLists.add(tezResourceManager.getBootStrapJar()); + jarLists.add(new File(TezResourceManager.getInstance().getBootStrapJar()).toURI().toURL()); // In MR Pig the extra jars and script jars get put in Distributed Cache, but // in Tez we'll add them as local resources. @@ -101,7 +99,7 @@ public class TezPlanContainer extends Op // } // } - return tezResourceManager.addTezResources(jarLists); + return TezResourceManager.getInstance().addTezResources(jarLists); } public TezOperPlan getNextPlan(List processedPlans) { @@ -157,7 +155,7 @@ public class TezPlanContainer extends Op if (operToSegment != null) { for (TezOperator succ : succs) { tezOperPlan.disconnect(operToSegment, succ); - TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager); + TezOperPlan newOperPlan = new TezOperPlan(); List containerSuccs = new ArrayList(); if (getSuccessors(planNode)!=null) { containerSuccs.addAll(getSuccessors(planNode)); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Mon Jun 2 21:12:43 2014 @@ -35,46 +35,60 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.util.JarManager; public class TezResourceManager { + private static TezResourceManager instance = null; + private boolean inited = false; private Path stagingDir; private PigContext pigContext; private Configuration conf; - private URL bootStrapJar; + private String bootStrapJar; private FileSystem remoteFs; public Map resources = new HashMap(); - public URL getBootStrapJar() { + public String getBootStrapJar() { return bootStrapJar; } - public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException { - resources.clear(); - this.stagingDir = stagingDir; - this.pigContext = pigContext; - this.conf = conf; - String jar = JarManager.findContainingJar(org.apache.pig.Main.class); - this.bootStrapJar = new File(jar).toURI().toURL(); - remoteFs = FileSystem.get(conf); - addBootStrapJar(); + static public TezResourceManager getInstance() { + if (instance==null) { + instance = new TezResourceManager(); + } + return instance; + } + + public void init(PigContext pigContext, Configuration conf) throws IOException { + if (!inited) { + this.stagingDir = FileLocalizer.getTemporaryPath(pigContext, "-tez-resource");; + this.pigContext = pigContext; + this.conf = conf; + String jar = JarManager.findContainingJar(org.apache.pig.Main.class); + this.bootStrapJar = new File(jar).getName().toString(); + remoteFs = FileSystem.get(conf); + addBootStrapJar(); + inited = true; + } } // Add files from the source FS as local resources. The resource name will // be the same as the file name. public Path addTezResource(URL url) throws IOException { - Path resourcePath = new Path(url.getFile()); - String resourceName = resourcePath.getName(); - - if (resources.containsKey(resourceName)) { - return resources.get(resourceName); + synchronized(this) { + Path resourcePath = new Path(url.getFile()); + String resourceName = resourcePath.getName(); + + if (resources.containsKey(resourceName)) { + return resources.get(resourceName); + } + + // Ship the resource to the staging directory on the remote FS + Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName)); + remoteFs.copyFromLocalFile(resourcePath, remoteFsPath); + resources.put(resourceName, remoteFsPath); + return remoteFsPath; } - - // Ship the resource to the staging directory on the remote FS - Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName)); - remoteFs.copyFromLocalFile(resourcePath, remoteFsPath); - resources.put(resourceName, remoteFsPath); - return remoteFsPath; } // Add files already present in the remote FS as local resources. Allow the @@ -97,20 +111,22 @@ public class TezResourceManager { } public void addBootStrapJar() throws IOException { - if (resources.containsKey(bootStrapJar)) { - return; + synchronized(this) { + if (resources.containsKey(bootStrapJar)) { + return; + } + + FileSystem remoteFs = FileSystem.get(conf); + File jobJar = File.createTempFile("Job", ".jar"); + jobJar.deleteOnExit(); + FileOutputStream fos = new FileOutputStream(jobJar); + JarManager.createBootStrapJar(fos, pigContext); + + // Ship the job.jar to the staging directory on the remote FS + Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, bootStrapJar)); + remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath); + resources.put(bootStrapJar, remoteJarPath); } - - FileSystem remoteFs = FileSystem.get(conf); - File jobJar = File.createTempFile("Job", ".jar"); - jobJar.deleteOnExit(); - FileOutputStream fos = new FileOutputStream(jobJar); - JarManager.createBootStrapJar(fos, pigContext); - - // Ship the job.jar to the staging directory on the remote FS - Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName())); - remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath); - resources.put(new Path(bootStrapJar.getFile()).getName(), remoteJarPath); } public Map getTezResources(Set resourceNames) throws Exception { Modified: pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java (original) +++ pig/trunk/test/org/apache/pig/tez/TestSecondarySortTez.java Mon Jun 2 21:12:43 2014 @@ -44,7 +44,7 @@ public class TestSecondarySortTez extend public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception, VisitorException { PhysicalPlan pp = Util.buildPp(pigServer, query); - TezCompiler comp = new TezCompiler(pp, pc, null); + TezCompiler comp = new TezCompiler(pp, pc); TezOperPlan tezPlan = comp.compile(); boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty( PigConfiguration.PROP_NO_COMBINER, "false")); Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Mon Jun 2 21:12:43 2014 @@ -551,7 +551,7 @@ public class TestTezCompiler { private void run(String query, String expectedFile) throws Exception { PhysicalPlan pp = Util.buildPp(pigServer, query); TezLauncher launcher = new TezLauncher(); - TezPlanContainer tezPlanContainer = launcher.compile(pp, pc, null); + TezPlanContainer tezPlanContainer = launcher.compile(pp, pc); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1599356&r1=1599355&r2=1599356&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original) +++ pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java Mon Jun 2 21:12:43 2014 @@ -169,7 +169,7 @@ public class TestTezJobControlCompiler { private Pair compile(String query) throws Exception { PhysicalPlan pp = Util.buildPp(pigServer, query); - TezCompiler comp = new TezCompiler(pp, pc, null); + TezCompiler comp = new TezCompiler(pp, pc); TezOperPlan tezPlan = comp.compile(); TezJobControlCompiler jobComp = new TezJobControlCompiler(pc, new Configuration()); DAG dag = jobComp.buildDAG(tezPlan, new HashMap());