Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F27B8106CE for ; Sun, 15 Sep 2013 20:18:35 +0000 (UTC) Received: (qmail 75483 invoked by uid 500); 15 Sep 2013 20:18:33 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 75430 invoked by uid 500); 15 Sep 2013 20:18:30 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 75404 invoked by uid 99); 15 Sep 2013 20:18:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 15 Sep 2013 20:18:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 408B1905079; Sun, 15 Sep 2013 20:18:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <405ff19b8d9c4983acb1b662375ff9c1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-263: Provide sensible defaults for the max split size of CrunchCombineFileInputFormat Date: Sun, 15 Sep 2013 20:18:27 +0000 (UTC) Updated Branches: refs/heads/master fee4d1654 -> ac17f4f72 CRUNCH-263: Provide sensible defaults for the max split size of CrunchCombineFileInputFormat Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac17f4f7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac17f4f7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac17f4f7 Branch: refs/heads/master Commit: ac17f4f72d2b26433ef79fec6463a7b122440ccc Parents: fee4d16 Author: Josh Wills Authored: Fri Sep 6 17:59:48 2013 -0700 Committer: Josh Wills Committed: Sun Sep 15 13:17:07 2013 -0700 ---------------------------------------------------------------------- .../impl/mr/run/CrunchCombineFileInputFormat.java | 15 +++++++++++---- .../apache/crunch/impl/mr/run/CrunchInputFormat.java | 2 +- .../apache/crunch/impl/mr/run/RuntimeParameters.java | 2 ++ 3 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java index 151d8b0..2413ccf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombineFileInputFormat.java @@ -17,20 +17,27 @@ */ package org.apache.crunch.impl.mr.run; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class CrunchCombineFileInputFormat extends CombineFileInputFormat { - private FileInputFormat inputFormat; - public CrunchCombineFileInputFormat(FileInputFormat inputFormat) { - this.inputFormat = inputFormat; + public CrunchCombineFileInputFormat(JobContext jobContext) { + if (getMaxSplitSize(jobContext) == Long.MAX_VALUE) { + Configuration conf = jobContext.getConfiguration(); + if (conf.get(RuntimeParameters.COMBINE_FILE_BLOCK_SIZE) != null) { + setMaxSplitSize(conf.getLong(RuntimeParameters.COMBINE_FILE_BLOCK_SIZE, 0)); + } else { + setMaxSplitSize(jobContext.getConfiguration().getLong("dfs.block.size", 134217728L)); + } + } } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java index fa4602a..0c6f5e1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java @@ -53,7 +53,7 @@ public class CrunchInputFormat extends InputFormat { InputFormat format = (InputFormat) ReflectionUtils.newInstance(inputBundle.getFormatClass(), jobCopy.getConfiguration()); if (format instanceof FileInputFormat && !conf.getBoolean(RuntimeParameters.DISABLE_COMBINE_FILE, false)) { - format = new CrunchCombineFileInputFormat((FileInputFormat) format); + format = new CrunchCombineFileInputFormat(job); } for (Map.Entry> nodeEntry : entry.getValue().entrySet()) { Integer nodeIndex = nodeEntry.getKey(); http://git-wip-us.apache.org/repos/asf/crunch/blob/ac17f4f7/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 8912897..7dc8521 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -30,6 +30,8 @@ public class RuntimeParameters { public static final String DISABLE_COMBINE_FILE = "crunch.disable.combine.file"; + public static final String COMBINE_FILE_BLOCK_SIZE = "crunch.combine.file.block.size"; + public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; // Not instantiated