Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 83DBA114FF for ; Fri, 20 Jun 2014 02:40:08 +0000 (UTC) Received: (qmail 12447 invoked by uid 500); 20 Jun 2014 02:40:08 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 12403 invoked by uid 500); 20 Jun 2014 02:40:08 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 12394 invoked by uid 99); 20 Jun 2014 02:40:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jun 2014 02:40:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1BC6C986109; Fri, 20 Jun 2014 02:40:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <5fff9ad50157483293eae4456df68513@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup. Date: Fri, 20 Jun 2014 02:40:08 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/master ac4a525ad -> 189febe62 CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/189febe6 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/189febe6 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/189febe6 Branch: refs/heads/master Commit: 189febe62496e3e44386c3d095949a3041a6bae0 Parents: ac4a525 Author: Josh Wills Authored: Thu Jun 19 19:39:13 2014 -0700 Committer: Josh Wills Committed: Thu Jun 19 19:39:43 2014 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 29 +++++++++ .../org/apache/crunch/impl/mr/MRPipeline.java | 67 +++++++++++++++++++- .../crunch/impl/mr/plan/PlanningParameters.java | 7 ++ 3 files changed, 100 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java index 25c85c8..6af3f84 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -17,16 +17,22 @@ */ package org.apache.crunch; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; +import java.net.URLEncoder; +import com.google.common.io.Files; +import org.apache.commons.io.filefilter.SuffixFileFilter; import org.apache.crunch.PipelineResult.StageResult; import org.apache.crunch.fn.FilterFns; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.To; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; @@ -79,5 +85,28 @@ public class MRPipelineIT implements Serializable { assertTrue(new File(outputDirA, "part-r-00000").exists()); assertTrue(new File(outputDirB, "part-r-00000").exists()); } + + @Test + public void testWritingOfDotfile() throws IOException { + File dotfileDir = Files.createTempDir(); + Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration()); + pipeline.getConfiguration().set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, dotfileDir.getAbsolutePath()); + + PCollection lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + pipeline.write( + lines.parallelDo(IdentityFn.getInstance(), Writables.strings()), + To.textFile(tmpDir.getFile("output").getAbsolutePath())); + pipeline.done(); + File[] files = dotfileDir.listFiles((FileFilter)new SuffixFileFilter(".dot")); + assertEquals(1, files.length); + String fileName = files[0].getName(); + String fileNamePrefix = URLEncoder.encode(pipeline.getName(), "UTF-8"); + fileNamePrefix = (fileNamePrefix.length() < 150) ? fileNamePrefix : fileNamePrefix.substring(0, 150); + assertTrue("DOT file name '" + fileName + "' did not start with the pipeline name '" + fileNamePrefix + "'.", + fileName.startsWith(fileNamePrefix)); + + String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot"; + assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex)); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 01a3ead..6cfc6d0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -18,8 +18,16 @@ package org.apache.crunch.impl.mr; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CachingOptions; @@ -32,11 +40,13 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.MRCollectionFactory; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; +import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Pipeline implementation that is executed within Hadoop MapReduce. @@ -120,7 +130,9 @@ public class MRPipeline extends DistributedPipeline { @Override public MRPipelineExecution runAsync() { - MRPipelineExecution res = plan().execute(); + MRExecutor mrExecutor = plan(); + writePlanDotFile(mrExecutor.getPlanDotFile()); + MRPipelineExecution res = mrExecutor.execute(); outputTargets.clear(); return res; } @@ -141,4 +153,53 @@ public class MRPipeline extends DistributedPipeline { // Identical to materialization in a MapReduce context materialize(pcollection); } + + /** + * Writes the MR job plan dot file contents to a timestamped file if the PIPELINE_DOTFILE_OUTPUT_DIR + * config key is set with an output directory. + * + * @param dotFileContents contents to be written to the dot file + */ + private void writePlanDotFile(String dotFileContents) { + String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR); + if (dotFileDir != null) { + FSDataOutputStream outputStream = null; + Exception thrownException = null; + try { + URI uri = new URI(dotFileDir); + FileSystem fs = FileSystem.get(uri, getConfiguration()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS"); + String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date())); + String encodedName = URLEncoder.encode(getName(), "UTF-8"); + // We limit the pipeline name to the first 150 characters to keep the output dotfile length less + // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this + // might be HDFS or it might be a local filesystem) + final int maxPipeNameLength = 150; + String filenamePrefix = encodedName.substring(0, Math.min(maxPipeNameLength, encodedName.length())); + Path jobPlanPath = new Path(uri.getPath(), filenamePrefix + filenameSuffix); + LOG.info("Writing jobplan to " + jobPlanPath); + outputStream = fs.create(jobPlanPath, true); + outputStream.write(dotFileContents.getBytes(Charsets.UTF_8)); + } catch (URISyntaxException e) { + thrownException = e; + throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + dotFileDir, e); + } catch (IOException e) { + thrownException = e; + throw new CrunchRuntimeException("Error writing dotfile contents to " + dotFileDir, e); + } catch (RuntimeException e) { + thrownException = e; + throw e; + } finally { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + if (thrownException == null) + throw new CrunchRuntimeException("Error closing dotfile", e); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java index cdfb46f..de89c48 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java @@ -33,6 +33,13 @@ public final class PlanningParameters { */ public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile"; + /** + * Configuration key under which a directory URI can be stored where MapReduce pipeline job plans in + * DOT format are stored. The dot files are only written if this configuration + * parameter is set. + */ + public static final String PIPELINE_DOTFILE_OUTPUT_DIR = "crunch.planner.dotfile.outputdir"; + public static final String JOB_NAME_MAX_STACK_LENGTH = "crunch.job.name.max.stack.length"; private PlanningParameters() {