hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1401071 [7/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date Mon, 22 Oct 2012 20:43:30 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java Mon Oct 22 20:43:16 2012
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-
-/**
- * An intermediate form for one or more parsed Lucene documents and/or
- * delete terms. It actually uses Lucene file format as the format for
- * the intermediate form by using RAM dir files.
- * 
- * Note: If process(*) is ever called, closeWriter() should be called.
- * Otherwise, no need to call closeWriter().
- */
-public class IntermediateForm implements Writable {
-
-  private IndexUpdateConfiguration iconf = null;
-  private final Collection<Term> deleteList;
-  private RAMDirectory dir;
-  private IndexWriter writer;
-  private int numDocs;
-
-  /**
-   * Constructor
-   * @throws IOException
-   */
-  public IntermediateForm() throws IOException {
-    deleteList = new ConcurrentLinkedQueue<Term>();
-    dir = new RAMDirectory();
-    writer = null;
-    numDocs = 0;
-  }
-
-  /**
-   * Configure using an index update configuration.
-   * @param iconf  the index update configuration
-   */
-  public void configure(IndexUpdateConfiguration iconf) {
-    this.iconf = iconf;
-  }
-
-  /**
-   * Get the ram directory of the intermediate form.
-   * @return the ram directory
-   */
-  public Directory getDirectory() {
-    return dir;
-  }
-
-  /**
-   * Get an iterator for the delete terms in the intermediate form.
-   * @return an iterator for the delete terms
-   */
-  public Iterator<Term> deleteTermIterator() {
-    return deleteList.iterator();
-  }
-
-  /**
-   * This method is used by the index update mapper and process a document
-   * operation into the current intermediate form.
-   * @param doc  input document operation
-   * @param analyzer  the analyzer
-   * @throws IOException
-   */
-  public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
-    if (doc.getOp() == DocumentAndOp.Op.DELETE
-        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-      deleteList.add(doc.getTerm());
-
-    }
-
-    if (doc.getOp() == DocumentAndOp.Op.INSERT
-        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-
-      if (writer == null) {
-        // analyzer is null because we specify an analyzer with addDocument
-        writer = createWriter();
-      }
-
-      writer.addDocument(doc.getDocument(), analyzer);
-      numDocs++;
-    }
-
-  }
-
-  /**
-   * This method is used by the index update combiner and process an
-   * intermediate form into the current intermediate form. More specifically,
-   * the input intermediate forms are a single-document ram index and/or a
-   * single delete term.
-   * @param form  the input intermediate form
-   * @throws IOException
-   */
-  public void process(IntermediateForm form) throws IOException {
-    if (form.deleteList.size() > 0) {
-      deleteList.addAll(form.deleteList);
-    }
-
-    if (form.dir.sizeInBytes() > 0) {
-      if (writer == null) {
-        writer = createWriter();
-      }
-
-      writer.addIndexesNoOptimize(new Directory[] { form.dir });
-      numDocs++;
-    }
-
-  }
-
-  /**
-   * Close the Lucene index writer associated with the intermediate form,
-   * if created. Do not close the ram directory. In fact, there is no need
-   * to close a ram directory.
-   * @throws IOException
-   */
-  public void closeWriter() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-  }
-
-  /**
-   * The total size of files in the directory and ram used by the index writer.
-   * It does not include memory used by the delete list.
-   * @return the total size in bytes
-   */
-  public long totalSizeInBytes() throws IOException {
-    long size = dir.sizeInBytes();
-    if (writer != null) {
-      size += writer.ramSizeInBytes();
-    }
-    return size;
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#toString()
-   */
-  public String toString() {
-    StringBuilder buffer = new StringBuilder();
-    buffer.append(this.getClass().getSimpleName());
-    buffer.append("[numDocs=");
-    buffer.append(numDocs);
-    buffer.append(", numDeletes=");
-    buffer.append(deleteList.size());
-    if (deleteList.size() > 0) {
-      buffer.append("(");
-      Iterator<Term> iter = deleteTermIterator();
-      while (iter.hasNext()) {
-        buffer.append(iter.next());
-        buffer.append(" ");
-      }
-      buffer.append(")");
-    }
-    buffer.append("]");
-    return buffer.toString();
-  }
-
-  private IndexWriter createWriter() throws IOException {
-    IndexWriter writer =
-        new IndexWriter(dir, false, null,
-            new KeepOnlyLastCommitDeletionPolicy());
-    writer.setUseCompoundFile(false);
-
-    if (iconf != null) {
-      int maxFieldLength = iconf.getIndexMaxFieldLength();
-      if (maxFieldLength > 0) {
-        writer.setMaxFieldLength(maxFieldLength);
-      }
-    }
-
-    return writer;
-  }
-
-  private void resetForm() throws IOException {
-    deleteList.clear();
-    if (dir.sizeInBytes() > 0) {
-      // it's ok if we don't close a ram directory
-      dir.close();
-      // an alternative is to delete all the files and reuse the ram directory
-      dir = new RAMDirectory();
-    }
-    assert (writer == null);
-    numDocs = 0;
-  }
-
-  // ///////////////////////////////////
-  // Writable
-  // ///////////////////////////////////
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
-   */
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(deleteList.size());
-    for (Term term : deleteList) {
-      Text.writeString(out, term.field());
-      Text.writeString(out, term.text());
-    }
-
-    String[] files = dir.list();
-    RAMDirectoryUtil.writeRAMFiles(out, dir, files);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
-   */
-  public void readFields(DataInput in) throws IOException {
-    resetForm();
-
-    int numDeleteTerms = in.readInt();
-    for (int i = 0; i < numDeleteTerms; i++) {
-      String field = Text.readString(in);
-      String text = Text.readString(in);
-      deleteList.add(new Term(field, text));
-    }
-
-    RAMDirectoryUtil.readRAMFiles(in, dir);
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * An intermediate form for one or more parsed Lucene documents and/or
+ * delete terms. It actually uses Lucene file format as the format for
+ * the intermediate form by using RAM dir files.
+ * 
+ * Note: If process(*) is ever called, closeWriter() should be called.
+ * Otherwise, no need to call closeWriter().
+ */
+public class IntermediateForm implements Writable {
+
+  private IndexUpdateConfiguration iconf = null;
+  private final Collection<Term> deleteList;
+  private RAMDirectory dir;
+  private IndexWriter writer;
+  private int numDocs;
+
+  /**
+   * Constructor
+   * @throws IOException
+   */
+  public IntermediateForm() throws IOException {
+    deleteList = new ConcurrentLinkedQueue<Term>();
+    dir = new RAMDirectory();
+    writer = null;
+    numDocs = 0;
+  }
+
+  /**
+   * Configure using an index update configuration.
+   * @param iconf  the index update configuration
+   */
+  public void configure(IndexUpdateConfiguration iconf) {
+    this.iconf = iconf;
+  }
+
+  /**
+   * Get the ram directory of the intermediate form.
+   * @return the ram directory
+   */
+  public Directory getDirectory() {
+    return dir;
+  }
+
+  /**
+   * Get an iterator for the delete terms in the intermediate form.
+   * @return an iterator for the delete terms
+   */
+  public Iterator<Term> deleteTermIterator() {
+    return deleteList.iterator();
+  }
+
+  /**
+   * This method is used by the index update mapper and process a document
+   * operation into the current intermediate form.
+   * @param doc  input document operation
+   * @param analyzer  the analyzer
+   * @throws IOException
+   */
+  public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
+    if (doc.getOp() == DocumentAndOp.Op.DELETE
+        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+      deleteList.add(doc.getTerm());
+
+    }
+
+    if (doc.getOp() == DocumentAndOp.Op.INSERT
+        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+
+      if (writer == null) {
+        // analyzer is null because we specify an analyzer with addDocument
+        writer = createWriter();
+      }
+
+      writer.addDocument(doc.getDocument(), analyzer);
+      numDocs++;
+    }
+
+  }
+
+  /**
+   * This method is used by the index update combiner and process an
+   * intermediate form into the current intermediate form. More specifically,
+   * the input intermediate forms are a single-document ram index and/or a
+   * single delete term.
+   * @param form  the input intermediate form
+   * @throws IOException
+   */
+  public void process(IntermediateForm form) throws IOException {
+    if (form.deleteList.size() > 0) {
+      deleteList.addAll(form.deleteList);
+    }
+
+    if (form.dir.sizeInBytes() > 0) {
+      if (writer == null) {
+        writer = createWriter();
+      }
+
+      writer.addIndexesNoOptimize(new Directory[] { form.dir });
+      numDocs++;
+    }
+
+  }
+
+  /**
+   * Close the Lucene index writer associated with the intermediate form,
+   * if created. Do not close the ram directory. In fact, there is no need
+   * to close a ram directory.
+   * @throws IOException
+   */
+  public void closeWriter() throws IOException {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+
+  /**
+   * The total size of files in the directory and ram used by the index writer.
+   * It does not include memory used by the delete list.
+   * @return the total size in bytes
+   */
+  public long totalSizeInBytes() throws IOException {
+    long size = dir.sizeInBytes();
+    if (writer != null) {
+      size += writer.ramSizeInBytes();
+    }
+    return size;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#toString()
+   */
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(this.getClass().getSimpleName());
+    buffer.append("[numDocs=");
+    buffer.append(numDocs);
+    buffer.append(", numDeletes=");
+    buffer.append(deleteList.size());
+    if (deleteList.size() > 0) {
+      buffer.append("(");
+      Iterator<Term> iter = deleteTermIterator();
+      while (iter.hasNext()) {
+        buffer.append(iter.next());
+        buffer.append(" ");
+      }
+      buffer.append(")");
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  private IndexWriter createWriter() throws IOException {
+    IndexWriter writer =
+        new IndexWriter(dir, false, null,
+            new KeepOnlyLastCommitDeletionPolicy());
+    writer.setUseCompoundFile(false);
+
+    if (iconf != null) {
+      int maxFieldLength = iconf.getIndexMaxFieldLength();
+      if (maxFieldLength > 0) {
+        writer.setMaxFieldLength(maxFieldLength);
+      }
+    }
+
+    return writer;
+  }
+
+  private void resetForm() throws IOException {
+    deleteList.clear();
+    if (dir.sizeInBytes() > 0) {
+      // it's ok if we don't close a ram directory
+      dir.close();
+      // an alternative is to delete all the files and reuse the ram directory
+      dir = new RAMDirectory();
+    }
+    assert (writer == null);
+    numDocs = 0;
+  }
+
+  // ///////////////////////////////////
+  // Writable
+  // ///////////////////////////////////
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(deleteList.size());
+    for (Term term : deleteList) {
+      Text.writeString(out, term.field());
+      Text.writeString(out, term.text());
+    }
+
+    String[] files = dir.list();
+    RAMDirectoryUtil.writeRAMFiles(out, dir, files);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  public void readFields(DataInput in) throws IOException {
+    resetForm();
+
+    int numDeleteTerms = in.readInt();
+    for (int i = 0; i < numDeleteTerms; i++) {
+      String field = Text.readString(in);
+      String text = Text.readString(in);
+      deleteList.add(new Term(field, text));
+    }
+
+    RAMDirectoryUtil.readRAMFiles(in, dir);
+  }
+
+}

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java Mon Oct 22 20:43:16 2012
@@ -1,105 +1,105 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.lucene;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMDirectory;
-
-public class TestMixedDirectory extends TestCase {
-  private int numDocsPerUpdate = 10;
-  private int maxBufferedDocs = 2;
-
-  public void testMixedDirectoryAndPolicy() throws IOException {
-    Directory readDir = new RAMDirectory();
-    updateIndex(readDir, 0, numDocsPerUpdate,
-        new KeepOnlyLastCommitDeletionPolicy());
-
-    verify(readDir, numDocsPerUpdate);
-
-    IndexOutput out =
-        readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
-            + ".cfs");
-    out.writeInt(0);
-    out.close();
-
-    Directory writeDir = new RAMDirectory();
-    Directory mixedDir = new MixedDirectory(readDir, writeDir);
-    updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
-        new MixedDeletionPolicy());
-
-    verify(readDir, numDocsPerUpdate);
-    verify(mixedDir, 2 * numDocsPerUpdate);
-  }
-
-  public void updateIndex(Directory dir, int base, int numDocs,
-      IndexDeletionPolicy policy) throws IOException {
-    IndexWriter writer =
-        new IndexWriter(dir, false, new StandardAnalyzer(), policy);
-    writer.setMaxBufferedDocs(maxBufferedDocs);
-    writer.setMergeFactor(1000);
-    for (int i = 0; i < numDocs; i++) {
-      addDoc(writer, base + i);
-    }
-    writer.close();
-  }
-
-  private void addDoc(IndexWriter writer, int id) throws IOException {
-    Document doc = new Document();
-    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
-        Field.Index.UN_TOKENIZED));
-    doc.add(new Field("content", "apache", Field.Store.NO,
-        Field.Index.TOKENIZED));
-    writer.addDocument(doc);
-  }
-
-  private void verify(Directory dir, int expectedHits) throws IOException {
-    IndexSearcher searcher = new IndexSearcher(dir);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-    int numHits = hits.length();
-
-    assertEquals(expectedHits, numHits);
-
-    int[] docs = new int[numHits];
-    for (int i = 0; i < numHits; i++) {
-      Document hit = hits.doc(i);
-      docs[Integer.parseInt(hit.get("id"))]++;
-    }
-    for (int i = 0; i < numHits; i++) {
-      assertEquals(1, docs[i]);
-    }
-
-    searcher.close();
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.lucene;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+public class TestMixedDirectory extends TestCase {
+  private int numDocsPerUpdate = 10;
+  private int maxBufferedDocs = 2;
+
+  public void testMixedDirectoryAndPolicy() throws IOException {
+    Directory readDir = new RAMDirectory();
+    updateIndex(readDir, 0, numDocsPerUpdate,
+        new KeepOnlyLastCommitDeletionPolicy());
+
+    verify(readDir, numDocsPerUpdate);
+
+    IndexOutput out =
+        readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
+            + ".cfs");
+    out.writeInt(0);
+    out.close();
+
+    Directory writeDir = new RAMDirectory();
+    Directory mixedDir = new MixedDirectory(readDir, writeDir);
+    updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
+        new MixedDeletionPolicy());
+
+    verify(readDir, numDocsPerUpdate);
+    verify(mixedDir, 2 * numDocsPerUpdate);
+  }
+
+  public void updateIndex(Directory dir, int base, int numDocs,
+      IndexDeletionPolicy policy) throws IOException {
+    IndexWriter writer =
+        new IndexWriter(dir, false, new StandardAnalyzer(), policy);
+    writer.setMaxBufferedDocs(maxBufferedDocs);
+    writer.setMergeFactor(1000);
+    for (int i = 0; i < numDocs; i++) {
+      addDoc(writer, base + i);
+    }
+    writer.close();
+  }
+
+  private void addDoc(IndexWriter writer, int id) throws IOException {
+    Document doc = new Document();
+    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
+        Field.Index.UN_TOKENIZED));
+    doc.add(new Field("content", "apache", Field.Store.NO,
+        Field.Index.TOKENIZED));
+    writer.addDocument(doc);
+  }
+
+  private void verify(Directory dir, int expectedHits) throws IOException {
+    IndexSearcher searcher = new IndexSearcher(dir);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+    int numHits = hits.length();
+
+    assertEquals(expectedHits, numHits);
+
+    int[] docs = new int[numHits];
+    for (int i = 0; i < numHits; i++) {
+      Document hit = hits.doc(i);
+      docs[Integer.parseInt(hit.get("id"))]++;
+    }
+    for (int i = 0; i < numHits; i++) {
+      assertEquals(1, docs[i]);
+    }
+
+    searcher.close();
+  }
+
+}

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Mon Oct 22 20:43:16 2012
@@ -1,234 +1,234 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestDistributionPolicy extends TestCase {
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  // however, "we only allow 0 or 1 reducer in local mode" - from
-  // LocalJobRunner
-  private Configuration conf;
-  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
-  private Path localUpdatePath =
-      new Path(System.getProperty("build.test") + "/sample/data2.txt");
-  private Path inputPath = new Path("/myexample/data.txt");
-  private Path updatePath = new Path("/myexample/data2.txt");
-  private Path outputPath = new Path("/myoutput");
-  private Path indexPath = new Path("/myindex");
-  private int numShards = 3;
-  private int numMapTasks = 5;
-
-  private int numDataNodes = 3;
-  private int numTaskTrackers = 3;
-
-  private int numDocsPerRun = 10; // num of docs in local input path
-
-  private FileSystem fs;
-  private MiniDFSCluster dfsCluster;
-  private MiniMRCluster mrCluster;
-
-  public TestDistributionPolicy() throws IOException {
-    super();
-    if (System.getProperty("hadoop.log.dir") == null) {
-      String base = new File(".").getPath(); // getAbsolutePath();
-      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
-    }
-    conf = new Configuration();
-  }
-
-  protected void setUp() throws Exception {
-    super.setUp();
-    try {
-      dfsCluster =
-          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
-      fs = dfsCluster.getFileSystem();
-      if (fs.exists(inputPath)) {
-        fs.delete(inputPath, true);
-      }
-      fs.copyFromLocalFile(localInputPath, inputPath);
-      if (fs.exists(updatePath)) {
-        fs.delete(updatePath, true);
-      }
-      fs.copyFromLocalFile(localUpdatePath, updatePath);
-
-      if (fs.exists(outputPath)) {
-        // do not create, mapred will create
-        fs.delete(outputPath, true);
-      }
-
-      if (fs.exists(indexPath)) {
-        fs.delete(indexPath, true);
-      }
-
-      mrCluster =
-          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
-    } catch (IOException e) {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-        dfsCluster = null;
-      }
-
-      if (fs != null) {
-        fs.close();
-        fs = null;
-      }
-
-      if (mrCluster != null) {
-        mrCluster.shutdown();
-        mrCluster = null;
-      }
-
-      throw e;
-    }
-
-  }
-
-  protected void tearDown() throws Exception {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-      mrCluster = null;
-    }
-
-    super.tearDown();
-  }
-
-  public void testDistributionPolicy() throws IOException {
-    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-
-    // test hashing distribution policy
-    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
-    onetest();
-
-    if (fs.exists(indexPath)) {
-      fs.delete(indexPath, true);
-    }
-
-    // test round-robin distribution policy
-    iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
-    onetest();
-  }
-
-  private void onetest() throws IOException {
-    long versionNumber = -1;
-    long generation = -1;
-
-    Shard[] shards = new Shard[numShards];
-    for (int j = 0; j < shards.length; j++) {
-      shards[j] =
-          new Shard(versionNumber,
-              new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
-              generation);
-    }
-
-    if (fs.exists(outputPath)) {
-      fs.delete(outputPath, true);
-    }
-
-    IIndexUpdater updater = new IndexUpdater();
-    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
-        shards);
-
-    if (fs.exists(outputPath)) {
-      fs.delete(outputPath, true);
-    }
-
-    // delete docs w/ even docids, update docs w/ odd docids
-    updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
-        shards);
-
-    verify(shards);
-  }
-
-  private void verify(Shard[] shards) throws IOException {
-    // verify the index
-    IndexReader[] readers = new IndexReader[shards.length];
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      readers[i] = IndexReader.open(dir);
-    }
-
-    IndexReader reader = new MultiReader(readers);
-    IndexSearcher searcher = new IndexSearcher(reader);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-    assertEquals(0, hits.length());
-
-    hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
-    assertEquals(numDocsPerRun / 2, hits.length());
-
-    int[] counts = new int[numDocsPerRun];
-    for (int i = 0; i < hits.length(); i++) {
-      Document doc = hits.doc(i);
-      counts[Integer.parseInt(doc.get("id"))]++;
-    }
-
-    for (int i = 0; i < numDocsPerRun; i++) {
-      if (i % 2 == 0) {
-        assertEquals(0, counts[i]);
-      } else {
-        assertEquals(1, counts[i]);
-      }
-    }
-
-    searcher.close();
-    reader.close();
-  }
-
-}
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestDistributionPolicy extends TestCase {
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  // however, "we only allow 0 or 1 reducer in local mode" - from
+  // LocalJobRunner
+  private Configuration conf;
+  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+  private Path localUpdatePath =
+      new Path(System.getProperty("build.test") + "/sample/data2.txt");
+  private Path inputPath = new Path("/myexample/data.txt");
+  private Path updatePath = new Path("/myexample/data2.txt");
+  private Path outputPath = new Path("/myoutput");
+  private Path indexPath = new Path("/myindex");
+  private int numShards = 3;
+  private int numMapTasks = 5;
+
+  private int numDataNodes = 3;
+  private int numTaskTrackers = 3;
+
+  private int numDocsPerRun = 10; // num of docs in local input path
+
+  private FileSystem fs;
+  private MiniDFSCluster dfsCluster;
+  private MiniMRCluster mrCluster;
+
+  public TestDistributionPolicy() throws IOException {
+    super();
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+    conf = new Configuration();
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    try {
+      dfsCluster =
+          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+      fs = dfsCluster.getFileSystem();
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+      fs.copyFromLocalFile(localInputPath, inputPath);
+      if (fs.exists(updatePath)) {
+        fs.delete(updatePath, true);
+      }
+      fs.copyFromLocalFile(localUpdatePath, updatePath);
+
+      if (fs.exists(outputPath)) {
+        // do not create, mapred will create
+        fs.delete(outputPath, true);
+      }
+
+      if (fs.exists(indexPath)) {
+        fs.delete(indexPath, true);
+      }
+
+      mrCluster =
+          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+    } catch (IOException e) {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+
+      if (fs != null) {
+        fs.close();
+        fs = null;
+      }
+
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+        mrCluster = null;
+      }
+
+      throw e;
+    }
+
+  }
+
+  protected void tearDown() throws Exception {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+
+    super.tearDown();
+  }
+
+  public void testDistributionPolicy() throws IOException {
+    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+
+    // test hashing distribution policy
+    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
+    onetest();
+
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+
+    // test round-robin distribution policy
+    iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
+    onetest();
+  }
+
+  private void onetest() throws IOException {
+    long versionNumber = -1;
+    long generation = -1;
+
+    Shard[] shards = new Shard[numShards];
+    for (int j = 0; j < shards.length; j++) {
+      shards[j] =
+          new Shard(versionNumber,
+              new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
+              generation);
+    }
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    IIndexUpdater updater = new IndexUpdater();
+    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+        shards);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    // delete docs w/ even docids, update docs w/ odd docids
+    updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
+        shards);
+
+    verify(shards);
+  }
+
+  private void verify(Shard[] shards) throws IOException {
+    // verify the index
+    IndexReader[] readers = new IndexReader[shards.length];
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      readers[i] = IndexReader.open(dir);
+    }
+
+    IndexReader reader = new MultiReader(readers);
+    IndexSearcher searcher = new IndexSearcher(reader);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+    assertEquals(0, hits.length());
+
+    hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
+    assertEquals(numDocsPerRun / 2, hits.length());
+
+    int[] counts = new int[numDocsPerRun];
+    for (int i = 0; i < hits.length(); i++) {
+      Document doc = hits.doc(i);
+      counts[Integer.parseInt(doc.get("id"))]++;
+    }
+
+    for (int i = 0; i < numDocsPerRun; i++) {
+      if (i % 2 == 0) {
+        assertEquals(0, counts[i]);
+      } else {
+        assertEquals(1, counts[i]);
+      }
+    }
+
+    searcher.close();
+    reader.close();
+  }
+
+}

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java?rev=1401071&r1=1401070&r2=1401071&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java Mon Oct 22 20:43:16 2012
@@ -1,258 +1,258 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestIndexUpdater extends TestCase {
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  // however, "we only allow 0 or 1 reducer in local mode" - from
-  // LocalJobRunner
-  private Configuration conf;
-  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
-  private Path inputPath = new Path("/myexample/data.txt");
-  private Path outputPath = new Path("/myoutput");
-  private Path indexPath = new Path("/myindex");
-  private int initNumShards = 3;
-  private int numMapTasks = 5;
-
-  private int numDataNodes = 3;
-  private int numTaskTrackers = 3;
-
-  private int numRuns = 3;
-  private int numDocsPerRun = 10; // num of docs in local input path
-
-  private FileSystem fs;
-  private MiniDFSCluster dfsCluster;
-  private MiniMRCluster mrCluster;
-
-  public TestIndexUpdater() throws IOException {
-    super();
-    if (System.getProperty("hadoop.log.dir") == null) {
-      String base = new File(".").getPath(); // getAbsolutePath();
-      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
-    }
-    conf = new Configuration();
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestIndexUpdater extends TestCase {
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  // however, "we only allow 0 or 1 reducer in local mode" - from
+  // LocalJobRunner
+  private Configuration conf;
+  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+  private Path inputPath = new Path("/myexample/data.txt");
+  private Path outputPath = new Path("/myoutput");
+  private Path indexPath = new Path("/myindex");
+  private int initNumShards = 3;
+  private int numMapTasks = 5;
+
+  private int numDataNodes = 3;
+  private int numTaskTrackers = 3;
+
+  private int numRuns = 3;
+  private int numDocsPerRun = 10; // num of docs in local input path
+
+  private FileSystem fs;
+  private MiniDFSCluster dfsCluster;
+  private MiniMRCluster mrCluster;
+
+  public TestIndexUpdater() throws IOException {
+    super();
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+    conf = new Configuration();
     //See MAPREDUCE-947 for more details. Setting to false prevents the creation of _SUCCESS.
     conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
-  }
-
-  protected void setUp() throws Exception {
-    super.setUp();
-    try {
-      dfsCluster =
-          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
-      fs = dfsCluster.getFileSystem();
-      if (fs.exists(inputPath)) {
-        fs.delete(inputPath, true);
-      }
-      fs.copyFromLocalFile(localInputPath, inputPath);
-
-      if (fs.exists(outputPath)) {
-        // do not create, mapred will create
-        fs.delete(outputPath, true);
-      }
-
-      if (fs.exists(indexPath)) {
-        fs.delete(indexPath, true);
-      }
-
-      mrCluster =
-          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
-    } catch (IOException e) {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-        dfsCluster = null;
-      }
-
-      if (fs != null) {
-        fs.close();
-        fs = null;
-      }
-
-      if (mrCluster != null) {
-        mrCluster.shutdown();
-        mrCluster = null;
-      }
-
-      throw e;
-    }
-
-  }
-
-  protected void tearDown() throws Exception {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-      mrCluster = null;
-    }
-
-    super.tearDown();
-  }
-
-  public void testIndexUpdater() throws IOException {
-    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-    // max field length, compound file and number of segments will be checked
-    // later
-    iconf.setIndexMaxFieldLength(2);
-    iconf.setIndexUseCompoundFile(true);
-    iconf.setIndexMaxNumSegments(1);
-    iconf.setMaxRAMSizeInBytes(20480);
-
-    long versionNumber = -1;
-    long generation = -1;
-
-    for (int i = 0; i < numRuns; i++) {
-      if (fs.exists(outputPath)) {
-        fs.delete(outputPath, true);
-      }
-
-      Shard[] shards = new Shard[initNumShards + i];
-      for (int j = 0; j < shards.length; j++) {
-        shards[j] =
-            new Shard(versionNumber, new Path(indexPath,
-                NUMBER_FORMAT.format(j)).toString(), generation);
-      }
-      run(i + 1, shards);
-    }
-  }
-
-  private void run(int numRuns, Shard[] shards) throws IOException {
-    IIndexUpdater updater = new IndexUpdater();
-    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
-        shards);
-
-    // verify the done files
-    Path[] doneFileNames = new Path[shards.length];
-    int count = 0;
-    FileStatus[] fileStatus = fs.listStatus(outputPath);
-    for (int i = 0; i < fileStatus.length; i++) {
-      FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
-      for (int j = 0; j < doneFiles.length; j++) {
-        doneFileNames[count++] = doneFiles[j].getPath();
-      }
-    }
-    assertEquals(shards.length, count);
-    for (int i = 0; i < count; i++) {
-      assertTrue(doneFileNames[i].getName().startsWith(
-          IndexUpdateReducer.DONE.toString()));
-    }
-
-    // verify the index
-    IndexReader[] readers = new IndexReader[shards.length];
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      readers[i] = IndexReader.open(dir);
-    }
-
-    IndexReader reader = new MultiReader(readers);
-    IndexSearcher searcher = new IndexSearcher(reader);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-
-    assertEquals(numRuns * numDocsPerRun, hits.length());
-
-    int[] counts = new int[numDocsPerRun];
-    for (int i = 0; i < hits.length(); i++) {
-      Document doc = hits.doc(i);
-      counts[Integer.parseInt(doc.get("id"))]++;
-    }
-
-    for (int i = 0; i < numDocsPerRun; i++) {
-      assertEquals(numRuns, counts[i]);
-    }
-
-    // max field length is 2, so "dot" is also indexed but not "org"
-    hits = searcher.search(new TermQuery(new Term("content", "dot")));
-    assertEquals(numRuns, hits.length());
-
-    hits = searcher.search(new TermQuery(new Term("content", "org")));
-    assertEquals(0, hits.length());
-
-    searcher.close();
-    reader.close();
-
-    // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
-    // to remove earlier checkpoints
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      IndexWriter writer =
-          new IndexWriter(dir, false, null,
-              new KeepOnlyLastCommitDeletionPolicy());
-      writer.close();
-    }
-
-    // verify the number of segments, must be done after an writer with
-    // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
-    for (int i = 0; i < shards.length; i++) {
-      PathFilter cfsFilter = new PathFilter() {
-        public boolean accept(Path path) {
-          return path.getName().endsWith(".cfs");
-        }
-      };
-      FileStatus[] cfsFiles =
-          fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
-      assertEquals(1, cfsFiles.length);
-    }
-  }
-
-}
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    try {
+      dfsCluster =
+          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+      fs = dfsCluster.getFileSystem();
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+      fs.copyFromLocalFile(localInputPath, inputPath);
+
+      if (fs.exists(outputPath)) {
+        // do not create, mapred will create
+        fs.delete(outputPath, true);
+      }
+
+      if (fs.exists(indexPath)) {
+        fs.delete(indexPath, true);
+      }
+
+      mrCluster =
+          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+    } catch (IOException e) {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+
+      if (fs != null) {
+        fs.close();
+        fs = null;
+      }
+
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+        mrCluster = null;
+      }
+
+      throw e;
+    }
+
+  }
+
+  protected void tearDown() throws Exception {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+
+    super.tearDown();
+  }
+
+  public void testIndexUpdater() throws IOException {
+    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+    // max field length, compound file and number of segments will be checked
+    // later
+    iconf.setIndexMaxFieldLength(2);
+    iconf.setIndexUseCompoundFile(true);
+    iconf.setIndexMaxNumSegments(1);
+    iconf.setMaxRAMSizeInBytes(20480);
+
+    long versionNumber = -1;
+    long generation = -1;
+
+    for (int i = 0; i < numRuns; i++) {
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+
+      Shard[] shards = new Shard[initNumShards + i];
+      for (int j = 0; j < shards.length; j++) {
+        shards[j] =
+            new Shard(versionNumber, new Path(indexPath,
+                NUMBER_FORMAT.format(j)).toString(), generation);
+      }
+      run(i + 1, shards);
+    }
+  }
+
+  private void run(int numRuns, Shard[] shards) throws IOException {
+    IIndexUpdater updater = new IndexUpdater();
+    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+        shards);
+
+    // verify the done files
+    Path[] doneFileNames = new Path[shards.length];
+    int count = 0;
+    FileStatus[] fileStatus = fs.listStatus(outputPath);
+    for (int i = 0; i < fileStatus.length; i++) {
+      FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
+      for (int j = 0; j < doneFiles.length; j++) {
+        doneFileNames[count++] = doneFiles[j].getPath();
+      }
+    }
+    assertEquals(shards.length, count);
+    for (int i = 0; i < count; i++) {
+      assertTrue(doneFileNames[i].getName().startsWith(
+          IndexUpdateReducer.DONE.toString()));
+    }
+
+    // verify the index
+    IndexReader[] readers = new IndexReader[shards.length];
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      readers[i] = IndexReader.open(dir);
+    }
+
+    IndexReader reader = new MultiReader(readers);
+    IndexSearcher searcher = new IndexSearcher(reader);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+
+    assertEquals(numRuns * numDocsPerRun, hits.length());
+
+    int[] counts = new int[numDocsPerRun];
+    for (int i = 0; i < hits.length(); i++) {
+      Document doc = hits.doc(i);
+      counts[Integer.parseInt(doc.get("id"))]++;
+    }
+
+    for (int i = 0; i < numDocsPerRun; i++) {
+      assertEquals(numRuns, counts[i]);
+    }
+
+    // max field length is 2, so "dot" is also indexed but not "org"
+    hits = searcher.search(new TermQuery(new Term("content", "dot")));
+    assertEquals(numRuns, hits.length());
+
+    hits = searcher.search(new TermQuery(new Term("content", "org")));
+    assertEquals(0, hits.length());
+
+    searcher.close();
+    reader.close();
+
+    // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
+    // to remove earlier checkpoints
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      IndexWriter writer =
+          new IndexWriter(dir, false, null,
+              new KeepOnlyLastCommitDeletionPolicy());
+      writer.close();
+    }
+
+    // verify the number of segments, must be done after an writer with
+    // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
+    for (int i = 0; i < shards.length; i++) {
+      PathFilter cfsFilter = new PathFilter() {
+        public boolean accept(Path path) {
+          return path.getName().endsWith(".cfs");
+        }
+      };
+      FileStatus[] cfsFiles =
+          fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
+      assertEquals(1, cfsFiles.length);
+    }
+  }
+
+}

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/vaidya/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/vaidya:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/examples/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/examples:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/java:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:r1397381-1401062

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/webapps/job/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:r1397381-1401062



Mime
View raw message