Return-Path: 
X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org
Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by minotaur.apache.org (Postfix) with SMTP id BA2C5D9EA
    for ; Tue, 16 Oct 2012 00:05:59 +0000 (UTC)
Received: (qmail 32213 invoked by uid 500); 16 Oct 2012 00:05:59 -0000
Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org
Received: (qmail 32163 invoked by uid 500); 16 Oct 2012 00:05:59 -0000
Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help: 
List-Unsubscribe: 
List-Post: 
List-Id: 
Reply-To: mapreduce-dev@hadoop.apache.org
Delivered-To: mailing list mapreduce-commits@hadoop.apache.org
Received: (qmail 32155 invoked by uid 99); 16 Oct 2012 00:05:59 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
    by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Oct 2012 00:05:59 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
    by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Oct 2012 00:05:27 +0000
Received: from eris.apache.org (localhost [127.0.0.1])
    by eris.apache.org (Postfix) with ESMTP id CD8712388C7C;
    Tue, 16 Oct 2012 00:03:59 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r1398581 [8/9] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-...
Date: Tue, 16 Oct 2012 00:03:53 -0000
To: mapreduce-commits@hadoop.apache.org
From: sseth@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20121016000359.CD8712388C7C@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Tue Oct 16 00:02:55 2012
@@ -1,256 +1,256 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; -import org.apache.hadoop.contrib.index.example.LineDocInputFormat; -import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; - -/** - * This class provides the getters and the setters to a number of parameters. - * Most of the parameters are related to the index update and the rest are - * from the existing Map/Reduce parameters. - */ -public class IndexUpdateConfiguration { - final Configuration conf; - - /** - * Constructor - * @param conf - */ - public IndexUpdateConfiguration(Configuration conf) { - this.conf = conf; - } - - /** - * Get the underlying configuration object. - * @return the configuration - */ - public Configuration getConfiguration() { - return conf; - } - - // - // existing map/reduce properties - // - // public int getIOFileBufferSize() { - // return getInt("io.file.buffer.size", 4096); - // } - - /** - * Get the IO sort space in MB. - * @return the IO sort space in MB - */ - public int getIOSortMB() { - return conf.getInt(MRJobConfig.IO_SORT_MB, 100); - } - - /** - * Set the IO sort space in MB. - * @param mb the IO sort space in MB - */ - public void setIOSortMB(int mb) { - conf.setInt(MRJobConfig.IO_SORT_MB, mb); - } - - /** - * Get the Map/Reduce temp directory. - * @return the Map/Reduce temp directory - */ - public String getMapredTempDir() { - return conf.get(MRConfig.TEMP_DIR); - } - - // - // properties for index update - // - /** - * Get the distribution policy class. - * @return the distribution policy class - */ - public Class getDistributionPolicyClass() { - return conf.getClass("sea.distribution.policy", - HashingDistributionPolicy.class, IDistributionPolicy.class); - } - - /** - * Set the distribution policy class. - * @param theClass the distribution policy class - */ - public void setDistributionPolicyClass( - Class theClass) { - conf.setClass("sea.distribution.policy", theClass, - IDistributionPolicy.class); - } - - /** - * Get the analyzer class. - * @return the analyzer class - */ - public Class getDocumentAnalyzerClass() { - return conf.getClass("sea.document.analyzer", StandardAnalyzer.class, - Analyzer.class); - } - - /** - * Set the analyzer class. - * @param theClass the analyzer class - */ - public void setDocumentAnalyzerClass(Class theClass) { - conf.setClass("sea.document.analyzer", theClass, Analyzer.class); - } - - /** - * Get the index input format class. - * @return the index input format class - */ - public Class getIndexInputFormatClass() { - return conf.getClass("sea.input.format", LineDocInputFormat.class, - InputFormat.class); - } - - /** - * Set the index input format class. - * @param theClass the index input format class - */ - public void setIndexInputFormatClass(Class theClass) { - conf.setClass("sea.input.format", theClass, InputFormat.class); - } - - /** - * Get the index updater class. 
- * @return the index updater class - */ - public Class getIndexUpdaterClass() { - return conf.getClass("sea.index.updater", IndexUpdater.class, - IIndexUpdater.class); - } - - /** - * Set the index updater class. - * @param theClass the index updater class - */ - public void setIndexUpdaterClass(Class theClass) { - conf.setClass("sea.index.updater", theClass, IIndexUpdater.class); - } - - /** - * Get the local analysis class. - * @return the local analysis class - */ - public Class getLocalAnalysisClass() { - return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class, - ILocalAnalysis.class); - } - - /** - * Set the local analysis class. - * @param theClass the local analysis class - */ - public void setLocalAnalysisClass(Class theClass) { - conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class); - } - - /** - * Get the string representation of a number of shards. - * @return the string representation of a number of shards - */ - public String getIndexShards() { - return conf.get("sea.index.shards"); - } - - /** - * Set the string representation of a number of shards. - * @param shards the string representation of a number of shards - */ - public void setIndexShards(String shards) { - conf.set("sea.index.shards", shards); - } - - /** - * Get the max field length for a Lucene instance. - * @return the max field length for a Lucene instance - */ - public int getIndexMaxFieldLength() { - return conf.getInt("sea.max.field.length", -1); - } - - /** - * Set the max field length for a Lucene instance. - * @param maxFieldLength the max field length for a Lucene instance - */ - public void setIndexMaxFieldLength(int maxFieldLength) { - conf.setInt("sea.max.field.length", maxFieldLength); - } - - /** - * Get the max number of segments for a Lucene instance. - * @return the max number of segments for a Lucene instance - */ - public int getIndexMaxNumSegments() { - return conf.getInt("sea.max.num.segments", -1); - } - - /** - * Set the max number of segments for a Lucene instance. - * @param maxNumSegments the max number of segments for a Lucene instance - */ - public void setIndexMaxNumSegments(int maxNumSegments) { - conf.setInt("sea.max.num.segments", maxNumSegments); - } - - /** - * Check whether to use the compound file format for a Lucene instance. - * @return true if using the compound file format for a Lucene instance - */ - public boolean getIndexUseCompoundFile() { - return conf.getBoolean("sea.use.compound.file", false); - } - - /** - * Set whether use the compound file format for a Lucene instance. - * @param useCompoundFile whether to use the compound file format - */ - public void setIndexUseCompoundFile(boolean useCompoundFile) { - conf.setBoolean("sea.use.compound.file", useCompoundFile); - } - - /** - * Get the max ram index size in bytes. The default is 50M. - * @return the max ram index size in bytes - */ - public long getMaxRAMSizeInBytes() { - return conf.getLong("sea.max.ramsize.bytes", 50L << 20); - } - - /** - * Set the max ram index size in bytes. - * @param b the max ram index size in bytes - */ - public void setMaxRAMSizeInBytes(long b) { - conf.setLong("sea.max.ramsize.bytes", b); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; +import org.apache.hadoop.contrib.index.example.LineDocInputFormat; +import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; + +/** + * This class provides the getters and the setters to a number of parameters. + * Most of the parameters are related to the index update and the rest are + * from the existing Map/Reduce parameters. + */ +public class IndexUpdateConfiguration { + final Configuration conf; + + /** + * Constructor + * @param conf + */ + public IndexUpdateConfiguration(Configuration conf) { + this.conf = conf; + } + + /** + * Get the underlying configuration object. + * @return the configuration + */ + public Configuration getConfiguration() { + return conf; + } + + // + // existing map/reduce properties + // + // public int getIOFileBufferSize() { + // return getInt("io.file.buffer.size", 4096); + // } + + /** + * Get the IO sort space in MB. + * @return the IO sort space in MB + */ + public int getIOSortMB() { + return conf.getInt(MRJobConfig.IO_SORT_MB, 100); + } + + /** + * Set the IO sort space in MB. + * @param mb the IO sort space in MB + */ + public void setIOSortMB(int mb) { + conf.setInt(MRJobConfig.IO_SORT_MB, mb); + } + + /** + * Get the Map/Reduce temp directory. + * @return the Map/Reduce temp directory + */ + public String getMapredTempDir() { + return conf.get(MRConfig.TEMP_DIR); + } + + // + // properties for index update + // + /** + * Get the distribution policy class. + * @return the distribution policy class + */ + public Class getDistributionPolicyClass() { + return conf.getClass("sea.distribution.policy", + HashingDistributionPolicy.class, IDistributionPolicy.class); + } + + /** + * Set the distribution policy class. + * @param theClass the distribution policy class + */ + public void setDistributionPolicyClass( + Class theClass) { + conf.setClass("sea.distribution.policy", theClass, + IDistributionPolicy.class); + } + + /** + * Get the analyzer class. + * @return the analyzer class + */ + public Class getDocumentAnalyzerClass() { + return conf.getClass("sea.document.analyzer", StandardAnalyzer.class, + Analyzer.class); + } + + /** + * Set the analyzer class. + * @param theClass the analyzer class + */ + public void setDocumentAnalyzerClass(Class theClass) { + conf.setClass("sea.document.analyzer", theClass, Analyzer.class); + } + + /** + * Get the index input format class. 
+ * @return the index input format class + */ + public Class getIndexInputFormatClass() { + return conf.getClass("sea.input.format", LineDocInputFormat.class, + InputFormat.class); + } + + /** + * Set the index input format class. + * @param theClass the index input format class + */ + public void setIndexInputFormatClass(Class theClass) { + conf.setClass("sea.input.format", theClass, InputFormat.class); + } + + /** + * Get the index updater class. + * @return the index updater class + */ + public Class getIndexUpdaterClass() { + return conf.getClass("sea.index.updater", IndexUpdater.class, + IIndexUpdater.class); + } + + /** + * Set the index updater class. + * @param theClass the index updater class + */ + public void setIndexUpdaterClass(Class theClass) { + conf.setClass("sea.index.updater", theClass, IIndexUpdater.class); + } + + /** + * Get the local analysis class. + * @return the local analysis class + */ + public Class getLocalAnalysisClass() { + return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class, + ILocalAnalysis.class); + } + + /** + * Set the local analysis class. + * @param theClass the local analysis class + */ + public void setLocalAnalysisClass(Class theClass) { + conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class); + } + + /** + * Get the string representation of a number of shards. + * @return the string representation of a number of shards + */ + public String getIndexShards() { + return conf.get("sea.index.shards"); + } + + /** + * Set the string representation of a number of shards. + * @param shards the string representation of a number of shards + */ + public void setIndexShards(String shards) { + conf.set("sea.index.shards", shards); + } + + /** + * Get the max field length for a Lucene instance. + * @return the max field length for a Lucene instance + */ + public int getIndexMaxFieldLength() { + return conf.getInt("sea.max.field.length", -1); + } + + /** + * Set the max field length for a Lucene instance. + * @param maxFieldLength the max field length for a Lucene instance + */ + public void setIndexMaxFieldLength(int maxFieldLength) { + conf.setInt("sea.max.field.length", maxFieldLength); + } + + /** + * Get the max number of segments for a Lucene instance. + * @return the max number of segments for a Lucene instance + */ + public int getIndexMaxNumSegments() { + return conf.getInt("sea.max.num.segments", -1); + } + + /** + * Set the max number of segments for a Lucene instance. + * @param maxNumSegments the max number of segments for a Lucene instance + */ + public void setIndexMaxNumSegments(int maxNumSegments) { + conf.setInt("sea.max.num.segments", maxNumSegments); + } + + /** + * Check whether to use the compound file format for a Lucene instance. + * @return true if using the compound file format for a Lucene instance + */ + public boolean getIndexUseCompoundFile() { + return conf.getBoolean("sea.use.compound.file", false); + } + + /** + * Set whether use the compound file format for a Lucene instance. + * @param useCompoundFile whether to use the compound file format + */ + public void setIndexUseCompoundFile(boolean useCompoundFile) { + conf.setBoolean("sea.use.compound.file", useCompoundFile); + } + + /** + * Get the max ram index size in bytes. The default is 50M. + * @return the max ram index size in bytes + */ + public long getMaxRAMSizeInBytes() { + return conf.getLong("sea.max.ramsize.bytes", 50L << 20); + } + + /** + * Set the max ram index size in bytes. 
+ * @param b the max ram index size in bytes + */ + public void setMaxRAMSizeInBytes(long b) { + conf.setLong("sea.max.ramsize.bytes", b); + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java Tue Oct 16 00:02:55 2012 @@ -1,199 +1,199 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.lucene.analysis.Analyzer; - -/** - * This class applies local analysis on a key-value pair and then convert the - * result docid-operation pair to a shard-and-intermediate form pair. - */ -public class IndexUpdateMapper - extends MapReduceBase implements Mapper { - static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class); - - /** - * Get the map output key class. - * @return the map output key class - */ - public static Class getMapOutputKeyClass() { - return Shard.class; - } - - /** - * Get the map output value class. - * @return the map output value class - */ - public static Class getMapOutputValueClass() { - return IntermediateForm.class; - } - - IndexUpdateConfiguration iconf; - private Analyzer analyzer; - private Shard[] shards; - private IDistributionPolicy distributionPolicy; - - private ILocalAnalysis localAnalysis; - private DocumentID tmpKey; - private DocumentAndOp tmpValue; - - private OutputCollector tmpCollector = - new OutputCollector() { - public void collect(DocumentID key, DocumentAndOp value) - throws IOException { - tmpKey = key; - tmpValue = value; - } - }; - - /** - * Map a key-value pair to a shard-and-intermediate form pair. 
Internally, - * the local analysis is first applied to map the key-value pair to a - * document id-and-operation pair, then the docid-and-operation pair is - * mapped to a shard-intermediate form pair. The intermediate form is of the - * form of a single-document ram index and/or a single delete term. - */ - public void map(K key, V value, - OutputCollector output, Reporter reporter) - throws IOException { - - synchronized (this) { - localAnalysis.map(key, value, tmpCollector, reporter); - - if (tmpKey != null && tmpValue != null) { - DocumentAndOp doc = tmpValue; - IntermediateForm form = new IntermediateForm(); - form.configure(iconf); - form.process(doc, analyzer); - form.closeWriter(); - - if (doc.getOp() == DocumentAndOp.Op.INSERT) { - int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey); - if (chosenShard >= 0) { - // insert into one shard - output.collect(shards[chosenShard], form); - } else { - throw new IOException("Chosen shard for insert must be >= 0"); - } - - } else if (doc.getOp() == DocumentAndOp.Op.DELETE) { - int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey); - if (chosenShard >= 0) { - // delete from one shard - output.collect(shards[chosenShard], form); - } else { - // broadcast delete to all shards - for (int i = 0; i < shards.length; i++) { - output.collect(shards[i], form); - } - } - - } else { // UPDATE - int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey); - int deleteFromShard = - distributionPolicy.chooseShardForDelete(tmpKey); - - if (insertToShard >= 0) { - if (insertToShard == deleteFromShard) { - // update into one shard - output.collect(shards[insertToShard], form); - } else { - // prepare a deletion form - IntermediateForm deletionForm = new IntermediateForm(); - deletionForm.configure(iconf); - deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE, - doc.getTerm()), analyzer); - deletionForm.closeWriter(); - - if (deleteFromShard >= 0) { - // delete from one shard - output.collect(shards[deleteFromShard], deletionForm); - } else { - // broadcast delete to all shards - for (int i = 0; i < shards.length; i++) { - output.collect(shards[i], deletionForm); - } - } - - // prepare an insertion form - IntermediateForm insertionForm = new IntermediateForm(); - insertionForm.configure(iconf); - insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT, - doc.getDocument()), analyzer); - insertionForm.closeWriter(); - - // insert into one shard - output.collect(shards[insertToShard], insertionForm); - } - } else { - throw new IOException("Chosen shard for insert must be >= 0"); - } - } - } - } - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - analyzer = - (Analyzer) ReflectionUtils.newInstance( - iconf.getDocumentAnalyzerClass(), job); - - localAnalysis = - (ILocalAnalysis) ReflectionUtils.newInstance( - iconf.getLocalAnalysisClass(), job); - localAnalysis.configure(job); - - shards = Shard.getIndexShards(iconf); - - distributionPolicy = - (IDistributionPolicy) ReflectionUtils.newInstance( - iconf.getDistributionPolicyClass(), job); - distributionPolicy.init(shards); - - LOG.info("sea.document.analyzer = " + analyzer.getClass().getName()); - LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName()); - LOG.info(shards.length + " shards = " + iconf.getIndexShards()); - LOG.info("sea.distribution.policy = " - + 
distributionPolicy.getClass().getName()); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - localAnalysis.close(); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.lucene.analysis.Analyzer; + +/** + * This class applies local analysis on a key-value pair and then convert the + * result docid-operation pair to a shard-and-intermediate form pair. + */ +public class IndexUpdateMapper + extends MapReduceBase implements Mapper { + static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class); + + /** + * Get the map output key class. + * @return the map output key class + */ + public static Class getMapOutputKeyClass() { + return Shard.class; + } + + /** + * Get the map output value class. + * @return the map output value class + */ + public static Class getMapOutputValueClass() { + return IntermediateForm.class; + } + + IndexUpdateConfiguration iconf; + private Analyzer analyzer; + private Shard[] shards; + private IDistributionPolicy distributionPolicy; + + private ILocalAnalysis localAnalysis; + private DocumentID tmpKey; + private DocumentAndOp tmpValue; + + private OutputCollector tmpCollector = + new OutputCollector() { + public void collect(DocumentID key, DocumentAndOp value) + throws IOException { + tmpKey = key; + tmpValue = value; + } + }; + + /** + * Map a key-value pair to a shard-and-intermediate form pair. Internally, + * the local analysis is first applied to map the key-value pair to a + * document id-and-operation pair, then the docid-and-operation pair is + * mapped to a shard-intermediate form pair. The intermediate form is of the + * form of a single-document ram index and/or a single delete term. 
+ */ + public void map(K key, V value, + OutputCollector output, Reporter reporter) + throws IOException { + + synchronized (this) { + localAnalysis.map(key, value, tmpCollector, reporter); + + if (tmpKey != null && tmpValue != null) { + DocumentAndOp doc = tmpValue; + IntermediateForm form = new IntermediateForm(); + form.configure(iconf); + form.process(doc, analyzer); + form.closeWriter(); + + if (doc.getOp() == DocumentAndOp.Op.INSERT) { + int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey); + if (chosenShard >= 0) { + // insert into one shard + output.collect(shards[chosenShard], form); + } else { + throw new IOException("Chosen shard for insert must be >= 0"); + } + + } else if (doc.getOp() == DocumentAndOp.Op.DELETE) { + int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey); + if (chosenShard >= 0) { + // delete from one shard + output.collect(shards[chosenShard], form); + } else { + // broadcast delete to all shards + for (int i = 0; i < shards.length; i++) { + output.collect(shards[i], form); + } + } + + } else { // UPDATE + int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey); + int deleteFromShard = + distributionPolicy.chooseShardForDelete(tmpKey); + + if (insertToShard >= 0) { + if (insertToShard == deleteFromShard) { + // update into one shard + output.collect(shards[insertToShard], form); + } else { + // prepare a deletion form + IntermediateForm deletionForm = new IntermediateForm(); + deletionForm.configure(iconf); + deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE, + doc.getTerm()), analyzer); + deletionForm.closeWriter(); + + if (deleteFromShard >= 0) { + // delete from one shard + output.collect(shards[deleteFromShard], deletionForm); + } else { + // broadcast delete to all shards + for (int i = 0; i < shards.length; i++) { + output.collect(shards[i], deletionForm); + } + } + + // prepare an insertion form + IntermediateForm insertionForm = new IntermediateForm(); + insertionForm.configure(iconf); + insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT, + doc.getDocument()), analyzer); + insertionForm.closeWriter(); + + // insert into one shard + output.collect(shards[insertToShard], insertionForm); + } + } else { + throw new IOException("Chosen shard for insert must be >= 0"); + } + } + } + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + analyzer = + (Analyzer) ReflectionUtils.newInstance( + iconf.getDocumentAnalyzerClass(), job); + + localAnalysis = + (ILocalAnalysis) ReflectionUtils.newInstance( + iconf.getLocalAnalysisClass(), job); + localAnalysis.configure(job); + + shards = Shard.getIndexShards(iconf); + + distributionPolicy = + (IDistributionPolicy) ReflectionUtils.newInstance( + iconf.getDistributionPolicyClass(), job); + distributionPolicy.init(shards); + + LOG.info("sea.document.analyzer = " + analyzer.getClass().getName()); + LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName()); + LOG.info(shards.length + " shards = " + iconf.getIndexShards()); + LOG.info("sea.distribution.policy = " + + distributionPolicy.getClass().getName()); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + localAnalysis.close(); + } + +} Modified: 
hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java Tue Oct 16 00:02:55 2012 @@ -1,60 +1,60 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; - -/** - * This partitioner class puts the values of the same key - in this case the - * same shard - in the same partition. - */ -public class IndexUpdatePartitioner implements - Partitioner { - - private Shard[] shards; - private Map map; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int) - */ - public int getPartition(Shard key, IntermediateForm value, int numPartitions) { - int partition = map.get(key).intValue(); - if (partition < numPartitions) { - return partition; - } else { - return numPartitions - 1; - } - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - shards = Shard.getIndexShards(new IndexUpdateConfiguration(job)); - map = new HashMap(); - for (int i = 0; i < shards.length; i++) { - map.put(shards[i], i); - } - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Partitioner; + +/** + * This partitioner class puts the values of the same key - in this case the + * same shard - in the same partition. + */ +public class IndexUpdatePartitioner implements + Partitioner { + + private Shard[] shards; + private Map map; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int) + */ + public int getPartition(Shard key, IntermediateForm value, int numPartitions) { + int partition = map.get(key).intValue(); + if (partition < numPartitions) { + return partition; + } else { + return numPartitions - 1; + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + shards = Shard.getIndexShards(new IndexUpdateConfiguration(job)); + map = new HashMap(); + for (int i = 0; i < shards.length; i++) { + map.put(shards[i], i); + } + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java Tue Oct 16 00:02:55 2012 @@ -1,143 +1,143 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.contrib.index.lucene.ShardWriter; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Closeable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This reducer applies to a shard the changes for it. A "new version" of - * a shard is created at the end of a reduce. It is important to note that - * the new version of the shard is not derived from scratch. By leveraging - * Lucene's update algorithm, the new version of each Lucene instance will - * share as many files as possible as the previous version. - */ -public class IndexUpdateReducer extends MapReduceBase implements - Reducer { - static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class); - static final Text DONE = new Text("done"); - - /** - * Get the reduce output key class. - * @return the reduce output key class - */ - public static Class getOutputKeyClass() { - return Shard.class; - } - - /** - * Get the reduce output value class. - * @return the reduce output value class - */ - public static Class getOutputValueClass() { - return Text.class; - } - - private IndexUpdateConfiguration iconf; - private String mapredTempDir; - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) - */ - public void reduce(Shard key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - - LOG.info("Construct a shard writer for " + key); - FileSystem fs = FileSystem.get(iconf.getConfiguration()); - String temp = - mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis(); - final ShardWriter writer = new ShardWriter(fs, key, temp, iconf); - - // update the shard - while (values.hasNext()) { - IntermediateForm form = values.next(); - writer.process(form); - reporter.progress(); - } - - // close the shard - final Reporter fReporter = reporter; - new Closeable() { - volatile boolean closed = false; - - public void close() throws IOException { - // spawn a thread to give progress heartbeats - Thread prog = new Thread() { - public void run() { - while (!closed) { - try { - fReporter.setStatus("closing"); - Thread.sleep(1000); - } catch (InterruptedException e) { - continue; - } catch (Throwable e) { - return; - } - } - } - }; - - try { - prog.start(); - - if (writer != null) { - writer.close(); - } - } finally { - closed = true; - } - } - }.close(); - LOG.info("Closed the shard writer for " + key + ", writer = " + writer); - - output.collect(key, DONE); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) - */ - public void configure(JobConf job) { - iconf = new IndexUpdateConfiguration(job); - mapredTempDir = iconf.getMapredTempDir(); - mapredTempDir = Shard.normalizePath(mapredTempDir); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.mapred.MapReduceBase#close() - */ - public void close() throws IOException { - } - -} +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.contrib.index.lucene.ShardWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Closeable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This reducer applies to a shard the changes for it. A "new version" of + * a shard is created at the end of a reduce. It is important to note that + * the new version of the shard is not derived from scratch. By leveraging + * Lucene's update algorithm, the new version of each Lucene instance will + * share as many files as possible as the previous version. + */ +public class IndexUpdateReducer extends MapReduceBase implements + Reducer { + static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class); + static final Text DONE = new Text("done"); + + /** + * Get the reduce output key class. + * @return the reduce output key class + */ + public static Class getOutputKeyClass() { + return Shard.class; + } + + /** + * Get the reduce output value class. 
+ * @return the reduce output value class + */ + public static Class getOutputValueClass() { + return Text.class; + } + + private IndexUpdateConfiguration iconf; + private String mapredTempDir; + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) + */ + public void reduce(Shard key, Iterator values, + OutputCollector output, Reporter reporter) + throws IOException { + + LOG.info("Construct a shard writer for " + key); + FileSystem fs = FileSystem.get(iconf.getConfiguration()); + String temp = + mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis(); + final ShardWriter writer = new ShardWriter(fs, key, temp, iconf); + + // update the shard + while (values.hasNext()) { + IntermediateForm form = values.next(); + writer.process(form); + reporter.progress(); + } + + // close the shard + final Reporter fReporter = reporter; + new Closeable() { + volatile boolean closed = false; + + public void close() throws IOException { + // spawn a thread to give progress heartbeats + Thread prog = new Thread() { + public void run() { + while (!closed) { + try { + fReporter.setStatus("closing"); + Thread.sleep(1000); + } catch (InterruptedException e) { + continue; + } catch (Throwable e) { + return; + } + } + } + }; + + try { + prog.start(); + + if (writer != null) { + writer.close(); + } + } finally { + closed = true; + } + } + }.close(); + LOG.info("Closed the shard writer for " + key + ", writer = " + writer); + + output.collect(key, DONE); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf) + */ + public void configure(JobConf job) { + iconf = new IndexUpdateConfiguration(job); + mapredTempDir = iconf.getMapredTempDir(); + mapredTempDir = Shard.normalizePath(mapredTempDir); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.mapred.MapReduceBase#close() + */ + public void close() throws IOException { + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java Tue Oct 16 00:02:55 2012 @@ -1,252 +1,252 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.Term; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.RAMDirectory; - -/** - * An intermediate form for one or more parsed Lucene documents and/or - * delete terms. It actually uses Lucene file format as the format for - * the intermediate form by using RAM dir files. - * - * Note: If process(*) is ever called, closeWriter() should be called. - * Otherwise, no need to call closeWriter(). - */ -public class IntermediateForm implements Writable { - - private IndexUpdateConfiguration iconf = null; - private final Collection deleteList; - private RAMDirectory dir; - private IndexWriter writer; - private int numDocs; - - /** - * Constructor - * @throws IOException - */ - public IntermediateForm() throws IOException { - deleteList = new ConcurrentLinkedQueue(); - dir = new RAMDirectory(); - writer = null; - numDocs = 0; - } - - /** - * Configure using an index update configuration. - * @param iconf the index update configuration - */ - public void configure(IndexUpdateConfiguration iconf) { - this.iconf = iconf; - } - - /** - * Get the ram directory of the intermediate form. - * @return the ram directory - */ - public Directory getDirectory() { - return dir; - } - - /** - * Get an iterator for the delete terms in the intermediate form. - * @return an iterator for the delete terms - */ - public Iterator deleteTermIterator() { - return deleteList.iterator(); - } - - /** - * This method is used by the index update mapper and process a document - * operation into the current intermediate form. - * @param doc input document operation - * @param analyzer the analyzer - * @throws IOException - */ - public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException { - if (doc.getOp() == DocumentAndOp.Op.DELETE - || doc.getOp() == DocumentAndOp.Op.UPDATE) { - deleteList.add(doc.getTerm()); - - } - - if (doc.getOp() == DocumentAndOp.Op.INSERT - || doc.getOp() == DocumentAndOp.Op.UPDATE) { - - if (writer == null) { - // analyzer is null because we specify an analyzer with addDocument - writer = createWriter(); - } - - writer.addDocument(doc.getDocument(), analyzer); - numDocs++; - } - - } - - /** - * This method is used by the index update combiner and process an - * intermediate form into the current intermediate form. More specifically, - * the input intermediate forms are a single-document ram index and/or a - * single delete term. 
- * @param form the input intermediate form - * @throws IOException - */ - public void process(IntermediateForm form) throws IOException { - if (form.deleteList.size() > 0) { - deleteList.addAll(form.deleteList); - } - - if (form.dir.sizeInBytes() > 0) { - if (writer == null) { - writer = createWriter(); - } - - writer.addIndexesNoOptimize(new Directory[] { form.dir }); - numDocs++; - } - - } - - /** - * Close the Lucene index writer associated with the intermediate form, - * if created. Do not close the ram directory. In fact, there is no need - * to close a ram directory. - * @throws IOException - */ - public void closeWriter() throws IOException { - if (writer != null) { - writer.close(); - writer = null; - } - } - - /** - * The total size of files in the directory and ram used by the index writer. - * It does not include memory used by the delete list. - * @return the total size in bytes - */ - public long totalSizeInBytes() throws IOException { - long size = dir.sizeInBytes(); - if (writer != null) { - size += writer.ramSizeInBytes(); - } - return size; - } - - /* (non-Javadoc) - * @see java.lang.Object#toString() - */ - public String toString() { - StringBuilder buffer = new StringBuilder(); - buffer.append(this.getClass().getSimpleName()); - buffer.append("[numDocs="); - buffer.append(numDocs); - buffer.append(", numDeletes="); - buffer.append(deleteList.size()); - if (deleteList.size() > 0) { - buffer.append("("); - Iterator iter = deleteTermIterator(); - while (iter.hasNext()) { - buffer.append(iter.next()); - buffer.append(" "); - } - buffer.append(")"); - } - buffer.append("]"); - return buffer.toString(); - } - - private IndexWriter createWriter() throws IOException { - IndexWriter writer = - new IndexWriter(dir, false, null, - new KeepOnlyLastCommitDeletionPolicy()); - writer.setUseCompoundFile(false); - - if (iconf != null) { - int maxFieldLength = iconf.getIndexMaxFieldLength(); - if (maxFieldLength > 0) { - writer.setMaxFieldLength(maxFieldLength); - } - } - - return writer; - } - - private void resetForm() throws IOException { - deleteList.clear(); - if (dir.sizeInBytes() > 0) { - // it's ok if we don't close a ram directory - dir.close(); - // an alternative is to delete all the files and reuse the ram directory - dir = new RAMDirectory(); - } - assert (writer == null); - numDocs = 0; - } - - // /////////////////////////////////// - // Writable - // /////////////////////////////////// - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) - */ - public void write(DataOutput out) throws IOException { - out.writeInt(deleteList.size()); - for (Term term : deleteList) { - Text.writeString(out, term.field()); - Text.writeString(out, term.text()); - } - - String[] files = dir.list(); - RAMDirectoryUtil.writeRAMFiles(out, dir, files); - } - - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) - */ - public void readFields(DataInput in) throws IOException { - resetForm(); - - int numDeleteTerms = in.readInt(); - for (int i = 0; i < numDeleteTerms; i++) { - String field = Text.readString(in); - String text = Text.readString(in); - deleteList.add(new Term(field, text)); - } - - RAMDirectoryUtil.readRAMFiles(in, dir); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; + +/** + * An intermediate form for one or more parsed Lucene documents and/or + * delete terms. It actually uses Lucene file format as the format for + * the intermediate form by using RAM dir files. + * + * Note: If process(*) is ever called, closeWriter() should be called. + * Otherwise, no need to call closeWriter(). + */ +public class IntermediateForm implements Writable { + + private IndexUpdateConfiguration iconf = null; + private final Collection deleteList; + private RAMDirectory dir; + private IndexWriter writer; + private int numDocs; + + /** + * Constructor + * @throws IOException + */ + public IntermediateForm() throws IOException { + deleteList = new ConcurrentLinkedQueue(); + dir = new RAMDirectory(); + writer = null; + numDocs = 0; + } + + /** + * Configure using an index update configuration. + * @param iconf the index update configuration + */ + public void configure(IndexUpdateConfiguration iconf) { + this.iconf = iconf; + } + + /** + * Get the ram directory of the intermediate form. + * @return the ram directory + */ + public Directory getDirectory() { + return dir; + } + + /** + * Get an iterator for the delete terms in the intermediate form. + * @return an iterator for the delete terms + */ + public Iterator deleteTermIterator() { + return deleteList.iterator(); + } + + /** + * This method is used by the index update mapper and process a document + * operation into the current intermediate form. + * @param doc input document operation + * @param analyzer the analyzer + * @throws IOException + */ + public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException { + if (doc.getOp() == DocumentAndOp.Op.DELETE + || doc.getOp() == DocumentAndOp.Op.UPDATE) { + deleteList.add(doc.getTerm()); + + } + + if (doc.getOp() == DocumentAndOp.Op.INSERT + || doc.getOp() == DocumentAndOp.Op.UPDATE) { + + if (writer == null) { + // analyzer is null because we specify an analyzer with addDocument + writer = createWriter(); + } + + writer.addDocument(doc.getDocument(), analyzer); + numDocs++; + } + + } + + /** + * This method is used by the index update combiner and process an + * intermediate form into the current intermediate form. 
More specifically, + * the input intermediate forms are a single-document ram index and/or a + * single delete term. + * @param form the input intermediate form + * @throws IOException + */ + public void process(IntermediateForm form) throws IOException { + if (form.deleteList.size() > 0) { + deleteList.addAll(form.deleteList); + } + + if (form.dir.sizeInBytes() > 0) { + if (writer == null) { + writer = createWriter(); + } + + writer.addIndexesNoOptimize(new Directory[] { form.dir }); + numDocs++; + } + + } + + /** + * Close the Lucene index writer associated with the intermediate form, + * if created. Do not close the ram directory. In fact, there is no need + * to close a ram directory. + * @throws IOException + */ + public void closeWriter() throws IOException { + if (writer != null) { + writer.close(); + writer = null; + } + } + + /** + * The total size of files in the directory and ram used by the index writer. + * It does not include memory used by the delete list. + * @return the total size in bytes + */ + public long totalSizeInBytes() throws IOException { + long size = dir.sizeInBytes(); + if (writer != null) { + size += writer.ramSizeInBytes(); + } + return size; + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append(this.getClass().getSimpleName()); + buffer.append("[numDocs="); + buffer.append(numDocs); + buffer.append(", numDeletes="); + buffer.append(deleteList.size()); + if (deleteList.size() > 0) { + buffer.append("("); + Iterator iter = deleteTermIterator(); + while (iter.hasNext()) { + buffer.append(iter.next()); + buffer.append(" "); + } + buffer.append(")"); + } + buffer.append("]"); + return buffer.toString(); + } + + private IndexWriter createWriter() throws IOException { + IndexWriter writer = + new IndexWriter(dir, false, null, + new KeepOnlyLastCommitDeletionPolicy()); + writer.setUseCompoundFile(false); + + if (iconf != null) { + int maxFieldLength = iconf.getIndexMaxFieldLength(); + if (maxFieldLength > 0) { + writer.setMaxFieldLength(maxFieldLength); + } + } + + return writer; + } + + private void resetForm() throws IOException { + deleteList.clear(); + if (dir.sizeInBytes() > 0) { + // it's ok if we don't close a ram directory + dir.close(); + // an alternative is to delete all the files and reuse the ram directory + dir = new RAMDirectory(); + } + assert (writer == null); + numDocs = 0; + } + + // /////////////////////////////////// + // Writable + // /////////////////////////////////// + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + out.writeInt(deleteList.size()); + for (Term term : deleteList) { + Text.writeString(out, term.field()); + Text.writeString(out, term.text()); + } + + String[] files = dir.list(); + RAMDirectoryUtil.writeRAMFiles(out, dir, files); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + resetForm(); + + int numDeleteTerms = in.readInt(); + for (int i = 0; i < numDeleteTerms; i++) { + String field = Text.readString(in); + String text = Text.readString(in); + deleteList.add(new Term(field, text)); + } + + RAMDirectoryUtil.readRAMFiles(in, dir); + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java Tue Oct 16 00:02:55 2012 @@ -1,105 +1,105 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.lucene; - -import java.io.IOException; - -import junit.framework.TestCase; - -import org.apache.lucene.analysis.standard.StandardAnalyzer; -import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field; -import org.apache.lucene.index.IndexDeletionPolicy; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.Hits; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.RAMDirectory; - -public class TestMixedDirectory extends TestCase { - private int numDocsPerUpdate = 10; - private int maxBufferedDocs = 2; - - public void testMixedDirectoryAndPolicy() throws IOException { - Directory readDir = new RAMDirectory(); - updateIndex(readDir, 0, numDocsPerUpdate, - new KeepOnlyLastCommitDeletionPolicy()); - - verify(readDir, numDocsPerUpdate); - - IndexOutput out = - readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2) - + ".cfs"); - out.writeInt(0); - out.close(); - - Directory writeDir = new RAMDirectory(); - Directory mixedDir = new MixedDirectory(readDir, writeDir); - updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate, - new MixedDeletionPolicy()); - - verify(readDir, numDocsPerUpdate); - verify(mixedDir, 2 * numDocsPerUpdate); - } - - public void updateIndex(Directory dir, int base, int numDocs, - IndexDeletionPolicy policy) throws IOException { - IndexWriter writer = - new IndexWriter(dir, false, new StandardAnalyzer(), policy); - writer.setMaxBufferedDocs(maxBufferedDocs); - writer.setMergeFactor(1000); - for (int i = 0; i < numDocs; i++) { - addDoc(writer, base + i); - } - writer.close(); - } - - private void addDoc(IndexWriter writer, int id) throws IOException { - Document doc = new Document(); - doc.add(new Field("id", String.valueOf(id), Field.Store.YES, - 
Field.Index.UN_TOKENIZED)); - doc.add(new Field("content", "apache", Field.Store.NO, - Field.Index.TOKENIZED)); - writer.addDocument(doc); - } - - private void verify(Directory dir, int expectedHits) throws IOException { - IndexSearcher searcher = new IndexSearcher(dir); - Hits hits = searcher.search(new TermQuery(new Term("content", "apache"))); - int numHits = hits.length(); - - assertEquals(expectedHits, numHits); - - int[] docs = new int[numHits]; - for (int i = 0; i < numHits; i++) { - Document hit = hits.doc(i); - docs[Integer.parseInt(hit.get("id"))]++; - } - for (int i = 0; i < numHits; i++) { - assertEquals(1, docs[i]); - } - - searcher.close(); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.contrib.index.lucene; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.Hits; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RAMDirectory; + +public class TestMixedDirectory extends TestCase { + private int numDocsPerUpdate = 10; + private int maxBufferedDocs = 2; + + public void testMixedDirectoryAndPolicy() throws IOException { + Directory readDir = new RAMDirectory(); + updateIndex(readDir, 0, numDocsPerUpdate, + new KeepOnlyLastCommitDeletionPolicy()); + + verify(readDir, numDocsPerUpdate); + + IndexOutput out = + readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2) + + ".cfs"); + out.writeInt(0); + out.close(); + + Directory writeDir = new RAMDirectory(); + Directory mixedDir = new MixedDirectory(readDir, writeDir); + updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate, + new MixedDeletionPolicy()); + + verify(readDir, numDocsPerUpdate); + verify(mixedDir, 2 * numDocsPerUpdate); + } + + public void updateIndex(Directory dir, int base, int numDocs, + IndexDeletionPolicy policy) throws IOException { + IndexWriter writer = + new IndexWriter(dir, false, new StandardAnalyzer(), policy); + writer.setMaxBufferedDocs(maxBufferedDocs); + writer.setMergeFactor(1000); + for (int i = 0; i < numDocs; i++) { + addDoc(writer, base + i); + } + writer.close(); + } + + private void addDoc(IndexWriter writer, int id) throws IOException { + Document doc = new Document(); + doc.add(new Field("id", String.valueOf(id), Field.Store.YES, + 
Field.Index.UN_TOKENIZED)); + doc.add(new Field("content", "apache", Field.Store.NO, + Field.Index.TOKENIZED)); + writer.addDocument(doc); + } + + private void verify(Directory dir, int expectedHits) throws IOException { + IndexSearcher searcher = new IndexSearcher(dir); + Hits hits = searcher.search(new TermQuery(new Term("content", "apache"))); + int numHits = hits.length(); + + assertEquals(expectedHits, numHits); + + int[] docs = new int[numHits]; + for (int i = 0; i < numHits; i++) { + Document hit = hits.doc(i); + docs[Integer.parseInt(hit.get("id"))]++; + } + for (int i = 0; i < numHits; i++) { + assertEquals(1, docs[i]); + } + + searcher.close(); + } + +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Tue Oct 16 00:02:55 2012 @@ -1,234 +1,234 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.contrib.index.mapred; - -import java.io.File; -import java.io.IOException; -import java.text.NumberFormat; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; -import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy; -import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.contrib.index.mapred; + +import java.io.File; +import java.io.IOException; +import java.text.NumberFormat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy; +import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy; +import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.lucene.document.Document; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.MultiReader; -import org.apache.lucene.index.Term; -import org.apache.lucene.search.Hits; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.store.Directory; - -import junit.framework.TestCase; - -public class TestDistributionPolicy extends TestCase { - - private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); - static { - NUMBER_FORMAT.setMinimumIntegerDigits(5); - NUMBER_FORMAT.setGroupingUsed(false); - } - - // however, "we only allow 0 or 1 reducer in local mode" - from - // LocalJobRunner - private Configuration conf; - private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt"); - private Path localUpdatePath = - new Path(System.getProperty("build.test") + "/sample/data2.txt"); - private Path inputPath = new Path("/myexample/data.txt"); - private Path updatePath = new Path("/myexample/data2.txt"); - private Path outputPath = new Path("/myoutput"); - private Path indexPath = new Path("/myindex"); - private int numShards = 3; - private int numMapTasks = 5; - - private int numDataNodes = 3; - private int numTaskTrackers = 3; - - private int numDocsPerRun = 10; // num of docs in local input path - - private FileSystem fs; - private MiniDFSCluster dfsCluster; - private MiniMRCluster mrCluster; - - public TestDistributionPolicy() throws IOException { - super(); - if (System.getProperty("hadoop.log.dir") == null) { - String base = new File(".").getPath(); // getAbsolutePath(); - System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs"); - } - conf = new Configuration(); - } - - protected void setUp() throws Exception { - super.setUp(); - try { - dfsCluster = - new MiniDFSCluster(conf, numDataNodes, true, (String[]) null); - - fs = dfsCluster.getFileSystem(); - if (fs.exists(inputPath)) { - fs.delete(inputPath, true); - } - fs.copyFromLocalFile(localInputPath, inputPath); - if (fs.exists(updatePath)) { - fs.delete(updatePath, true); - } - fs.copyFromLocalFile(localUpdatePath, updatePath); - - if (fs.exists(outputPath)) { - // do not create, mapred will create - fs.delete(outputPath, true); - } - - if (fs.exists(indexPath)) { - fs.delete(indexPath, true); - } - - mrCluster = - new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1); - - } catch (IOException e) { - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } - - if (fs != null) { - fs.close(); - fs = null; - } - - if (mrCluster != null) { - mrCluster.shutdown(); - mrCluster = null; - } - - throw e; - } - - } - - protected void tearDown() throws Exception { - if (dfsCluster != null) { - dfsCluster.shutdown(); - dfsCluster = null; - } - - if (fs != null) { - fs.close(); - fs = null; - } - - if (mrCluster != null) { - mrCluster.shutdown(); - mrCluster = null; - } - - super.tearDown(); 
- } - - public void testDistributionPolicy() throws IOException { - IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf); - - // test hashing distribution policy - iconf.setDistributionPolicyClass(HashingDistributionPolicy.class); - onetest(); - - if (fs.exists(indexPath)) { - fs.delete(indexPath, true); - } - - // test round-robin distribution policy - iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class); - onetest(); - } - - private void onetest() throws IOException { - long versionNumber = -1; - long generation = -1; - - Shard[] shards = new Shard[numShards]; - for (int j = 0; j < shards.length; j++) { - shards[j] = - new Shard(versionNumber, - new Path(indexPath, NUMBER_FORMAT.format(j)).toString(), - generation); - } - - if (fs.exists(outputPath)) { - fs.delete(outputPath, true); - } - - IIndexUpdater updater = new IndexUpdater(); - updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks, - shards); - - if (fs.exists(outputPath)) { - fs.delete(outputPath, true); - } - - // delete docs w/ even docids, update docs w/ odd docids - updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks, - shards); - - verify(shards); - } - - private void verify(Shard[] shards) throws IOException { - // verify the index - IndexReader[] readers = new IndexReader[shards.length]; - for (int i = 0; i < shards.length; i++) { - Directory dir = - new FileSystemDirectory(fs, new Path(shards[i].getDirectory()), - false, conf); - readers[i] = IndexReader.open(dir); - } - - IndexReader reader = new MultiReader(readers); - IndexSearcher searcher = new IndexSearcher(reader); - Hits hits = searcher.search(new TermQuery(new Term("content", "apache"))); - assertEquals(0, hits.length()); - - hits = searcher.search(new TermQuery(new Term("content", "hadoop"))); - assertEquals(numDocsPerRun / 2, hits.length()); - - int[] counts = new int[numDocsPerRun]; - for (int i = 0; i < hits.length(); i++) { - Document doc = hits.doc(i); - counts[Integer.parseInt(doc.get("id"))]++; - } - - for (int i = 0; i < numDocsPerRun; i++) { - if (i % 2 == 0) { - assertEquals(0, counts[i]); - } else { - assertEquals(1, counts[i]); - } - } - - searcher.close(); - reader.close(); - } - -} +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.MultiReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.Hits; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.Directory; + +import junit.framework.TestCase; + +public class TestDistributionPolicy extends TestCase { + + private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); + static { + NUMBER_FORMAT.setMinimumIntegerDigits(5); + NUMBER_FORMAT.setGroupingUsed(false); + } + + // however, "we only allow 0 or 1 reducer in local mode" - from + // LocalJobRunner + private Configuration conf; + private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt"); + private Path localUpdatePath = + new Path(System.getProperty("build.test") + "/sample/data2.txt"); + private Path inputPath = new Path("/myexample/data.txt"); + private Path updatePath = new Path("/myexample/data2.txt"); + private Path outputPath = new Path("/myoutput"); + private Path indexPath = new Path("/myindex"); + private int numShards = 3; + private int 
numMapTasks = 5; + + private int numDataNodes = 3; + private int numTaskTrackers = 3; + + private int numDocsPerRun = 10; // num of docs in local input path + + private FileSystem fs; + private MiniDFSCluster dfsCluster; + private MiniMRCluster mrCluster; + + public TestDistributionPolicy() throws IOException { + super(); + if (System.getProperty("hadoop.log.dir") == null) { + String base = new File(".").getPath(); // getAbsolutePath(); + System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs"); + } + conf = new Configuration(); + } + + protected void setUp() throws Exception { + super.setUp(); + try { + dfsCluster = + new MiniDFSCluster(conf, numDataNodes, true, (String[]) null); + + fs = dfsCluster.getFileSystem(); + if (fs.exists(inputPath)) { + fs.delete(inputPath, true); + } + fs.copyFromLocalFile(localInputPath, inputPath); + if (fs.exists(updatePath)) { + fs.delete(updatePath, true); + } + fs.copyFromLocalFile(localUpdatePath, updatePath); + + if (fs.exists(outputPath)) { + // do not create, mapred will create + fs.delete(outputPath, true); + } + + if (fs.exists(indexPath)) { + fs.delete(indexPath, true); + } + + mrCluster = + new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1); + + } catch (IOException e) { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + + if (fs != null) { + fs.close(); + fs = null; + } + + if (mrCluster != null) { + mrCluster.shutdown(); + mrCluster = null; + } + + throw e; + } + + } + + protected void tearDown() throws Exception { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + + if (fs != null) { + fs.close(); + fs = null; + } + + if (mrCluster != null) { + mrCluster.shutdown(); + mrCluster = null; + } + + super.tearDown(); + } + + public void testDistributionPolicy() throws IOException { + IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf); + + // test hashing distribution policy + iconf.setDistributionPolicyClass(HashingDistributionPolicy.class); + onetest(); + + if (fs.exists(indexPath)) { + fs.delete(indexPath, true); + } + + // test round-robin distribution policy + iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class); + onetest(); + } + + private void onetest() throws IOException { + long versionNumber = -1; + long generation = -1; + + Shard[] shards = new Shard[numShards]; + for (int j = 0; j < shards.length; j++) { + shards[j] = + new Shard(versionNumber, + new Path(indexPath, NUMBER_FORMAT.format(j)).toString(), + generation); + } + + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + IIndexUpdater updater = new IndexUpdater(); + updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks, + shards); + + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + + // delete docs w/ even docids, update docs w/ odd docids + updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks, + shards); + + verify(shards); + } + + private void verify(Shard[] shards) throws IOException { + // verify the index + IndexReader[] readers = new IndexReader[shards.length]; + for (int i = 0; i < shards.length; i++) { + Directory dir = + new FileSystemDirectory(fs, new Path(shards[i].getDirectory()), + false, conf); + readers[i] = IndexReader.open(dir); + } + + IndexReader reader = new MultiReader(readers); + IndexSearcher searcher = new IndexSearcher(reader); + Hits hits = searcher.search(new TermQuery(new Term("content", "apache"))); + assertEquals(0, hits.length()); + + hits = searcher.search(new 
TermQuery(new Term("content", "hadoop"))); + assertEquals(numDocsPerRun / 2, hits.length()); + + int[] counts = new int[numDocsPerRun]; + for (int i = 0; i < hits.length(); i++) { + Document doc = hits.doc(i); + counts[Integer.parseInt(doc.get("id"))]++; + } + + for (int i = 0; i < numDocsPerRun; i++) { + if (i % 2 == 0) { + assertEquals(0, counts[i]); + } else { + assertEquals(1, counts[i]); + } + } + + searcher.close(); + reader.close(); + } + +}