From core-commits-return-4343-apmail-hadoop-core-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Mar 20 03:34:01 2008 Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 32619 invoked from network); 20 Mar 2008 03:34:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Mar 2008 03:34:00 -0000 Received: (qmail 90702 invoked by uid 500); 20 Mar 2008 03:33:57 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 90681 invoked by uid 500); 20 Mar 2008 03:33:56 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 90665 invoked by uid 99); 20 Mar 2008 03:33:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Mar 2008 20:33:56 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Mar 2008 03:33:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D998F1A9842; Wed, 19 Mar 2008 20:33:33 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r639138 [3/3] - in /hadoop/core/trunk: ./ src/contrib/index/ src/contrib/index/conf/ src/contrib/index/lib/ src/contrib/index/sample/ src/contrib/index/src/ src/contrib/index/src/java/ src/contrib/index/src/java/org/ src/contrib/index/src/j... 
Date: Thu, 20 Mar 2008 03:33:24 -0000 To: core-commits@hadoop.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080320033333.D998F1A9842@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=639138&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (added) +++ hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Wed Mar 19 20:33:18 2008 @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.hadoop.contrib.index.mapred;

import java.io.File;
import java.io.IOException;
import java.text.NumberFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;

import junit.framework.TestCase;

/**
 * Runs the index updater on a mini DFS/MapReduce cluster, once with
 * {@link HashingDistributionPolicy} and once with
 * {@link RoundRobinDistributionPolicy}. Each pass indexes the sample data,
 * applies the update data, and checks which documents remain in the shards.
 */
public class TestDistributionPolicy extends TestCase {

  /** Formats shard numbers as ungrouped names of at least five digits. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  // A MiniMRCluster is used rather than the LocalJobRunner, which reports
  // "we only allow 0 or 1 reducer in local mode" (presumably the updater
  // needs more than one reducer with numShards > 1 -- confirm).
  private final Configuration conf;
  private final Path localInputPath =
      new Path(System.getProperty("build.test") + "/sample/data.txt");
  private final Path localUpdatePath =
      new Path(System.getProperty("build.test") + "/sample/data2.txt");
  private final Path inputPath = new Path("/myexample/data.txt");
  private final Path updatePath = new Path("/myexample/data2.txt");
  private final Path outputPath = new Path("/myoutput");
  private final Path indexPath = new Path("/myindex");
  private final int numShards = 3;
  private final int numMapTasks = 5;

  private final int numDataNodes = 3;
  private final int numTaskTrackers = 3;

  private final int numDocsPerRun = 10; // num of docs in local input path

  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private MiniMRCluster mrCluster;

  public TestDistributionPolicy() throws IOException {
    super();
    // Default hadoop.log.dir to ./logs when it is not already set.
    if (System.getProperty("hadoop.log.dir") == null) {
      String base = new File(".").getPath();
      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
    }
    conf = new Configuration();
  }

  /**
   * Starts the DFS cluster and copies the input files into it. It also
   * removes stale output and index directories, then starts the MR cluster.
   * If any step fails, everything already started is shut down before the
   * failure propagates.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    boolean started = false;
    try {
      dfsCluster =
          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
      fs = dfsCluster.getFileSystem();

      copyToDfs(localInputPath, inputPath);
      copyToDfs(localUpdatePath, updatePath);

      // do not create the output dir; the MapReduce job will create it
      deleteIfExists(outputPath);
      deleteIfExists(indexPath);

      mrCluster =
          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
      started = true;
    } finally {
      if (!started) {
        try {
          shutdownClusters();
        } catch (IOException ignored) {
          // Keep the original setUp failure; a cleanup error would mask it.
        }
      }
    }
  }

  @Override
  protected void tearDown() throws Exception {
    try {
      shutdownClusters();
    } finally {
      super.tearDown();
    }
  }

  /** Exercises the hashing policy, then the round-robin policy. */
  public void testDistributionPolicy() throws IOException {
    // NOTE(review): presumably IndexUpdateConfiguration stores its settings
    // in the wrapped conf, since onetest() only hands conf to the updater.
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);

    // test hashing distribution policy
    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
    onetest();

    deleteIfExists(indexPath);

    // test round-robin distribution policy
    iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
    onetest();
  }

  /**
   * Builds fresh shard descriptors under indexPath. Runs the updater over
   * the initial input, then over the update input, and verifies the result.
   */
  private void onetest() throws IOException {
    long versionNumber = -1;
    long generation = -1;

    Shard[] shards = new Shard[numShards];
    for (int j = 0; j < shards.length; j++) {
      shards[j] =
          new Shard(versionNumber,
              new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
              generation);
    }

    IIndexUpdater updater = new IndexUpdater();

    deleteIfExists(outputPath);
    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
        shards);

    // delete docs w/ even docids, update docs w/ odd docids
    deleteIfExists(outputPath);
    updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
        shards);

    verify(shards);
  }

  /**
   * Opens all shards as one MultiReader and checks the following:
   * <ul>
   * <li>No document matches "apache".</li>
   * <li>Exactly half of the documents match "hadoop".</li>
   * <li>Those matches are the odd ids, each once.</li>
   * </ul>
   * Readers and the searcher are closed even when an assertion fails.
   */
  private void verify(Shard[] shards) throws IOException {
    IndexReader[] readers = new IndexReader[shards.length];
    IndexReader reader = null;
    IndexSearcher searcher = null;
    try {
      for (int i = 0; i < shards.length; i++) {
        Directory dir =
            new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
                false, conf);
        readers[i] = IndexReader.open(dir);
      }

      reader = new MultiReader(readers);
      searcher = new IndexSearcher(reader);

      Hits hits =
          searcher.search(new TermQuery(new Term("content", "apache")));
      assertEquals(0, hits.length());

      hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
      assertEquals(numDocsPerRun / 2, hits.length());

      int[] counts = new int[numDocsPerRun];
      for (int i = 0; i < hits.length(); i++) {
        Document doc = hits.doc(i);
        counts[Integer.parseInt(doc.get("id"))]++;
      }

      // even ids were deleted, odd ids were updated (so appear exactly once)
      for (int i = 0; i < numDocsPerRun; i++) {
        assertEquals(i % 2 == 0 ? 0 : 1, counts[i]);
      }
    } finally {
      try {
        if (searcher != null) {
          searcher.close();
        }
      } finally {
        closeReaders(reader, readers);
      }
    }
  }

  /**
   * Closes the combined reader if it was built. Otherwise it closes whichever
   * shard readers were opened before the failure.
   */
  private static void closeReaders(IndexReader multiReader,
      IndexReader[] shardReaders) throws IOException {
    if (multiReader != null) {
      // Closing the MultiReader is relied on to close the shard readers too
      // (Lucene MultiReader behavior -- confirm for the Lucene version used).
      multiReader.close();
      return;
    }
    for (IndexReader r : shardReaders) {
      if (r != null) {
        r.close();
      }
    }
  }

  /** Replaces {@code dst} on the test file system with a copy of local {@code src}. */
  private void copyToDfs(Path src, Path dst) throws IOException {
    deleteIfExists(dst);
    fs.copyFromLocalFile(src, dst);
  }

  /** Deletes {@code path} from the test file system if it exists. */
  private void deleteIfExists(Path path) throws IOException {
    if (fs.exists(path)) {
      fs.delete(path);
    }
  }

  /**
   * Shuts down whatever was started, in reverse of start-up order: MR
   * cluster, then file system, then DFS cluster. The MR cluster was started
   * against this DFS. All fields are cleared, and later steps still run if
   * an earlier one throws.
   */
  private void shutdownClusters() throws IOException {
    MiniMRCluster mr = mrCluster;
    FileSystem fileSys = fs;
    MiniDFSCluster dfs = dfsCluster;
    mrCluster = null;
    fs = null;
    dfsCluster = null;
    try {
      if (mr != null) {
        mr.shutdown();
      }
    } finally {
      try {
        if (fileSys != null) {
          fileSys.close();
        }
      } finally {
        if (dfs != null) {
          dfs.shutdown();
        }
      }
    }
  }

}
Propchange: hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java ------------------------------------------------------------------------------ svn:executable = * Added:
hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java?rev=639138&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java (added) +++ hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java Wed Mar 19 20:33:18 2008 @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.hadoop.contrib.index.mapred;

import java.io.File;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;

import junit.framework.TestCase;

/**
 * Runs the index updater repeatedly over the same input on a mini
 * DFS/MapReduce cluster, using one more shard on each run. After every run
 * it checks the following:
 * <ul>
 * <li>the "done" marker files</li>
 * <li>the accumulated index contents</li>
 * <li>the max-field-length setting</li>
 * <li>the number of compound segment files per shard</li>
 * </ul>
 */
public class TestIndexUpdater extends TestCase {

  /** Formats shard numbers as ungrouped names of at least five digits. */
  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  static {
    NUMBER_FORMAT.setMinimumIntegerDigits(5);
    NUMBER_FORMAT.setGroupingUsed(false);
  }

  // A MiniMRCluster is used rather than the LocalJobRunner, which reports
  // "we only allow 0 or 1 reducer in local mode" (presumably the updater
  // needs more than one reducer with several shards -- confirm).
  private final Configuration conf;
  private final Path localInputPath =
      new Path(System.getProperty("build.test") + "/sample/data.txt");
  private final Path inputPath = new Path("/myexample/data.txt");
  private final Path outputPath = new Path("/myoutput");
  private final Path indexPath = new Path("/myindex");
  private final int initNumShards = 3;
  private final int numMapTasks = 5;

  private final int numDataNodes = 3;
  private final int numTaskTrackers = 3;

  private final int numRuns = 3;
  private final int numDocsPerRun = 10; // num of docs in local input path

  private FileSystem fs;
  private MiniDFSCluster dfsCluster;
  private MiniMRCluster mrCluster;

  public TestIndexUpdater() throws IOException {
    super();
    // Default hadoop.log.dir to ./logs when it is not already set.
    if (System.getProperty("hadoop.log.dir") == null) {
      String base = new File(".").getPath();
      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
    }
    conf = new Configuration();
  }

  /**
   * Starts the DFS cluster and copies the input file into it. It also
   * removes stale output and index directories, then starts the MR cluster.
   * If any step fails, everything already started is shut down before the
   * failure propagates.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();
    boolean started = false;
    try {
      dfsCluster =
          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
      fs = dfsCluster.getFileSystem();

      deleteIfExists(inputPath);
      fs.copyFromLocalFile(localInputPath, inputPath);

      // do not create the output dir; the MapReduce job will create it
      deleteIfExists(outputPath);
      deleteIfExists(indexPath);

      mrCluster =
          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
      started = true;
    } finally {
      if (!started) {
        try {
          shutdownClusters();
        } catch (IOException ignored) {
          // Keep the original setUp failure; a cleanup error would mask it.
        }
      }
    }
  }

  @Override
  protected void tearDown() throws Exception {
    try {
      shutdownClusters();
    } finally {
      super.tearDown();
    }
  }

  /**
   * Indexes the same input numRuns times. Run i (0-based) uses
   * initNumShards + i shards, and results are verified after each run.
   */
  public void testIndexUpdater() throws IOException {
    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
    // max field length, compound file and number of segments will be checked
    // later
    iconf.setIndexMaxFieldLength(2);
    iconf.setIndexUseCompoundFile(true);
    iconf.setIndexMaxNumSegments(1);

    long versionNumber = -1;
    long generation = -1;

    for (int i = 0; i < numRuns; i++) {
      deleteIfExists(outputPath);

      Shard[] shards = new Shard[initNumShards + i];
      for (int j = 0; j < shards.length; j++) {
        shards[j] =
            new Shard(versionNumber,
                new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
                generation);
      }
      run(i + 1, shards);
    }
  }

  /**
   * Runs the updater once more over the input and verifies the outcome.
   *
   * @param expectedRuns how many times the input has been indexed in total,
   *          including this run
   * @param shards the shards to update
   */
  private void run(int expectedRuns, Shard[] shards) throws IOException {
    IIndexUpdater updater = new IndexUpdater();
    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
        shards);

    verifyDoneFiles(shards);
    verifyIndexContents(expectedRuns, shards);
    removeEarlierCheckpoints(shards);
    // must run after removeEarlierCheckpoints so older checkpoints are gone
    verifySegmentCounts(shards);
  }

  /**
   * Checks the files one level below the output path. There must be as many
   * as there are shards, all named with the IndexUpdateReducer.DONE prefix.
   */
  private void verifyDoneFiles(Shard[] shards) throws IOException {
    List<Path> doneFiles = new ArrayList<Path>();
    for (FileStatus entry : fs.listStatus(outputPath)) {
      for (FileStatus done : fs.listStatus(entry.getPath())) {
        doneFiles.add(done.getPath());
      }
    }
    // Check the count before the names, so a surplus of files is reported
    // as an assertion failure rather than an array overflow.
    assertEquals(shards.length, doneFiles.size());
    for (Path done : doneFiles) {
      assertTrue(done.getName().startsWith(
          IndexUpdateReducer.DONE.toString()));
    }
  }

  /**
   * Opens all shards as one MultiReader and checks three things:
   * <ul>
   * <li>Every document id was indexed expectedRuns times.</li>
   * <li>The max field length of 2 was honored.</li>
   * <li>The reader and searcher are closed even when an assertion fails.</li>
   * </ul>
   */
  private void verifyIndexContents(int expectedRuns, Shard[] shards)
      throws IOException {
    IndexReader[] readers = new IndexReader[shards.length];
    IndexReader reader = null;
    IndexSearcher searcher = null;
    try {
      for (int i = 0; i < shards.length; i++) {
        Directory dir =
            new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
                false, conf);
        readers[i] = IndexReader.open(dir);
      }

      reader = new MultiReader(readers);
      searcher = new IndexSearcher(reader);

      Hits hits =
          searcher.search(new TermQuery(new Term("content", "apache")));
      assertEquals(expectedRuns * numDocsPerRun, hits.length());

      int[] counts = new int[numDocsPerRun];
      for (int i = 0; i < hits.length(); i++) {
        Document doc = hits.doc(i);
        counts[Integer.parseInt(doc.get("id"))]++;
      }
      for (int i = 0; i < numDocsPerRun; i++) {
        assertEquals(expectedRuns, counts[i]);
      }

      // max field length is 2, so "dot" is also indexed but not "org"
      hits = searcher.search(new TermQuery(new Term("content", "dot")));
      assertEquals(expectedRuns, hits.length());

      hits = searcher.search(new TermQuery(new Term("content", "org")));
      assertEquals(0, hits.length());
    } finally {
      try {
        if (searcher != null) {
          searcher.close();
        }
      } finally {
        closeReaders(reader, readers);
      }
    }
  }

  /**
   * Opens and closes an IndexWriter on each shard with
   * KeepOnlyLastCommitDeletionPolicy, to remove earlier checkpoints.
   */
  private void removeEarlierCheckpoints(Shard[] shards) throws IOException {
    for (int i = 0; i < shards.length; i++) {
      Directory dir =
          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
              false, conf);
      IndexWriter writer =
          new IndexWriter(dir, false, null,
              new KeepOnlyLastCommitDeletionPolicy());
      writer.close();
    }
  }

  /** Checks that each shard directory holds exactly one ".cfs" file. */
  private void verifySegmentCounts(Shard[] shards) throws IOException {
    // The filter does not depend on the shard, so build it once.
    PathFilter cfsFilter = new PathFilter() {
      public boolean accept(Path path) {
        return path.getName().endsWith(".cfs");
      }
    };
    for (int i = 0; i < shards.length; i++) {
      FileStatus[] cfsFiles =
          fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
      assertEquals(1, cfsFiles.length);
    }
  }

  /**
   * Closes the combined reader if it was built. Otherwise it closes whichever
   * shard readers were opened before the failure.
   */
  private static void closeReaders(IndexReader multiReader,
      IndexReader[] shardReaders) throws IOException {
    if (multiReader != null) {
      // Closing the MultiReader is relied on to close the shard readers too
      // (Lucene MultiReader behavior -- confirm for the Lucene version used).
      multiReader.close();
      return;
    }
    for (IndexReader r : shardReaders) {
      if (r != null) {
        r.close();
      }
    }
  }

  /** Deletes {@code path} from the test file system if it exists. */
  private void deleteIfExists(Path path) throws IOException {
    if (fs.exists(path)) {
      fs.delete(path);
    }
  }

  /**
   * Shuts down whatever was started, in reverse of start-up order: MR
   * cluster, then file system, then DFS cluster. The MR cluster was started
   * against this DFS. All fields are cleared, and later steps still run if
   * an earlier one throws.
   */
  private void shutdownClusters() throws IOException {
    MiniMRCluster mr = mrCluster;
    FileSystem fileSys = fs;
    MiniDFSCluster dfs = dfsCluster;
    mrCluster = null;
    fs = null;
    dfsCluster = null;
    try {
      if (mr != null) {
        mr.shutdown();
      }
    } finally {
      try {
        if (fileSys != null) {
          fileSys.close();
        }
      } finally {
        if (dfs != null) {
          dfs.shutdown();
        }
      }
    }
  }

}
Propchange: hadoop/core/trunk/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java ------------------------------------------------------------------------------ svn:executable = *