Subject: svn commit: r1398581 [7/9] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-...
Date: Tue, 16 Oct 2012 00:03:53 -0000
To: mapreduce-commits@hadoop.apache.org
From: sseth@apache.org
Message-Id: <20121016000359.CADF82388C7B@eris.apache.org>

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/lucene/ShardWriter.java Tue Oct 16 00:02:55 2012
@@ -1,233 +1,233 @@
-/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.lucene; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration; -import org.apache.hadoop.contrib.index.mapred.IntermediateForm; -import org.apache.hadoop.contrib.index.mapred.Shard; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.Term; -import org.apache.lucene.store.Directory; - -/** - * The initial version of an index is stored in the perm dir. Index files - * created by newer versions are written to a temp dir on the local FS. After - * successfully creating the new version in the temp dir, the shard writer - * moves the new files to the perm dir and deletes the temp dir in close(). - */ -public class ShardWriter { - static final Log LOG = LogFactory.getLog(ShardWriter.class); - - private final FileSystem fs; - private final FileSystem localFs; - private final Path perm; - private final Path temp; - private final Directory dir; - private final IndexWriter writer; - private int maxNumSegments; - private long numForms = 0; - - /** - * Constructor - * @param fs - * @param shard - * @param tempDir - * @param iconf - * @throws IOException - */ - public ShardWriter(FileSystem fs, Shard shard, String tempDir, - IndexUpdateConfiguration iconf) throws IOException { - LOG.info("Construct a shard writer"); - - this.fs = fs; - localFs = FileSystem.getLocal(iconf.getConfiguration()); - perm = new Path(shard.getDirectory()); - temp = new Path(tempDir); - - long initGeneration = shard.getGeneration(); - if (!fs.exists(perm)) { - assert (initGeneration < 0); - fs.mkdirs(perm); - } else { - restoreGeneration(fs, perm, initGeneration); - } - dir = - new MixedDirectory(fs, perm, localFs, fs.startLocalOutput(perm, temp), - iconf.getConfiguration()); - - // analyzer is null because we only use addIndexes, not addDocument - writer = - new IndexWriter(dir, false, null, - initGeneration < 0 ? new KeepOnlyLastCommitDeletionPolicy() - : new MixedDeletionPolicy()); - setParameters(iconf); - } - - /** - * Process an intermediate form by carrying out, on the Lucene instance of - * the shard, the deletes and the inserts (a ram index) in the form. - * @param form the intermediate form containing deletes and a ram index - * @throws IOException - */ - public void process(IntermediateForm form) throws IOException { - // first delete - Iterator iter = form.deleteTermIterator(); - while (iter.hasNext()) { - writer.deleteDocuments(iter.next()); - } - // then insert - writer.addIndexesNoOptimize(new Directory[] { form.getDirectory() }); - numForms++; - } - - /** - * Close the shard writer. Optimize the Lucene instance of the shard before - * closing if necessary, and copy the files created in the temp directory - * to the permanent directory after closing. 
- * @throws IOException - */ - public void close() throws IOException { - LOG.info("Closing the shard writer, processed " + numForms + " forms"); - try { - try { - if (maxNumSegments > 0) { - writer.optimize(maxNumSegments); - LOG.info("Optimized the shard into at most " + maxNumSegments - + " segments"); - } - } finally { - writer.close(); - LOG.info("Closed Lucene index writer"); - } - - moveFromTempToPerm(); - LOG.info("Moved new index files to " + perm); - - } finally { - dir.close(); - LOG.info("Closed the shard writer"); - } - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - return this.getClass().getName() + "@" + perm + "&" + temp; - } - - private void setParameters(IndexUpdateConfiguration iconf) { - int maxFieldLength = iconf.getIndexMaxFieldLength(); - if (maxFieldLength > 0) { - writer.setMaxFieldLength(maxFieldLength); - } - writer.setUseCompoundFile(iconf.getIndexUseCompoundFile()); - maxNumSegments = iconf.getIndexMaxNumSegments(); - - if (maxFieldLength > 0) { - LOG.info("sea.max.field.length = " + writer.getMaxFieldLength()); - } - LOG.info("sea.use.compound.file = " + writer.getUseCompoundFile()); - LOG.info("sea.max.num.segments = " + maxNumSegments); - } - - // in case a previous reduce task fails, restore the generation to - // the original starting point by deleting the segments.gen file - // and the segments_N files whose generations are greater than the - // starting generation; rest of the unwanted files will be deleted - // once the unwanted segments_N files are deleted - private void restoreGeneration(FileSystem fs, Path perm, long startGen) - throws IOException { - - FileStatus[] fileStatus = fs.listStatus(perm, new PathFilter() { - public boolean accept(Path path) { - return LuceneUtil.isSegmentsFile(path.getName()); - } - }); - - // remove the segments_N files whose generation are greater than - // the starting generation - for (int i = 0; i < fileStatus.length; i++) { - Path path = fileStatus[i].getPath(); - if (startGen < LuceneUtil.generationFromSegmentsFileName(path.getName())) { - fs.delete(path, true); - } - } - - // always remove segments.gen in case last failed try removed segments_N - // but not segments.gen, and segments.gen will be overwritten anyway. 
- Path segmentsGenFile = new Path(LuceneUtil.IndexFileNames.SEGMENTS_GEN); - if (fs.exists(segmentsGenFile)) { - fs.delete(segmentsGenFile, true); - } - } - - // move the files created in the temp dir into the perm dir - // and then delete the temp dir from the local FS - private void moveFromTempToPerm() throws IOException { - try { - FileStatus[] fileStatus = - localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter()); - Path segmentsPath = null; - Path segmentsGenPath = null; - - // move the files created in temp dir except segments_N and segments.gen - for (int i = 0; i < fileStatus.length; i++) { - Path path = fileStatus[i].getPath(); - String name = path.getName(); - - if (LuceneUtil.isSegmentsGenFile(name)) { - assert (segmentsGenPath == null); - segmentsGenPath = path; - } else if (LuceneUtil.isSegmentsFile(name)) { - assert (segmentsPath == null); - segmentsPath = path; - } else { - fs.completeLocalOutput(new Path(perm, name), path); - } - } - - // move the segments_N file - if (segmentsPath != null) { - fs.completeLocalOutput(new Path(perm, segmentsPath.getName()), - segmentsPath); - } - - // move the segments.gen file - if (segmentsGenPath != null) { - fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()), - segmentsGenPath); - } - } finally { - // finally delete the temp dir (files should have been deleted) - localFs.delete(temp, true); - } - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.lucene; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration; +import org.apache.hadoop.contrib.index.mapred.IntermediateForm; +import org.apache.hadoop.contrib.index.mapred.Shard; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.Directory; + +/** + * The initial version of an index is stored in the perm dir. Index files + * created by newer versions are written to a temp dir on the local FS. After + * successfully creating the new version in the temp dir, the shard writer + * moves the new files to the perm dir and deletes the temp dir in close(). 
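
To make the lifecycle described in this javadoc concrete, here is a minimal caller sketch, illustrative only and not part of the change itself: it assumes a Shard, a local temp directory and an IndexUpdateConfiguration are already provided by the surrounding job, and simply constructs the writer, feeds it intermediate forms, and closes it.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.contrib.index.lucene.ShardWriter;
import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
import org.apache.hadoop.contrib.index.mapred.Shard;
import org.apache.hadoop.fs.FileSystem;

public class ShardWriterUsageSketch {
  // Hypothetical helper: applies a stream of intermediate forms to one shard.
  static void updateShard(Shard shard, String tempDir, IndexUpdateConfiguration iconf,
      Iterator<IntermediateForm> forms) throws IOException {
    FileSystem fs = FileSystem.get(iconf.getConfiguration());
    ShardWriter writer = new ShardWriter(fs, shard, tempDir, iconf);
    try {
      while (forms.hasNext()) {
        writer.process(forms.next()); // applies the form's deletes, then adds its ram index
      }
    } finally {
      writer.close(); // optimizes if configured, then moves new files from temp to perm
    }
  }
}
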
+ */ +public class ShardWriter { + static final Log LOG = LogFactory.getLog(ShardWriter.class); + + private final FileSystem fs; + private final FileSystem localFs; + private final Path perm; + private final Path temp; + private final Directory dir; + private final IndexWriter writer; + private int maxNumSegments; + private long numForms = 0; + + /** + * Constructor + * @param fs + * @param shard + * @param tempDir + * @param iconf + * @throws IOException + */ + public ShardWriter(FileSystem fs, Shard shard, String tempDir, + IndexUpdateConfiguration iconf) throws IOException { + LOG.info("Construct a shard writer"); + + this.fs = fs; + localFs = FileSystem.getLocal(iconf.getConfiguration()); + perm = new Path(shard.getDirectory()); + temp = new Path(tempDir); + + long initGeneration = shard.getGeneration(); + if (!fs.exists(perm)) { + assert (initGeneration < 0); + fs.mkdirs(perm); + } else { + restoreGeneration(fs, perm, initGeneration); + } + dir = + new MixedDirectory(fs, perm, localFs, fs.startLocalOutput(perm, temp), + iconf.getConfiguration()); + + // analyzer is null because we only use addIndexes, not addDocument + writer = + new IndexWriter(dir, false, null, + initGeneration < 0 ? new KeepOnlyLastCommitDeletionPolicy() + : new MixedDeletionPolicy()); + setParameters(iconf); + } + + /** + * Process an intermediate form by carrying out, on the Lucene instance of + * the shard, the deletes and the inserts (a ram index) in the form. + * @param form the intermediate form containing deletes and a ram index + * @throws IOException + */ + public void process(IntermediateForm form) throws IOException { + // first delete + Iterator iter = form.deleteTermIterator(); + while (iter.hasNext()) { + writer.deleteDocuments(iter.next()); + } + // then insert + writer.addIndexesNoOptimize(new Directory[] { form.getDirectory() }); + numForms++; + } + + /** + * Close the shard writer. Optimize the Lucene instance of the shard before + * closing if necessary, and copy the files created in the temp directory + * to the permanent directory after closing. 
+ * @throws IOException + */ + public void close() throws IOException { + LOG.info("Closing the shard writer, processed " + numForms + " forms"); + try { + try { + if (maxNumSegments > 0) { + writer.optimize(maxNumSegments); + LOG.info("Optimized the shard into at most " + maxNumSegments + + " segments"); + } + } finally { + writer.close(); + LOG.info("Closed Lucene index writer"); + } + + moveFromTempToPerm(); + LOG.info("Moved new index files to " + perm); + + } finally { + dir.close(); + LOG.info("Closed the shard writer"); + } + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + return this.getClass().getName() + "@" + perm + "&" + temp; + } + + private void setParameters(IndexUpdateConfiguration iconf) { + int maxFieldLength = iconf.getIndexMaxFieldLength(); + if (maxFieldLength > 0) { + writer.setMaxFieldLength(maxFieldLength); + } + writer.setUseCompoundFile(iconf.getIndexUseCompoundFile()); + maxNumSegments = iconf.getIndexMaxNumSegments(); + + if (maxFieldLength > 0) { + LOG.info("sea.max.field.length = " + writer.getMaxFieldLength()); + } + LOG.info("sea.use.compound.file = " + writer.getUseCompoundFile()); + LOG.info("sea.max.num.segments = " + maxNumSegments); + } + + // in case a previous reduce task fails, restore the generation to + // the original starting point by deleting the segments.gen file + // and the segments_N files whose generations are greater than the + // starting generation; rest of the unwanted files will be deleted + // once the unwanted segments_N files are deleted + private void restoreGeneration(FileSystem fs, Path perm, long startGen) + throws IOException { + + FileStatus[] fileStatus = fs.listStatus(perm, new PathFilter() { + public boolean accept(Path path) { + return LuceneUtil.isSegmentsFile(path.getName()); + } + }); + + // remove the segments_N files whose generation are greater than + // the starting generation + for (int i = 0; i < fileStatus.length; i++) { + Path path = fileStatus[i].getPath(); + if (startGen < LuceneUtil.generationFromSegmentsFileName(path.getName())) { + fs.delete(path, true); + } + } + + // always remove segments.gen in case last failed try removed segments_N + // but not segments.gen, and segments.gen will be overwritten anyway. 
+ Path segmentsGenFile = new Path(LuceneUtil.IndexFileNames.SEGMENTS_GEN); + if (fs.exists(segmentsGenFile)) { + fs.delete(segmentsGenFile, true); + } + } + + // move the files created in the temp dir into the perm dir + // and then delete the temp dir from the local FS + private void moveFromTempToPerm() throws IOException { + try { + FileStatus[] fileStatus = + localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter()); + Path segmentsPath = null; + Path segmentsGenPath = null; + + // move the files created in temp dir except segments_N and segments.gen + for (int i = 0; i < fileStatus.length; i++) { + Path path = fileStatus[i].getPath(); + String name = path.getName(); + + if (LuceneUtil.isSegmentsGenFile(name)) { + assert (segmentsGenPath == null); + segmentsGenPath = path; + } else if (LuceneUtil.isSegmentsFile(name)) { + assert (segmentsPath == null); + segmentsPath = path; + } else { + fs.completeLocalOutput(new Path(perm, name), path); + } + } + + // move the segments_N file + if (segmentsPath != null) { + fs.completeLocalOutput(new Path(perm, segmentsPath.getName()), + segmentsPath); + } + + // move the segments.gen file + if (segmentsGenPath != null) { + fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()), + segmentsGenPath); + } + } finally { + // finally delete the temp dir (files should have been deleted) + localFs.delete(temp, true); + } + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/main/UpdateIndex.java Tue Oct 16 00:02:55 2012 @@ -1,276 +1,276 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.main; - -import java.io.IOException; -import java.text.NumberFormat; -import java.util.Arrays; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration; -import org.apache.hadoop.contrib.index.mapred.IIndexUpdater; -import org.apache.hadoop.contrib.index.mapred.Shard; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.main; + +import java.io.IOException; +import java.text.NumberFormat; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration; +import org.apache.hadoop.contrib.index.mapred.IIndexUpdater; +import org.apache.hadoop.contrib.index.mapred.Shard; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * A distributed "index" is partitioned into "shards". Each shard corresponds - * to a Lucene instance. This class contains the main() method which uses a - * Map/Reduce job to analyze documents and update Lucene instances in parallel. - * - * The main() method in UpdateIndex requires the following information for - * updating the shards: - * - Input formatter. This specifies how to format the input documents. - * - Analysis. This defines the analyzer to use on the input. The analyzer - * determines whether a document is being inserted, updated, or deleted. - * For inserts or updates, the analyzer also converts each input document - * into a Lucene document. - * - Input paths. This provides the location(s) of updated documents, - * e.g., HDFS files or directories, or HBase tables. - * - Shard paths, or index path with the number of shards. Either specify - * the path for each shard, or specify an index path and the shards are - * the sub-directories of the index directory. - * - Output path. When the update to a shard is done, a message is put here. - * - Number of map tasks. - * - * All of the information can be specified in a configuration file. All but - * the first two can also be specified as command line options. Check out - * conf/index-config.xml.template for other configurable parameters. 
- * - * Note: Because of the parallel nature of Map/Reduce, the behaviour of - * multiple inserts, deletes or updates to the same document is undefined. - */ -public class UpdateIndex { - public static final Log LOG = LogFactory.getLog(UpdateIndex.class); - - private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); - static { - NUMBER_FORMAT.setMinimumIntegerDigits(5); - NUMBER_FORMAT.setGroupingUsed(false); - } - - private static long now() { - return System.currentTimeMillis(); - } - - private static void printUsage(String cmd) { - System.err.println("Usage: java " + UpdateIndex.class.getName() + "\n" - + " -inputPaths \n" - + " -outputPath \n" - + " -shards \n" - + " -indexPath \n" - + " -numShards \n" - + " -numMapTasks \n" - + " -conf \n" - + "Note: Do not use both -shards option and -indexPath option."); - } - - private static String getIndexPath(Configuration conf) { - return conf.get("sea.index.path"); - } - - private static int getNumShards(Configuration conf) { - return conf.getInt("sea.num.shards", 1); - } - - private static Shard[] createShards(String indexPath, int numShards, - Configuration conf) throws IOException { - - String parent = Shard.normalizePath(indexPath) + Path.SEPARATOR; - long versionNumber = -1; - long generation = -1; - - FileSystem fs = FileSystem.get(conf); - Path path = new Path(indexPath); - - if (fs.exists(path)) { - FileStatus[] fileStatus = fs.listStatus(path); - String[] shardNames = new String[fileStatus.length]; - int count = 0; - for (int i = 0; i < fileStatus.length; i++) { - if (fileStatus[i].isDirectory()) { - shardNames[count] = fileStatus[i].getPath().getName(); - count++; - } - } - Arrays.sort(shardNames, 0, count); - - Shard[] shards = new Shard[count >= numShards ? count : numShards]; - for (int i = 0; i < count; i++) { - shards[i] = - new Shard(versionNumber, parent + shardNames[i], generation); - } - - int number = count; - for (int i = count; i < numShards; i++) { - String shardPath; - while (true) { - shardPath = parent + NUMBER_FORMAT.format(number++); - if (!fs.exists(new Path(shardPath))) { - break; - } - } - shards[i] = new Shard(versionNumber, shardPath, generation); - } - return shards; - } else { - Shard[] shards = new Shard[numShards]; - for (int i = 0; i < shards.length; i++) { - shards[i] = - new Shard(versionNumber, parent + NUMBER_FORMAT.format(i), - generation); - } - return shards; - } - } - - /** - * The main() method - * @param argv - */ - public static void main(String[] argv) { - if (argv.length == 0) { - printUsage(""); - System.exit(-1); - } - - String inputPathsString = null; - Path outputPath = null; - String shardsString = null; - String indexPath = null; - int numShards = -1; - int numMapTasks = -1; - Configuration conf = new Configuration(); - String confPath = null; - - // parse the command line - for (int i = 0; i < argv.length; i++) { // parse command line - if (argv[i].equals("-inputPaths")) { - inputPathsString = argv[++i]; - } else if (argv[i].equals("-outputPath")) { - outputPath = new Path(argv[++i]); - } else if (argv[i].equals("-shards")) { - shardsString = argv[++i]; - } else if (argv[i].equals("-indexPath")) { - indexPath = argv[++i]; - } else if (argv[i].equals("-numShards")) { - numShards = Integer.parseInt(argv[++i]); - } else if (argv[i].equals("-numMapTasks")) { - numMapTasks = Integer.parseInt(argv[++i]); - } else if (argv[i].equals("-conf")) { - // add as a local FS resource - confPath = argv[++i]; - conf.addResource(new Path(confPath)); - } else { - 
System.out.println("Unknown option " + argv[i] + " w/ value " - + argv[++i]); - } - } - LOG.info("inputPaths = " + inputPathsString); - LOG.info("outputPath = " + outputPath); - LOG.info("shards = " + shardsString); - LOG.info("indexPath = " + indexPath); - LOG.info("numShards = " + numShards); - LOG.info("numMapTasks= " + numMapTasks); - LOG.info("confPath = " + confPath); - - Path[] inputPaths = null; - Shard[] shards = null; - - JobConf jobConf = new JobConf(conf); - IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf); - - if (inputPathsString != null) { - jobConf.set(org.apache.hadoop.mapreduce.lib.input. - FileInputFormat.INPUT_DIR, inputPathsString); - } - inputPaths = FileInputFormat.getInputPaths(jobConf); - if (inputPaths.length == 0) { - inputPaths = null; - } - - if (outputPath == null) { +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * A distributed "index" is partitioned into "shards". Each shard corresponds + * to a Lucene instance. This class contains the main() method which uses a + * Map/Reduce job to analyze documents and update Lucene instances in parallel. + * + * The main() method in UpdateIndex requires the following information for + * updating the shards: + * - Input formatter. This specifies how to format the input documents. + * - Analysis. This defines the analyzer to use on the input. The analyzer + * determines whether a document is being inserted, updated, or deleted. + * For inserts or updates, the analyzer also converts each input document + * into a Lucene document. + * - Input paths. This provides the location(s) of updated documents, + * e.g., HDFS files or directories, or HBase tables. + * - Shard paths, or index path with the number of shards. Either specify + * the path for each shard, or specify an index path and the shards are + * the sub-directories of the index directory. + * - Output path. When the update to a shard is done, a message is put here. + * - Number of map tasks. + * + * All of the information can be specified in a configuration file. All but + * the first two can also be specified as command line options. Check out + * conf/index-config.xml.template for other configurable parameters. + * + * Note: Because of the parallel nature of Map/Reduce, the behaviour of + * multiple inserts, deletes or updates to the same document is undefined. 
+ */ +public class UpdateIndex { + public static final Log LOG = LogFactory.getLog(UpdateIndex.class); + + private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); + static { + NUMBER_FORMAT.setMinimumIntegerDigits(5); + NUMBER_FORMAT.setGroupingUsed(false); + } + + private static long now() { + return System.currentTimeMillis(); + } + + private static void printUsage(String cmd) { + System.err.println("Usage: java " + UpdateIndex.class.getName() + "\n" + + " -inputPaths \n" + + " -outputPath \n" + + " -shards \n" + + " -indexPath \n" + + " -numShards \n" + + " -numMapTasks \n" + + " -conf \n" + + "Note: Do not use both -shards option and -indexPath option."); + } + + private static String getIndexPath(Configuration conf) { + return conf.get("sea.index.path"); + } + + private static int getNumShards(Configuration conf) { + return conf.getInt("sea.num.shards", 1); + } + + private static Shard[] createShards(String indexPath, int numShards, + Configuration conf) throws IOException { + + String parent = Shard.normalizePath(indexPath) + Path.SEPARATOR; + long versionNumber = -1; + long generation = -1; + + FileSystem fs = FileSystem.get(conf); + Path path = new Path(indexPath); + + if (fs.exists(path)) { + FileStatus[] fileStatus = fs.listStatus(path); + String[] shardNames = new String[fileStatus.length]; + int count = 0; + for (int i = 0; i < fileStatus.length; i++) { + if (fileStatus[i].isDirectory()) { + shardNames[count] = fileStatus[i].getPath().getName(); + count++; + } + } + Arrays.sort(shardNames, 0, count); + + Shard[] shards = new Shard[count >= numShards ? count : numShards]; + for (int i = 0; i < count; i++) { + shards[i] = + new Shard(versionNumber, parent + shardNames[i], generation); + } + + int number = count; + for (int i = count; i < numShards; i++) { + String shardPath; + while (true) { + shardPath = parent + NUMBER_FORMAT.format(number++); + if (!fs.exists(new Path(shardPath))) { + break; + } + } + shards[i] = new Shard(versionNumber, shardPath, generation); + } + return shards; + } else { + Shard[] shards = new Shard[numShards]; + for (int i = 0; i < shards.length; i++) { + shards[i] = + new Shard(versionNumber, parent + NUMBER_FORMAT.format(i), + generation); + } + return shards; + } + } + + /** + * The main() method + * @param argv + */ + public static void main(String[] argv) { + if (argv.length == 0) { + printUsage(""); + System.exit(-1); + } + + String inputPathsString = null; + Path outputPath = null; + String shardsString = null; + String indexPath = null; + int numShards = -1; + int numMapTasks = -1; + Configuration conf = new Configuration(); + String confPath = null; + + // parse the command line + for (int i = 0; i < argv.length; i++) { // parse command line + if (argv[i].equals("-inputPaths")) { + inputPathsString = argv[++i]; + } else if (argv[i].equals("-outputPath")) { + outputPath = new Path(argv[++i]); + } else if (argv[i].equals("-shards")) { + shardsString = argv[++i]; + } else if (argv[i].equals("-indexPath")) { + indexPath = argv[++i]; + } else if (argv[i].equals("-numShards")) { + numShards = Integer.parseInt(argv[++i]); + } else if (argv[i].equals("-numMapTasks")) { + numMapTasks = Integer.parseInt(argv[++i]); + } else if (argv[i].equals("-conf")) { + // add as a local FS resource + confPath = argv[++i]; + conf.addResource(new Path(confPath)); + } else { + System.out.println("Unknown option " + argv[i] + " w/ value " + + argv[++i]); + } + } + LOG.info("inputPaths = " + inputPathsString); + LOG.info("outputPath = " + outputPath); 
+ LOG.info("shards = " + shardsString); + LOG.info("indexPath = " + indexPath); + LOG.info("numShards = " + numShards); + LOG.info("numMapTasks= " + numMapTasks); + LOG.info("confPath = " + confPath); + + Path[] inputPaths = null; + Shard[] shards = null; + + JobConf jobConf = new JobConf(conf); + IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf); + + if (inputPathsString != null) { + jobConf.set(org.apache.hadoop.mapreduce.lib.input. + FileInputFormat.INPUT_DIR, inputPathsString); + } + inputPaths = FileInputFormat.getInputPaths(jobConf); + if (inputPaths.length == 0) { + inputPaths = null; + } + + if (outputPath == null) { outputPath = FileOutputFormat.getOutputPath(jobConf); - } - - if (inputPaths == null || outputPath == null) { - System.err.println("InputPaths and outputPath must be specified."); - printUsage(""); - System.exit(-1); - } - - if (shardsString != null) { - iconf.setIndexShards(shardsString); - } - shards = Shard.getIndexShards(iconf); - if (shards != null && shards.length == 0) { - shards = null; - } - - if (indexPath == null) { - indexPath = getIndexPath(conf); - } - if (numShards <= 0) { - numShards = getNumShards(conf); - } - - if (shards == null && indexPath == null) { - System.err.println("Either shards or indexPath must be specified."); - printUsage(""); - System.exit(-1); - } - - if (numMapTasks <= 0) { - numMapTasks = jobConf.getNumMapTasks(); - } - - try { - // create shards and set their directories if necessary - if (shards == null) { - shards = createShards(indexPath, numShards, conf); - } - - long startTime = now(); - try { - IIndexUpdater updater = - (IIndexUpdater) ReflectionUtils.newInstance( - iconf.getIndexUpdaterClass(), conf); - LOG.info("sea.index.updater = " - + iconf.getIndexUpdaterClass().getName()); - - updater.run(conf, inputPaths, outputPath, numMapTasks, shards); - LOG.info("Index update job is done"); - - } finally { - long elapsedTime = now() - startTime; - LOG.info("Elapsed time is " + (elapsedTime / 1000) + "s"); - System.out.println("Elapsed time is " + (elapsedTime / 1000) + "s"); - } - } catch (Exception e) { - e.printStackTrace(System.err); - } - } -} + } + + if (inputPaths == null || outputPath == null) { + System.err.println("InputPaths and outputPath must be specified."); + printUsage(""); + System.exit(-1); + } + + if (shardsString != null) { + iconf.setIndexShards(shardsString); + } + shards = Shard.getIndexShards(iconf); + if (shards != null && shards.length == 0) { + shards = null; + } + + if (indexPath == null) { + indexPath = getIndexPath(conf); + } + if (numShards <= 0) { + numShards = getNumShards(conf); + } + + if (shards == null && indexPath == null) { + System.err.println("Either shards or indexPath must be specified."); + printUsage(""); + System.exit(-1); + } + + if (numMapTasks <= 0) { + numMapTasks = jobConf.getNumMapTasks(); + } + + try { + // create shards and set their directories if necessary + if (shards == null) { + shards = createShards(indexPath, numShards, conf); + } + + long startTime = now(); + try { + IIndexUpdater updater = + (IIndexUpdater) ReflectionUtils.newInstance( + iconf.getIndexUpdaterClass(), conf); + LOG.info("sea.index.updater = " + + iconf.getIndexUpdaterClass().getName()); + + updater.run(conf, inputPaths, outputPath, numMapTasks, shards); + LOG.info("Index update job is done"); + + } finally { + long elapsedTime = now() - startTime; + LOG.info("Elapsed time is " + (elapsedTime / 1000) + "s"); + System.out.println("Elapsed time is " + (elapsedTime / 1000) + "s"); + } + } 
catch (Exception e) { + e.printStackTrace(System.err); + } + } +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentAndOp.java Tue Oct 16 00:02:55 2012 @@ -1,208 +1,208 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.Term; - -/** - * This class represents an indexing operation. The operation can be an insert, - * a delete or an update. If the operation is an insert or an update, a (new) - * document must be specified. If the operation is a delete or an update, a - * delete term must be specified. - */ -public class DocumentAndOp implements Writable { - - /** - * This class represents the type of an operation - an insert, a delete or - * an update. - */ - public static final class Op { - public static final Op INSERT = new Op("INSERT"); - public static final Op DELETE = new Op("DELETE"); - public static final Op UPDATE = new Op("UPDATE"); - - private String name; - - private Op(String name) { - this.name = name; - } - - public String toString() { - return name; - } - } - - private Op op; - private Document doc; - private Term term; - - /** - * Constructor for no operation. - */ - public DocumentAndOp() { - } - - /** - * Constructor for an insert operation. - * @param op - * @param doc - */ - public DocumentAndOp(Op op, Document doc) { - assert (op == Op.INSERT); - this.op = op; - this.doc = doc; - this.term = null; - } - - /** - * Constructor for a delete operation. - * @param op - * @param term - */ - public DocumentAndOp(Op op, Term term) { - assert (op == Op.DELETE); - this.op = op; - this.doc = null; - this.term = term; - } - - /** - * Constructor for an insert, a delete or an update operation. 
- * @param op - * @param doc - * @param term - */ - public DocumentAndOp(Op op, Document doc, Term term) { - if (op == Op.INSERT) { - assert (doc != null); - assert (term == null); - } else if (op == Op.DELETE) { - assert (doc == null); - assert (term != null); - } else { - assert (op == Op.UPDATE); - assert (doc != null); - assert (term != null); - } - this.op = op; - this.doc = doc; - this.term = term; - } - - /** - * Set the instance to be an insert operation. - * @param doc - */ - public void setInsert(Document doc) { - this.op = Op.INSERT; - this.doc = doc; - this.term = null; - } - - /** - * Set the instance to be a delete operation. - * @param term - */ - public void setDelete(Term term) { - this.op = Op.DELETE; - this.doc = null; - this.term = term; - } - - /** - * Set the instance to be an update operation. - * @param doc - * @param term - */ - public void setUpdate(Document doc, Term term) { - this.op = Op.UPDATE; - this.doc = doc; - this.term = term; - } - - /** - * Get the type of operation. - * @return the type of the operation. - */ - public Op getOp() { - return op; - } - - /** - * Get the document. - * @return the document - */ - public Document getDocument() { - return doc; - } - - /** - * Get the term. - * @return the term - */ - public Term getTerm() { - return term; - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - StringBuilder buffer = new StringBuilder(); - buffer.append(this.getClass().getName()); - buffer.append("[op="); - buffer.append(op); - buffer.append(", doc="); - if (doc != null) { - buffer.append(doc); - } else { - buffer.append("null"); - } - buffer.append(", term="); - if (term != null) { - buffer.append(term); - } else { - buffer.append("null"); - } - buffer.append("]"); - return buffer.toString(); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - throw new IOException(this.getClass().getName() - + ".write should never be called"); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - throw new IOException(this.getClass().getName() - + ".readFields should never be called"); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.Term; + +/** + * This class represents an indexing operation. The operation can be an insert, + * a delete or an update. 
If the operation is an insert or an update, a (new) + * document must be specified. If the operation is a delete or an update, a + * delete term must be specified. + */ +public class DocumentAndOp implements Writable { + + /** + * This class represents the type of an operation - an insert, a delete or + * an update. + */ + public static final class Op { + public static final Op INSERT = new Op("INSERT"); + public static final Op DELETE = new Op("DELETE"); + public static final Op UPDATE = new Op("UPDATE"); + + private String name; + + private Op(String name) { + this.name = name; + } + + public String toString() { + return name; + } + } + + private Op op; + private Document doc; + private Term term; + + /** + * Constructor for no operation. + */ + public DocumentAndOp() { + } + + /** + * Constructor for an insert operation. + * @param op + * @param doc + */ + public DocumentAndOp(Op op, Document doc) { + assert (op == Op.INSERT); + this.op = op; + this.doc = doc; + this.term = null; + } + + /** + * Constructor for a delete operation. + * @param op + * @param term + */ + public DocumentAndOp(Op op, Term term) { + assert (op == Op.DELETE); + this.op = op; + this.doc = null; + this.term = term; + } + + /** + * Constructor for an insert, a delete or an update operation. + * @param op + * @param doc + * @param term + */ + public DocumentAndOp(Op op, Document doc, Term term) { + if (op == Op.INSERT) { + assert (doc != null); + assert (term == null); + } else if (op == Op.DELETE) { + assert (doc == null); + assert (term != null); + } else { + assert (op == Op.UPDATE); + assert (doc != null); + assert (term != null); + } + this.op = op; + this.doc = doc; + this.term = term; + } + + /** + * Set the instance to be an insert operation. + * @param doc + */ + public void setInsert(Document doc) { + this.op = Op.INSERT; + this.doc = doc; + this.term = null; + } + + /** + * Set the instance to be a delete operation. + * @param term + */ + public void setDelete(Term term) { + this.op = Op.DELETE; + this.doc = null; + this.term = term; + } + + /** + * Set the instance to be an update operation. + * @param doc + * @param term + */ + public void setUpdate(Document doc, Term term) { + this.op = Op.UPDATE; + this.doc = doc; + this.term = term; + } + + /** + * Get the type of operation. + * @return the type of the operation. + */ + public Op getOp() { + return op; + } + + /** + * Get the document. + * @return the document + */ + public Document getDocument() { + return doc; + } + + /** + * Get the term. 
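
As an illustration of the operation types this javadoc describes, a local analysis implementation would typically build DocumentAndOp instances along these lines (the "id" and "body" field names are invented, and a pre-4.0 Lucene Field API is assumed):

import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.Term;

public class DocumentAndOpSketch {
  static final String ID_FIELD = "id"; // hypothetical key field

  static DocumentAndOp insertOf(String id, String body) {
    return new DocumentAndOp(DocumentAndOp.Op.INSERT, docFor(id, body));
  }

  static DocumentAndOp deleteOf(String id) {
    return new DocumentAndOp(DocumentAndOp.Op.DELETE, new Term(ID_FIELD, id));
  }

  static DocumentAndOp updateOf(String id, String body) {
    // An update carries both the replacement document and the term locating the old one.
    return new DocumentAndOp(DocumentAndOp.Op.UPDATE, docFor(id, body), new Term(ID_FIELD, id));
  }

  private static Document docFor(String id, String body) {
    Document doc = new Document();
    doc.add(new Field(ID_FIELD, id, Field.Store.YES, Field.Index.NOT_ANALYZED));
    doc.add(new Field("body", body, Field.Store.NO, Field.Index.ANALYZED));
    return doc;
  }
}
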
+ * @return the term + */ + public Term getTerm() { + return term; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(this.getClass().getName()); + buffer.append("[op="); + buffer.append(op); + buffer.append(", doc="); + if (doc != null) { + buffer.append(doc); + } else { + buffer.append("null"); + } + buffer.append(", term="); + if (term != null) { + buffer.append(term); + } else { + buffer.append("null"); + } + buffer.append("]"); + return buffer.toString(); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + throw new IOException(this.getClass().getName() + + ".write should never be called"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + throw new IOException(this.getClass().getName() + + ".readFields should never be called"); + } +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/DocumentID.java Tue Oct 16 00:02:55 2012 @@ -1,89 +1,89 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; - -/** - * The class represents a document id, which is of type text. - */ -public class DocumentID implements WritableComparable { - private final Text docID; - - /** - * Constructor. - */ - public DocumentID() { - docID = new Text(); - } - - /** - * The text of the document id. 
- * @return the text - */ - public Text getText() { - return docID; - } - - /* (non-Javadoc) - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - public int compareTo(Object obj) { - if (this == obj) { - return 0; - } else { - return docID.compareTo(((DocumentID) obj).docID); - } - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - public int hashCode() { - return docID.hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - return this.getClass().getName() + "[" + docID + "]"; - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - throw new IOException(this.getClass().getName() - + ".write should never be called"); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - throw new IOException(this.getClass().getName() - + ".readFields should never be called"); - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +/** + * The class represents a document id, which is of type text. + */ +public class DocumentID implements WritableComparable { + private final Text docID; + + /** + * Constructor. + */ + public DocumentID() { + docID = new Text(); + } + + /** + * The text of the document id. 
+ * @return the text + */ + public Text getText() { + return docID; + } + + /* (non-Javadoc) + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + public int compareTo(Object obj) { + if (this == obj) { + return 0; + } else { + return docID.compareTo(((DocumentID) obj).docID); + } + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + public int hashCode() { + return docID.hashCode(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + return this.getClass().getName() + "[" + docID + "]"; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + throw new IOException(this.getClass().getName() + + ".write should never be called"); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + throw new IOException(this.getClass().getName() + + ".readFields should never be called"); + } +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IDistributionPolicy.java Tue Oct 16 00:02:55 2012 @@ -1,50 +1,50 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -/** - * A distribution policy decides, given a document with a document id, which - * one shard the request should be sent to if the request is an insert, and - * which shard(s) the request should be sent to if the request is a delete. - */ -public interface IDistributionPolicy { - - /** - * Initialization. It must be called before any chooseShard() is called. - * @param shards - */ - void init(Shard[] shards); - - /** - * Choose a shard to send an insert request. - * @param key - * @return the index of the chosen shard - */ - int chooseShardForInsert(DocumentID key); - - /** - * Choose a shard or all shards to send a delete request. E.g. a round-robin - * distribution policy would send a delete request to all the shards. - * -1 represents all the shards. 
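
A hypothetical policy implementing the contract described here, routing by a hash of the document id (a sketch only, not one of the bundled policies):

import org.apache.hadoop.contrib.index.mapred.DocumentID;
import org.apache.hadoop.contrib.index.mapred.IDistributionPolicy;
import org.apache.hadoop.contrib.index.mapred.Shard;

public class HashingPolicySketch implements IDistributionPolicy {
  private int numShards;

  public void init(Shard[] shards) {
    // Must be called before any chooseShard*() method, per the interface javadoc.
    numShards = shards.length;
  }

  public int chooseShardForInsert(DocumentID key) {
    // Send each insert to exactly one shard, picked by the id's hash.
    return (key.getText().hashCode() & Integer.MAX_VALUE) % numShards;
  }

  public int chooseShardForDelete(DocumentID key) {
    // With hashing, the shard that received the insert also holds the document, so
    // the delete can go to the same shard. Returning -1 instead would broadcast the
    // delete to every shard, as a round-robin policy must.
    return (key.getText().hashCode() & Integer.MAX_VALUE) % numShards;
  }
}
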
- * @param key - * @return the index of the chosen shard, -1 if all the shards are chosen - */ - int chooseShardForDelete(DocumentID key); - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +/** + * A distribution policy decides, given a document with a document id, which + * one shard the request should be sent to if the request is an insert, and + * which shard(s) the request should be sent to if the request is a delete. + */ +public interface IDistributionPolicy { + + /** + * Initialization. It must be called before any chooseShard() is called. + * @param shards + */ + void init(Shard[] shards); + + /** + * Choose a shard to send an insert request. + * @param key + * @return the index of the chosen shard + */ + int chooseShardForInsert(DocumentID key); + + /** + * Choose a shard or all shards to send a delete request. E.g. a round-robin + * distribution policy would send a delete request to all the shards. + * -1 represents all the shards. + * @param key + * @return the index of the chosen shard, -1 if all the shards are chosen + */ + int chooseShardForDelete(DocumentID key); + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IIndexUpdater.java Tue Oct 16 00:02:55 2012 @@ -1,46 +1,46 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -/** - * A class implements an index updater interface should create a Map/Reduce job - * configuration and run the Map/Reduce job to analyze documents and update - * Lucene instances in parallel. - */ -public interface IIndexUpdater { - - /** - * Create a Map/Reduce job configuration and run the Map/Reduce job to - * analyze documents and update Lucene instances in parallel. - * @param conf - * @param inputPaths - * @param outputPath - * @param numMapTasks - * @param shards - * @throws IOException - */ - void run(Configuration conf, Path[] inputPaths, Path outputPath, - int numMapTasks, Shard[] shards) throws IOException; - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * A class implements an index updater interface should create a Map/Reduce job + * configuration and run the Map/Reduce job to analyze documents and update + * Lucene instances in parallel. + */ +public interface IIndexUpdater { + + /** + * Create a Map/Reduce job configuration and run the Map/Reduce job to + * analyze documents and update Lucene instances in parallel. + * @param conf + * @param inputPaths + * @param outputPath + * @param numMapTasks + * @param shards + * @throws IOException + */ + void run(Configuration conf, Path[] inputPaths, Path outputPath, + int numMapTasks, Shard[] shards) throws IOException; + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/ILocalAnalysis.java Tue Oct 16 00:02:55 2012 @@ -1,32 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.Mapper; - -/** - * Application specific local analysis. The output type must be (DocumentID, - * DocumentAndOp). - */ -public interface ILocalAnalysis<K extends WritableComparable, V extends Writable> - extends Mapper<K, V, DocumentID, DocumentAndOp> { - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Mapper; + +/** + * Application specific local analysis. The output type must be (DocumentID, + * DocumentAndOp). + */ +public interface ILocalAnalysis<K extends WritableComparable, V extends Writable> + extends Mapper<K, V, DocumentID, DocumentAndOp> { + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateCombiner.java Tue Oct 16 00:02:55 2012 @@ -1,111 +1,111 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This combiner combines multiple intermediate forms into one intermediate - * form. More specifically, the input intermediate forms are a single-document - * ram index and/or a single delete term. An output intermediate form contains - * a multi-document ram index and/or multiple delete terms. - */ -public class IndexUpdateCombiner extends MapReduceBase implements - Reducer<Shard, IntermediateForm, Shard, IntermediateForm> { - static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class); - - IndexUpdateConfiguration iconf; - long maxSizeInBytes; - long nearMaxSizeInBytes; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) - */ - public void reduce(Shard key, Iterator<IntermediateForm> values, - OutputCollector<Shard, IntermediateForm> output, Reporter reporter) - throws IOException { - - String message = key.toString(); - IntermediateForm form = null; - - while (values.hasNext()) { - IntermediateForm singleDocForm = values.next(); - long formSize = form == null ? 0 : form.totalSizeInBytes(); - long singleDocFormSize = singleDocForm.totalSizeInBytes(); - - if (form != null && formSize + singleDocFormSize > maxSizeInBytes) { - closeForm(form, message); - output.collect(key, form); - form = null; - } - - if (form == null && singleDocFormSize >= nearMaxSizeInBytes) { - output.collect(key, singleDocForm); - } else { - if (form == null) { - form = createForm(message); - } - form.process(singleDocForm); - } - } - - if (form != null) { - closeForm(form, message); - output.collect(key, form); - } - } - - private IntermediateForm createForm(String message) throws IOException { - LOG.info("Construct a form writer for " + message); - IntermediateForm form = new IntermediateForm(); - form.configure(iconf); - return form; - } - - private void closeForm(IntermediateForm form, String message) - throws IOException { - form.closeWriter(); - LOG.info("Closed the form writer for " + message + ", form = " + form); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - maxSizeInBytes = iconf.getMaxRAMSizeInBytes(); - nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This combiner combines multiple intermediate forms into one intermediate + * form. More specifically, the input intermediate forms are a single-document + * ram index and/or a single delete term. An output intermediate form contains + * a multi-document ram index and/or multiple delete terms. + */ +public class IndexUpdateCombiner extends MapReduceBase implements + Reducer<Shard, IntermediateForm, Shard, IntermediateForm> { + static final Log LOG = LogFactory.getLog(IndexUpdateCombiner.class); + + IndexUpdateConfiguration iconf; + long maxSizeInBytes; + long nearMaxSizeInBytes; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + public void reduce(Shard key, Iterator<IntermediateForm> values, + OutputCollector<Shard, IntermediateForm> output, Reporter reporter) + throws IOException { + + String message = key.toString(); + IntermediateForm form = null; + + while (values.hasNext()) { + IntermediateForm singleDocForm = values.next(); + long formSize = form == null ? 0 : form.totalSizeInBytes(); + long singleDocFormSize = singleDocForm.totalSizeInBytes(); + + if (form != null && formSize + singleDocFormSize > maxSizeInBytes) { + closeForm(form, message); + output.collect(key, form); + form = null; + } + + if (form == null && singleDocFormSize >= nearMaxSizeInBytes) { + output.collect(key, singleDocForm); + } else { + if (form == null) { + form = createForm(message); + } + form.process(singleDocForm); + } + } + + if (form != null) { + closeForm(form, message); + output.collect(key, form); + } + } + + private IntermediateForm createForm(String message) throws IOException { + LOG.info("Construct a form writer for " + message); + IntermediateForm form = new IntermediateForm(); + form.configure(iconf); + return form; + } + + private void closeForm(IntermediateForm form, String message) + throws IOException { + form.closeWriter(); + LOG.info("Closed the form writer for " + message + ", form = " + form); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + maxSizeInBytes = iconf.getMaxRAMSizeInBytes(); + nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3); // 7/8 of max + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + } + +}
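For readers following the contrib/index interfaces in the diffs above, the IDistributionPolicy contract is easiest to see with a concrete, purely hypothetical policy. The sketch below is not part of this commit; the class name, package and hashing scheme are illustrative assumptions. It routes every document id to exactly one shard by hashing, so a delete can be sent to that same shard instead of being broadcast to all shards with -1.

package org.apache.hadoop.contrib.index.example;   // hypothetical package, not in this commit

import org.apache.hadoop.contrib.index.mapred.DocumentID;
import org.apache.hadoop.contrib.index.mapred.IDistributionPolicy;
import org.apache.hadoop.contrib.index.mapred.Shard;

/**
 * Illustrative only: send each document id to one shard chosen by hashing.
 * Inserts and deletes of the same id hash to the same shard, so deletes
 * never need the "-1 = all shards" broadcast described in the interface.
 */
public class SimpleHashingPolicy implements IDistributionPolicy {

  private int numShards;

  public void init(Shard[] shards) {
    numShards = shards.length;
  }

  public int chooseShardForInsert(DocumentID key) {
    // DocumentID.getText() exposes the underlying id (see DocumentID in the diff above)
    int hash = key.getText().hashCode();
    return (hash & Integer.MAX_VALUE) % numShards;
  }

  public int chooseShardForDelete(DocumentID key) {
    // Same routing as inserts, so exactly one shard receives the delete term.
    return chooseShardForInsert(key);
  }
}

A round-robin style policy, by contrast, cannot know which shard holds a previously inserted document, which is exactly why the interface allows chooseShardForDelete() to return -1 and broadcast the delete to every shard.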
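Similarly, the IIndexUpdater contract above boils down to ordinary JobConf wiring. The outline below is a hypothetical sketch, not the updater shipped in this branch; the class name and the specific job settings are assumptions, and the mapper, reducer, partitioner and input/output formats are deliberately omitted.

package org.apache.hadoop.contrib.index.example;   // hypothetical, not in this commit

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.mapred.IIndexUpdater;
import org.apache.hadoop.contrib.index.mapred.IndexUpdateCombiner;
import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
import org.apache.hadoop.contrib.index.mapred.Shard;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical IIndexUpdater sketch; not the updater in this branch. */
public class SketchIndexUpdater implements IIndexUpdater {

  public void run(Configuration conf, Path[] inputPaths, Path outputPath,
      int numMapTasks, Shard[] shards) throws IOException {
    JobConf jobConf = new JobConf(conf, SketchIndexUpdater.class);
    jobConf.setJobName("index-update");

    FileInputFormat.setInputPaths(jobConf, inputPaths);
    FileOutputFormat.setOutputPath(jobConf, outputPath);
    jobConf.setNumMapTasks(numMapTasks);
    // one reduce task per shard, so each Lucene instance is written by one reducer
    jobConf.setNumReduceTasks(shards.length);

    // map output is (Shard, IntermediateForm); IndexUpdateCombiner (diff above)
    // merges these forms on the map side before they reach the reducers
    jobConf.setMapOutputKeyClass(Shard.class);
    jobConf.setMapOutputValueClass(IntermediateForm.class);
    jobConf.setCombinerClass(IndexUpdateCombiner.class);
    // mapper, reducer, partitioner and input/output formats omitted from this sketch

    JobClient.runJob(jobConf);
  }
}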
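Finally, the cutoff arithmetic in IndexUpdateCombiner.configure() is easier to read with concrete numbers; the 64 MB figure below is only an example, not a value taken from this commit.

// Example only: suppose iconf.getMaxRAMSizeInBytes() returned a 64 MB budget.
long maxSizeInBytes = 64L << 20;                                    // 67,108,864 bytes
long nearMaxSizeInBytes = maxSizeInBytes - (maxSizeInBytes >>> 3);  // 58,720,256 bytes, i.e. 7/8 of max
// reduce() emits a single-document form of at least 7/8 of the budget on its own,
// and otherwise appends forms to the running one, flushing it whenever adding the
// next form would push it past the full budget.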