Return-Path: X-Original-To: apmail-mahout-commits-archive@www.apache.org Delivered-To: apmail-mahout-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A2052CD7B for ; Wed, 26 Jun 2013 13:35:21 +0000 (UTC) Received: (qmail 46313 invoked by uid 500); 26 Jun 2013 13:35:20 -0000 Delivered-To: apmail-mahout-commits-archive@mahout.apache.org Received: (qmail 46112 invoked by uid 500); 26 Jun 2013 13:35:15 -0000 Mailing-List: contact commits-help@mahout.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mahout.apache.org Delivered-To: mailing list commits@mahout.apache.org Received: (qmail 46103 invoked by uid 99); 26 Jun 2013 13:35:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Jun 2013 13:35:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,WEIRD_QUOTING X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Jun 2013 13:35:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 760C623888E3; Wed, 26 Jun 2013 13:34:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1496927 - in /mahout/trunk/integration/src: main/java/org/apache/mahout/text/ test/java/org/apache/mahout/text/ Date: Wed, 26 Jun 2013 13:34:46 -0000 To: commits@mahout.apache.org From: smarthi@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130626133446.760C623888E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: smarthi Date: Wed Jun 26 13:34:45 2013 New Revision: 1496927 URL: http://svn.apache.org/r1496927 Log: MAHOUT-833: Make conversion to sequence files map-reduce - (changes based on feedback from review). Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original) +++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Wed Jun 26 13:34:45 2013 @@ -52,9 +52,13 @@ public class SequenceFilesFromDirectory private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"}; - private static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; private static final String[] CHARSET_OPTION = {"charset", "c"}; + private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; + + public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; + public static final String BASE_INPUT_PATH = "baseinputpath"; + public static void main(String[] args) throws Exception { ToolRunner.run(new SequenceFilesFromDirectory(), args); } @@ -131,16 +135,16 @@ public class SequenceFilesFromDirectory SequenceFileOutputFormat.class, "SequenceFilesFromDirectory"); Configuration jobConfig = job.getConfiguration(); - jobConfig.set("keyPrefix", keyPrefix); + jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix); FileSystem fs = FileSystem.get(jobConfig); FileStatus fsFileStatus = fs.getFileStatus(input); String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); - jobConfig.set("baseinputpath", input.toString()); + jobConfig.set(BASE_INPUT_PATH, input.toString()); long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024; // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", "1000000"); + jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); FileInputFormat.setInputPaths(job, inputDirList); // need to set this to a multiple of the block size, or no split happens Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (original) +++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Wed Jun 26 13:34:45 2013 @@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.Mappe import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.mahout.common.HadoopUtil; +import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION; + /** * Map class for SequenceFilesFromDirectory MR job */ @@ -39,7 +41,7 @@ public class SequenceFilesFromDirectoryM @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - this.keyPrefix = context.getConfiguration().get("keyPrefix", ""); + this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], ""); } public void map(IntWritable key, BytesWritable value, Context context) Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original) +++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Wed Jun 26 13:34:45 2013 @@ -16,19 +16,9 @@ */ package org.apache.mahout.text; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; -import org.apache.commons.cli2.builder.ArgumentBuilder; -import org.apache.commons.cli2.builder.DefaultOptionBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -47,6 +37,14 @@ import org.apache.mahout.utils.io.Chunke import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + /** * Converts a directory of gzipped mail archives into SequenceFiles of specified * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except @@ -57,6 +55,22 @@ public final class SequenceFilesFromMail private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class); + public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; + public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; + public static final String[] CHARSET_OPTION = {"charset", "c"}; + public static final String[] SUBJECT_OPTION = {"subject", "s"}; + public static final String[] TO_OPTION = {"to", "to"}; + public static final String[] FROM_OPTION = {"from", "from"}; + public static final String[] REFERENCES_OPTION = {"references", "refs"}; + public static final String[] BODY_OPTION = {"body", "b"}; + public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"}; + public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"}; + public static final String[] SEPARATOR_OPTION = {"separator", "sep"}; + public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"}; + public static final String BASE_INPUT_PATH = "baseinputpath"; + + private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; + public void createSequenceFiles(MailOptions options) throws IOException { ChunkedWriter writer = new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir())); MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer); @@ -119,69 +133,31 @@ public final class SequenceFilesFromMail @Override public int run(String[] args) throws Exception { - DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder(); - ArgumentBuilder argumentBuilder = new ArgumentBuilder(); - addInputOption(); addOutputOption(); addOption(DefaultOptionCreator.methodOption().create()); - addOption(optionBuilder.withLongName("chunkSize").withArgument( - argumentBuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()) - .withDescription("The chunkSize in MegaBytes. Defaults to 64") - .withShortName("chunk").create()); - - addOption(optionBuilder.withLongName("keyPrefix").withArgument( - argumentBuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create()) - .withDescription("The prefix to be prepended to the key") - .withShortName("prefix").create()); - addOption(optionBuilder.withLongName("charset") - .withRequired(true).withArgument(argumentBuilder.withName("charset") - .withMinimum(1).withMaximum(1).create()).withDescription( - "The name of the character encoding of the input files") - .withShortName("c").create()); - addOption(optionBuilder.withLongName("subject") - .withRequired(false).withDescription( - "Include the Mail subject as part of the text. Default is false") - .withShortName("s").create()); - addOption(optionBuilder.withLongName("to").withRequired(false) - .withDescription("Include the to field in the text. Default is false") - .withShortName("to").create()); - addOption(optionBuilder.withLongName("from").withRequired(false).withDescription( - "Include the from field in the text. Default is false") - .withShortName("from").create()); - addOption(optionBuilder.withLongName("references") - .withRequired(false).withDescription( - "Include the references field in the text. Default is false") - .withShortName("refs").create()); - addOption(optionBuilder.withLongName("body").withRequired(false) - .withDescription("Include the body in the output. Default is false") - .withShortName("b").create()); - addOption(optionBuilder.withLongName("stripQuoted") - .withRequired(false).withDescription( - "Strip (remove) quoted email text in the body. Default is false") - .withShortName("q").create()); - addOption( - optionBuilder.withLongName("quotedRegex") - .withRequired(false).withArgument(argumentBuilder.withName("regex") - .withMinimum(1).withMaximum(1).create()).withDescription( + addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64"); + addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); + addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], + "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); + addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false"); + addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. Default is false"); + addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false"); + addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1], + "Include the references field in the text. Default is false"); + addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false"); + addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1], + "Strip (remove) quoted email text in the body. Default is false"); + addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1], "Specify the regex that identifies quoted text. " - + "Default is to look for > or | at the beginning of the line.") - .withShortName("q").create()); - addOption( - optionBuilder.withLongName("separator") - .withRequired(false).withArgument(argumentBuilder.withName("separator") - .withMinimum(1).withMaximum(1).create()).withDescription( - "The separator to use between metadata items (to, from, etc.). Default is \\n") - .withShortName("sep").create()); - - addOption( - optionBuilder.withLongName("bodySeparator") - .withRequired(false).withArgument(argumentBuilder.withName("bodySeparator") - .withMinimum(1).withMaximum(1).create()).withDescription( + + "Default is to look for > or | at the beginning of the line."); + addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1], + "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n"); + addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1], "The separator to use between lines in the body. Default is \\n. " - + "Useful to change if you wish to have the message be on one line") - .withShortName("bodySep").create()); + + "Useful to change if you wish to have the message be on one line", "\n"); + addOption(DefaultOptionCreator.helpOption()); Map> parsedArgs = parseArguments(args); if (parsedArgs == null) { @@ -191,16 +167,16 @@ public final class SequenceFilesFromMail String outputDir = getOutputPath().toString(); int chunkSize = 64; - if (hasOption("chunkSize")) { - chunkSize = Integer.parseInt(getOption("chunkSize")); + if (hasOption(CHUNK_SIZE_OPTION[0])) { + chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); } String prefix = ""; - if (hasOption("keyPrefix")) { - prefix = getOption("keyPrefix"); + if (hasOption(KEY_PREFIX_OPTION[0])) { + prefix = getOption(KEY_PREFIX_OPTION[0]); } - Charset charset = Charset.forName(getOption("charset")); + Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); MailOptions options = new MailOptions(); options.setInput(input); options.setOutputDir(outputDir); @@ -214,36 +190,40 @@ public final class SequenceFilesFromMail // would require more processing later to remove it pre feature selection. Map patternOrder = Maps.newHashMap(); int order = 0; - if (hasOption("from")) { + if (hasOption(FROM_OPTION[0])) { patterns.add(MailProcessor.FROM_PREFIX); patternOrder.put(MailOptions.FROM, order++); } - if (hasOption("to")) { + if (hasOption(TO_OPTION[0])) { patterns.add(MailProcessor.TO_PREFIX); patternOrder.put(MailOptions.TO, order++); } - if (hasOption("references")) { + if (hasOption(REFERENCES_OPTION[0])) { patterns.add(MailProcessor.REFS_PREFIX); patternOrder.put(MailOptions.REFS, order++); } - if (hasOption("subject")) { + if (hasOption(SUBJECT_OPTION[0])) { patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order++); + patternOrder.put(MailOptions.SUBJECT, order += 1); } - options.setStripQuotedText(hasOption("stripQuoted")); + options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0])); options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); options.setPatternOrder(patternOrder); - options.setIncludeBody(hasOption("body")); - options.setSeparator("\n"); - if (hasOption("separator")) { - options.setSeparator(getOption("separator")); + options.setIncludeBody(hasOption(BODY_OPTION[0])); + + if (hasOption(SEPARATOR_OPTION[0])) { + options.setSeparator(getOption(SEPARATOR_OPTION[0])); + } else { + options.setSeparator("\n"); } - if (hasOption("bodySeparator")) { - options.setBodySeparator(getOption("bodySeparator")); + + if (hasOption(BODY_SEPARATOR_OPTION[0])) { + options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0])); } - if (hasOption("quotedRegex")) { - options.setQuotedTextPattern(Pattern.compile(getOption("quotedRegex"))); + + if (hasOption(QUOTED_REGEX_OPTION[0])) { + options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0]))); } if (getOption(DefaultOptionCreator.METHOD_OPTION, @@ -274,58 +254,64 @@ public final class SequenceFilesFromMail Configuration jobConfig = job.getConfiguration(); - if (hasOption("keyPrefix")) { - jobConfig.set("prefix", getOption("keyPrefix")); + if (hasOption(KEY_PREFIX_OPTION[0])) { + jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0])); } int chunkSize = 0; - if (hasOption("chunkSize")) { - chunkSize = Integer.parseInt(getOption("chunkSize")); - jobConfig.set("chunkSize", String.valueOf(chunkSize)); + if (hasOption(CHUNK_SIZE_OPTION[0])) { + chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); + jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize)); } Charset charset; - if (hasOption("charset")) { - charset = Charset.forName(getOption("charset")); - jobConfig.set("charset", charset.displayName()); + if (hasOption(CHARSET_OPTION[0])) { + charset = Charset.forName(getOption(CHARSET_OPTION[0])); + jobConfig.set(CHARSET_OPTION[0], charset.displayName()); } - if (hasOption("from")) { - jobConfig.set("fromOpt", "true"); + if (hasOption(FROM_OPTION[0])) { + jobConfig.set(FROM_OPTION[1], "true"); } - if (hasOption("to")) { - jobConfig.set("toOpt", "true"); + if (hasOption(TO_OPTION[0])) { + jobConfig.set(TO_OPTION[1], "true"); } - if (hasOption("references")) { - jobConfig.set("refsOpt", "true"); + if (hasOption(REFERENCES_OPTION[0])) { + jobConfig.set(REFERENCES_OPTION[1], "true"); } - if (hasOption("subject")) { - jobConfig.set("subjectOpt", "true"); + if (hasOption(SUBJECT_OPTION[0])) { + jobConfig.set(SUBJECT_OPTION[1], "true"); } - if (hasOption("quotedRegex")) { - jobConfig.set("quotedRegex", Pattern.compile(getOption("quotedRegex")).toString()); + if (hasOption(QUOTED_REGEX_OPTION[0])) { + jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString()); + } + + if (hasOption(SEPARATOR_OPTION[0])) { + jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0])); + } else { + jobConfig.set(SEPARATOR_OPTION[1], "\n"); } - if (hasOption("separatorOpt")) { - jobConfig.set("separatorOpt", getOption("separatorOpt")); + if (hasOption(BODY_OPTION[0])) { + jobConfig.set(BODY_OPTION[1], "true"); } else { - jobConfig.set("separatorOpt", "\n"); + jobConfig.set(BODY_OPTION[1], "false"); } - if (hasOption("body")) { - jobConfig.set("bodyOpt", "true"); + if (hasOption(BODY_SEPARATOR_OPTION[0])) { + jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0])); } else { - jobConfig.set("bodyOpt", "false"); + jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n"); } FileSystem fs = FileSystem.get(jobConfig); FileStatus fsFileStatus = fs.getFileStatus(inputPath); - jobConfig.set("baseinputpath", inputPath.toString()); + jobConfig.set(BASE_INPUT_PATH, inputPath.toString()); String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); FileInputFormat.setInputPaths(job, inputDirList); @@ -334,7 +320,7 @@ public final class SequenceFilesFromMail FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", "1000000"); + jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); boolean succeeded = job.waitForCompletion(true); if (!succeeded) { Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (original) +++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Wed Jun 26 13:34:45 2013 @@ -17,120 +17,131 @@ package org.apache.mahout.text; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.mahout.common.HadoopUtil; import org.apache.mahout.common.iterator.FileLineIterable; import org.apache.mahout.utils.email.MailOptions; import org.apache.mahout.utils.email.MailProcessor; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Text; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION; /** - * * Map Class for the SequenceFilesFromMailArchives job - * */ public class SequenceFilesFromMailArchivesMapper extends Mapper { - + private Text outKey = new Text(); private Text outValue = new Text(); - + private static final Pattern MESSAGE_START = Pattern.compile( - "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); + "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile( - "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); + "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); private MailOptions options; - + @Override public void setup(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); + Configuration configuration = context.getConfiguration(); + // absorb all of the options into the MailOptions object - this.options = new MailOptions(); - options.setPrefix(conf.get("prefix", "")); - - if (!conf.get("chunkSize", "").equals("")) { - options.setChunkSize(conf.getInt("chunkSize", 64)); - } - - if (!conf.get("charset", "").equals("")) { - Charset charset = Charset.forName(conf.get("charset", "UTF-8")); + options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], "")); + + if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) { + options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64)); + } + + if (!configuration.get(CHARSET_OPTION[0], "").equals("")) { + Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8")); options.setCharset(charset); } else { Charset charset = Charset.forName("UTF-8"); options.setCharset(charset); } - + List patterns = Lists.newArrayListWithCapacity(5); // patternOrder is used downstream so that we can know what order the // text is in instead // of encoding it in the string, which // would require more processing later to remove it pre feature // selection. - Map patternOrder = Maps.newHashMap(); + Map patternOrder = Maps.newHashMap(); int order = 0; - - if (!conf.get("fromOpt", "").equals("")) { + if (!configuration.get(FROM_OPTION[1], "").equals("")) { patterns.add(MailProcessor.FROM_PREFIX); patternOrder.put(MailOptions.FROM, order++); } - if (!conf.get("toOpt", "").equals("")) { + if (!configuration.get(TO_OPTION[1], "").equals("")) { patterns.add(MailProcessor.TO_PREFIX); patternOrder.put(MailOptions.TO, order++); } - if (!conf.get("refsOpt", "").equals("")) { + if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) { patterns.add(MailProcessor.REFS_PREFIX); patternOrder.put(MailOptions.REFS, order++); } - - if (!conf.get("subjectOpt", "").equals("")) { + + if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) { patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order++); + patternOrder.put(MailOptions.SUBJECT, order += 1); } - - options.setStripQuotedText(conf.getBoolean("quotedOpt", false)); - + + options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false)); + options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); options.setPatternOrder(patternOrder); - - options.setIncludeBody(conf.getBoolean("bodyOpt", false)); - + + options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false)); + options.setSeparator("\n"); - if (!conf.get("separatorOpt", "").equals("")) { - options.setSeparator(conf.get("separatorOpt", "")); + if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) { + options.setSeparator(configuration.get(SEPARATOR_OPTION[1], "")); } - if (!conf.get("bodySeparatorOpt", "").equals("")) { - options.setBodySeparator(conf.get("bodySeparatorOpt", "")); + if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) { + options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], "")); } - if (!conf.get("quotedRegexOpt", "").equals("")) { - options.setQuotedTextPattern(Pattern.compile(conf.get("quotedRegexOpt", ""))); + if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) { + options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], ""))); } } - - public long parseMboxLineByLine(String filename, InputStream mboxInputStream, Context context) + + public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context) throws IOException, InterruptedException { long messageCount = 0; try { @@ -139,19 +150,19 @@ public class SequenceFilesFromMailArchiv Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher(""); Matcher messageBoundaryMatcher = MESSAGE_START.matcher(""); String[] patternResults = new String[options.getPatternsToMatch().length]; - Matcher[] matchers = new Matcher[options.getPatternsToMatch().length]; - for (int i = 0; i < matchers.length; i++) { - matchers[i] = options.getPatternsToMatch()[i].matcher(""); + Matcher[] matches = new Matcher[options.getPatternsToMatch().length]; + for (int i = 0; i < matches.length; i++) { + matches[i] = options.getPatternsToMatch()[i].matcher(""); } - + String messageId = null; boolean inBody = false; Pattern quotedTextPattern = options.getQuotedTextPattern(); - - for (String nextLine : new FileLineIterable(mboxInputStream, options.getCharset(), false, filename)) { + + for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) { if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) { - for (int i = 0; i < matchers.length; i++) { - Matcher matcher = matchers[i]; + for (int i = 0; i < matches.length; i++) { + Matcher matcher = matches[i]; matcher.reset(nextLine); if (matcher.matches()) { patternResults[i] = matcher.group(1); @@ -202,7 +213,6 @@ public class SequenceFilesFromMailArchiv if (messageId != null) { String key = generateKey(filename, options.getPrefix(), messageId); writeContent(options.getSeparator(), contents, body, patternResults); - this.outKey.set(key); this.outValue.set(contents.toString()); context.write(this.outKey, this.outValue); @@ -211,23 +221,16 @@ public class SequenceFilesFromMailArchiv } catch (FileNotFoundException ignored) { } - // TODO: report exceptions and continue; return messageCount; } - + protected static String generateKey(String mboxFilename, String prefix, String messageId) { - return prefix + File.separator + mboxFilename + File.separator + messageId; + return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator()); } - + private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) { - for (String match : matches) { - if (match != null) { - contents.append(match).append(separator); - } else { - contents.append("").append(separator); - } - } - contents.append(body); + String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator()); + contents.append(matchesString).append(separator).append(body); } public void map(IntWritable key, BytesWritable value, Context context) @@ -236,6 +239,6 @@ public class SequenceFilesFromMailArchiv Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); - parseMboxLineByLine(relativeFilePath, is, context); + parseMailboxLineByLine(relativeFilePath, is, context); } } Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java (original) +++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java Wed Jun 26 13:34:45 2013 @@ -38,8 +38,6 @@ import org.junit.Test; */ public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase { - // TODO: Negative tests - private File inputDir; /** Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1496927&r1=1496926&r2=1496927&view=diff ============================================================================== --- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original) +++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Wed Jun 26 13:34:45 2013 @@ -58,9 +58,9 @@ public final class TestSequenceFilesFrom @Test public void testSequenceFileFromDirectoryBasic() throws Exception { // parameters - Configuration conf = new Configuration(); + Configuration configuration = new Configuration(); - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(configuration); // create Path tmpDir = this.getTestTempDirPath(); @@ -74,7 +74,7 @@ public final class TestSequenceFilesFrom fs.mkdirs(inputDirRecursive); // prepare input files - createFilesFromArrays(conf, inputDir, DATA1); + createFilesFromArrays(configuration, inputDir, DATA1); SequenceFilesFromDirectory.main(new String[]{ "--input", inputDir.toString(), @@ -85,9 +85,9 @@ public final class TestSequenceFilesFrom "--method", "sequential"}); // check output chunk files - checkChunkFiles(conf, outputDir, DATA1, "UID"); + checkChunkFiles(configuration, outputDir, DATA1, "UID"); - createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2); + createRecursiveDirFilesFromArrays(configuration, inputDirRecursive, DATA2); FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive); String dirs = HadoopUtil.buildDirList(fs, fstInputPath); @@ -101,7 +101,7 @@ public final class TestSequenceFilesFrom "--keyPrefix", "UID", "--method", "sequential"}); - checkRecursiveChunkFiles(conf, outputDirRecursive, DATA2, "UID"); + checkRecursiveChunkFiles(configuration, outputDirRecursive, DATA2, "UID"); } @Test @@ -166,8 +166,9 @@ public final class TestSequenceFilesFrom } } - private static void createRecursiveDirFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException { - FileSystem fs = FileSystem.get(conf); + private static void createRecursiveDirFilesFromArrays(Configuration configuration, Path inputDir, + String[][] data) throws IOException { + FileSystem fs = FileSystem.get(configuration); logger.info("creativeRecursiveDirFilesFromArrays > based on: {}", inputDir.toString()); Path curPath; @@ -193,11 +194,11 @@ public final class TestSequenceFilesFrom } } - private static void checkChunkFiles(Configuration conf, + private static void checkChunkFiles(Configuration configuration, Path outputDir, String[][] data, String prefix) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(configuration); // output exists? FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles()); @@ -210,7 +211,8 @@ public final class TestSequenceFilesFrom } // read a chunk to check content - SequenceFileIterator iterator = new SequenceFileIterator(fileStatuses[0].getPath(), true, conf); + SequenceFileIterator iterator = + new SequenceFileIterator(fileStatuses[0].getPath(), true, configuration); try { while (iterator.hasNext()) { Pair record = iterator.next(); @@ -233,11 +235,11 @@ public final class TestSequenceFilesFrom } } - private static void checkRecursiveChunkFiles(Configuration conf, + private static void checkRecursiveChunkFiles(Configuration configuration, Path outputDir, String[][] data, String prefix) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(configuration); System.out.println(" ----------- check_Recursive_ChunkFiles ------------"); @@ -255,7 +257,7 @@ public final class TestSequenceFilesFrom } // read a chunk to check content - SequenceFileIterator iterator = new SequenceFileIterator(fileStatuses[0].getPath(), true, conf); + SequenceFileIterator iterator = new SequenceFileIterator(fileStatuses[0].getPath(), true, configuration); try { while (iterator.hasNext()) { Pair record = iterator.next(); @@ -302,9 +304,9 @@ public final class TestSequenceFilesFrom } } - private static void checkMRResultFilesRecursive(Configuration conf, Path outputDir, + private static void checkMRResultFilesRecursive(Configuration configuration, Path outputDir, String[][] data, String prefix) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(configuration); // output exists? FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles()); @@ -320,7 +322,7 @@ public final class TestSequenceFilesFrom // read a chunk to check content SequenceFileIterator iterator = new SequenceFileIterator( - fileStatuses[0].getPath(), true, conf); + fileStatuses[0].getPath(), true, configuration); try { while (iterator.hasNext()) { Pair record = iterator.next();