accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ujustgotbi...@apache.org
Subject [49/50] [abbrv] git commit: ACCUMULO-2446 Wikisearch now works with Accumulo 1.5.0 on both Hadoop 1.0.4 and 2.0.4-alpha.
Date Wed, 19 Mar 2014 16:08:59 GMT
ACCUMULO-2446 Wikisearch now works with Accumulo 1.5.0 on both Hadoop 1.0.4 and
2.0.4-alpha.

Added a copy of the unit tests for each hadoop profile (src/test/hadoop1 and
src/test/hadoop2). Separate copies were necessary because of implementation
changes to TaskAttemptContext.

Several runtime dependencies were explicitly added to the ingest and query
projects so that following deployment instructions required no extra steps.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/1d5c80be
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/1d5c80be
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/1d5c80be

Branch: refs/heads/master
Commit: 1d5c80bedc973aa081ed9a7570a08651a03eb76c
Parents: 0ef257a
Author: Bill Slacum <billslacum@koverse.com>
Authored: Fri Feb 7 01:14:57 2014 -0500
Committer: Bill Slacum <billslacum@koverse.com>
Committed: Tue Mar 18 23:34:21 2014 -0400

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 README                                          |   6 +-
 README.parallel                                 |   2 +-
 accumulo-wikisearch.iml                         |  14 -
 ingest/bin/ingest.sh                            |   2 +-
 ingest/bin/ingest_parallel.sh                   |   4 +-
 ingest/pom.xml                                  |  70 ++-
 .../reader/AggregatingRecordReaderTest.java     | 288 +++++++++++
 .../reader/AggregatingRecordReaderTest.java     | 289 +++++++++++
 .../reader/AggregatingRecordReaderTest.java     | 288 -----------
 pom.xml                                         | 140 +++++-
 query/pom.xml                                   | 117 ++++-
 .../wikisearch/logic/TestQueryLogic.java        | 197 ++++++++
 .../wikisearch/logic/TestQueryLogic.java        | 477 +++++++++++++++++++
 .../wikisearch/logic/TestQueryLogic.java        | 197 --------
 15 files changed, 1558 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0f31ce3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+**/target
+.idea
+**/*.iml
+**/lib
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/README
----------------------------------------------------------------------
diff --git a/README b/README
index 041490f..ad28cdc 100644
--- a/README
+++ b/README
@@ -38,11 +38,11 @@
 	1. Copy the query/src/main/resources/META-INF/ejb-jar.xml.example file to 
 	   query/src/main/resources/META-INF/ejb-jar.xml. Modify the file to contain the same 
 	   information that you put into the wikipedia.xml file from the Ingest step above. 
-	2. Re-build the query distribution by running 'mvn package assembly:single' in the top-level directory. 
+	2. Re-build the query distribution by running 'mvn package assembly:single' in the query module's directory.
         3. Untar the resulting file in the $JBOSS_HOME/server/default directory.
 
               $ cd $JBOSS_HOME/server/default
-              $ tar -xzf $ACCUMULO_HOME/src/examples/wikisearch/query/target/wikisearch-query*.tar.gz
+              $ tar -xzf /some/path/to/wikisearch/query/target/wikisearch-query*.tar.gz
  
            This will place the dependent jars in the lib directory and the EJB jar into the deploy directory.
 	4. Next, copy the wikisearch*.war file in the query-war/target directory to $JBOSS_HOME/server/default/deploy. 
@@ -51,11 +51,9 @@
 			setauths -u <user> -s all,enwiki,eswiki,frwiki,fawiki
 	7. Copy the following jars to the $ACCUMULO_HOME/lib/ext directory from the $JBOSS_HOME/server/default/lib directory:
 	
-		commons-lang*.jar
 		kryo*.jar
 		minlog*.jar
 		commons-jexl*.jar
-		google-collections*.jar
 		
 	8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
 

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/README.parallel
----------------------------------------------------------------------
diff --git a/README.parallel b/README.parallel
index 477556b..399f0f3 100644
--- a/README.parallel
+++ b/README.parallel
@@ -52,7 +52,7 @@
 		kryo*.jar
 		minlog*.jar
 		commons-jexl*.jar
-		google-collections*.jar
+		guava*.jar
 		
 	8. Copy the $JBOSS_HOME/server/default/deploy/wikisearch-query*.jar to $ACCUMULO_HOME/lib/ext.
 

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/accumulo-wikisearch.iml
----------------------------------------------------------------------
diff --git a/accumulo-wikisearch.iml b/accumulo-wikisearch.iml
deleted file mode 100644
index 8015fa7..0000000
--- a/accumulo-wikisearch.iml
+++ /dev/null
@@ -1,14 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
-  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
-    <output url="file://$MODULE_DIR$/target/classes" />
-    <output-test url="file://$MODULE_DIR$/target/test-classes" />
-    <exclude-output />
-    <content url="file://$MODULE_DIR$">
-      <excludeFolder url="file://$MODULE_DIR$/target" />
-    </content>
-    <orderEntry type="inheritedJdk" />
-    <orderEntry type="sourceFolder" forTests="false" />
-  </component>
-</module>
-

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/bin/ingest.sh
----------------------------------------------------------------------
diff --git a/ingest/bin/ingest.sh b/ingest/bin/ingest.sh
index dbb9b05..73d582d 100755
--- a/ingest/bin/ingest.sh
+++ b/ingest/bin/ingest.sh
@@ -38,7 +38,7 @@ LIBJARS=`echo $CLASSPATH | sed 's/^://' | sed 's/:/,/g'`
 #
 # Map/Reduce job
 #
-JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.5.0-SNAPSHOT.jar
+JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.5.0.jar
 CONF=$SCRIPT_DIR/../conf/wikipedia.xml
 HDFS_DATA_DIR=$1
 export HADOOP_CLASSPATH=$CLASSPATH

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/bin/ingest_parallel.sh
----------------------------------------------------------------------
diff --git a/ingest/bin/ingest_parallel.sh b/ingest/bin/ingest_parallel.sh
index 003b7f9..1619603 100755
--- a/ingest/bin/ingest_parallel.sh
+++ b/ingest/bin/ingest_parallel.sh
@@ -38,8 +38,8 @@ LIBJARS=`echo $CLASSPATH | sed 's/^://' | sed 's/:/,/g'`
 #
 # Map/Reduce job
 #
-JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.5.0-SNAPSHOT.jar
-CONF=$SCRIPT_DIR/../conf/wikipedia.xml
+JAR=$SCRIPT_DIR/../lib/wikisearch-ingest-1.5.0.jar
+CONF=$SCRIPT_DIR/../conf/wikipedia_parallel.xml
 HDFS_DATA_DIR=$1
 export HADOOP_CLASSPATH=$CLASSPATH
 echo "hadoop jar $JAR org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester -libjars $LIBJARS -conf $CONF -Dwikipedia.input=${HDFS_DATA_DIR}"

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/pom.xml
----------------------------------------------------------------------
diff --git a/ingest/pom.xml b/ingest/pom.xml
index fdf08e9..a6f3d70 100644
--- a/ingest/pom.xml
+++ b/ingest/pom.xml
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-  <!--
+<!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements. See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
@@ -22,14 +22,16 @@
     <artifactId>accumulo-wikisearch</artifactId>
     <version>1.5.0</version>
   </parent>
-
   <artifactId>wikisearch-ingest</artifactId>
   <name>wikisearch-ingest</name>
-
   <dependencies>
     <dependency>
-      <groupId>com.google.collections</groupId>
-      <artifactId>google-collections</artifactId>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <!-- Declare guava only once: Maven warns that dependency declarations must be
+           unique. The version comes from the parent's dependencyManagement, and
+           copy-dependencies below bundles it into lib/ so that it is also present
+           when running on hadoop 1. -->
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
@@ -52,10 +54,6 @@
       <artifactId>accumulo-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.lucene</groupId>
       <artifactId>lucene-core</artifactId>
     </dependency>
@@ -64,12 +62,16 @@
       <artifactId>lucene-wikipedia</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
-
   <build>
     <plugins>
       <plugin>
@@ -81,11 +83,12 @@
             <goals>
               <goal>copy-dependencies</goal>
             </goals>
-            <phase>process-resources</phase>
+            <phase>prepare-package</phase>
             <configuration>
               <outputDirectory>lib</outputDirectory>
               <!-- just grab the non-provided runtime dependencies -->
-              <includeArtifactIds>commons-lang,google-collections,lucene-core,lucene-analyzers,lucene-wikipedia,protobuf-java,accumulo-core,hadoop-core,libthrift,cloudtrace,zookeeper,commons-codec</includeArtifactIds>
+              <!-- XXX we include guava at the same version as hadoop 2 provides so that we have it on hadoop 1 -->
+              <includeArtifactIds>commons-lang,guava,lucene-core,lucene-analyzers,lucene-wikipedia,protobuf-java,accumulo-core,hadoop-core,libthrift,zookeeper,commons-codec,accumulo-fate,accumulo-trace</includeArtifactIds>
               <excludeTransitive>false</excludeTransitive>
             </configuration>
           </execution>
@@ -102,4 +105,47 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <!-- profile for building against Hadoop 1.0.x
+    Activate by not specifying hadoop.profile -->
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <!-- profile for building against Hadoop 2.0.x
+    Activate using: mvn -Dhadoop.profile=2.0 -->
+    <profile>
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2.0</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.codehaus.jackson</groupId>
+              <artifactId>jackson-mapper-asl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/ingest/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/ingest/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
new file mode 100644
index 0000000..f79221d
--- /dev/null
+++ b/ingest/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.StringReader;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Before;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.xml.sax.ErrorHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+
+public class AggregatingRecordReaderTest {
+  
+  /** SAX error handler that keeps test output quiet; parse() still throws on fatal errors. */
+  public static class MyErrorHandler implements ErrorHandler {
+    
+    @Override
+    public void error(SAXParseException exception) throws SAXException {
+      // Intentionally ignored to keep test output quiet.
+    }
+    
+    @Override
+    public void fatalError(SAXParseException exception) throws SAXException {
+      // Ignored here; DocumentBuilder.parse() still throws, which testPartialXML2 relies on.
+    }
+    
+    @Override
+    public void warning(SAXParseException exception) throws SAXException {
+      // Intentionally ignored to keep test output quiet.
+    }
+    
+  }
+  
+  // Inputs: complete docs with XML declarations (xml1), a leading fragment (xml2), a trailing
+  // fragment (xml3), no newlines (xml4), and attributes plus self-closing docs (xml5).
+  private static final String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n"
+      + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+      + "<doc>\n" + "  <a>E</a>\n" + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml2 = "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>E</a>\n"
+      + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml3 = "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n"
+      + "<doc>\n" + "  <a>E</a>\n";
+  
+  private static final String xml4 = "<doc>" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>" + "<doc>"
+      + "  <a>E</a>" + "  <b>F</b>" + "</doc>";
+  
+  private static final String xml5 = "<doc attr=\"G\">" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>"
+      + "<doc attr=\"H\"/>" + "<doc>" + "  <a>E</a>" + "  <b>F</b>" + "</doc>" + "<doc attr=\"I\"/>";
+  
+  private Configuration conf = null;
+  private TaskAttemptContext ctx = null;
+  private static final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+  private final XPathFactory xpFactory = XPathFactory.newInstance();
+  private XPathExpression EXPR_A = null;
+  private XPathExpression EXPR_B = null;
+  private XPathExpression EXPR_ATTR = null;
+  
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
+    TaskAttemptID id = new TaskAttemptID();
+    // The context wraps a copy of conf: later conf.set() calls are not visible through ctx.
+    ctx = new TaskAttemptContext(conf, id);
+    XPath xp = xpFactory.newXPath();
+    EXPR_A = xp.compile("/doc/a");
+    EXPR_B = xp.compile("/doc/b");
+    EXPR_ATTR = xp.compile("/doc/@attr");
+  }
+  
+  /** Writes {@code data} to a temp file that is deleted when the JVM exits. */
+  public File createFile(String data) throws Exception {
+    File f = File.createTempFile("aggReaderTest", ".xml");
+    f.deleteOnExit();
+    FileWriter writer = new FileWriter(f);
+    try {
+      writer.write(data);
+    } finally {
+      writer.close(); // flushes, and releases the handle even if write() fails
+    }
+    return f;
+  }
+  
+  /** Asserts {@code xml} is well-formed and that /doc/a, /doc/b and /doc/@attr match. */
+  private void testXML(Text xml, String aValue, String bValue, String attrValue) throws Exception {
+    String s = xml.toString();
+    DocumentBuilder parser = factory.newDocumentBuilder();
+    parser.setErrorHandler(new MyErrorHandler());
+    Document root = parser.parse(new InputSource(new StringReader(s)));
+    assertNotNull(root);
+    
+    // A Reader is consumed by one evaluation, so each XPath query gets a fresh InputSource.
+    assertEquals(aValue, EXPR_A.evaluate(new InputSource(new StringReader(s))));
+    assertEquals(bValue, EXPR_B.evaluate(new InputSource(new StringReader(s))));
+    assertEquals(attrValue, EXPR_ATTR.evaluate(new InputSource(new StringReader(s))));
+  }
+  
+  @Test
+  public void testIncorrectArgs() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    // Use a fresh Configuration that lacks START_TOKEN/END_TOKEN. Clearing them via conf.set(key, null)
+    // throws inside Configuration (Properties rejects null values), and ctx holds a copy of conf
+    // anyway, so initialize() would never be exercised and the test could not fail.
+    TaskAttemptContext emptyCtx = new TaskAttemptContext(new Configuration(), new TaskAttemptID());
+    try {
+      reader.initialize(split, emptyCtx);
+      fail("initialize() accepted a configuration without start/end tokens");
+    } catch (Exception expected) {
+      // Expected: the reader must reject a configuration that lacks the tokens.
+    }
+    reader.close();
+  }
+  
+  @Test
+  public void testCorrectXML() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+    
+  }
+  
+  @Test
+  public void testPartialXML() throws Exception {
+    File f = createFile(xml2);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
+    // ctx wraps a copy of conf, so rebuild it or the reader never sees the setting above.
+    ctx = new TaskAttemptContext(conf, new TaskAttemptID());
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testPartialXML2() throws Exception {
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    try {
+      testXML(reader.getCurrentValue(), "E", "", "");
+      fail("Fragment returned, and it somehow passed XML parsing.");
+    } catch (SAXParseException expected) {
+      // Expected: the trailing partial record is not well-formed XML.
+    }
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testLineSplitting() throws Exception {
+    File f = createFile(xml4);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  @Test
+  public void testNoEndTokenHandling() throws Exception {
+    File f = createFile(xml5);
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "G");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "", "", "H");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "E", "F", "");
+    assertTrue("Not enough records returned.", reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "", "", "I");
+    assertTrue("Too many records returned.", !reader.nextKeyValue());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/ingest/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/ingest/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
new file mode 100644
index 0000000..d9443bc
--- /dev/null
+++ b/ingest/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.StringReader;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Before;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.xml.sax.ErrorHandler;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.SAXParseException;
+
+public class AggregatingRecordReaderTest {
+  
+  public static class MyErrorHandler implements ErrorHandler {
+    
+    @Override
+    public void error(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+    @Override
+    public void fatalError(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+    @Override
+    public void warning(SAXParseException exception) throws SAXException {
+      // System.out.println(exception.getMessage());
+    }
+    
+  }
+  
+  private static final String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n"
+      + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+      + "<doc>\n" + "  <a>E</a>\n" + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml2 = "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>E</a>\n"
+      + "  <b>F</b>\n" + "</doc>\n";
+  
+  private static final String xml3 = "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n"
+      + "<doc>\n" + "  <a>E</a>\n";
+  
+  private static final String xml4 = "<doc>" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>" + "<doc>"
+      + "  <a>E</a>" + "  <b>F</b>" + "</doc>";
+  
+  private static final String xml5 = "<doc attr=\"G\">" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>"
+      + "<doc attr=\"H\"/>" + "<doc>" + "  <a>E</a>" + "  <b>F</b>" + "</doc>" + "<doc attr=\"I\"/>";
+  
+  private Configuration conf = null;
+  private TaskAttemptContext ctx = null;
+  private static DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+  private XPathFactory xpFactory = XPathFactory.newInstance();
+  private XPathExpression EXPR_A = null;
+  private XPathExpression EXPR_B = null;
+  private XPathExpression EXPR_ATTR = null;
+  
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
+    TaskAttemptID id = new TaskAttemptID();
+    ctx = new TaskAttemptContextImpl(conf, id);
+    XPath xp = xpFactory.newXPath();
+    EXPR_A = xp.compile("/doc/a");
+    EXPR_B = xp.compile("/doc/b");
+    EXPR_ATTR = xp.compile("/doc/@attr");
+  }
+  
+  public File createFile(String data) throws Exception {
+    // Write out test file
+    File f = File.createTempFile("aggReaderTest", ".xml");
+    f.deleteOnExit();
+    FileWriter writer = new FileWriter(f);
+    writer.write(data);
+    writer.flush();
+    writer.close();
+    return f;
+  }
+  
+  private void testXML(Text xml, String aValue, String bValue, String attrValue) throws Exception {
+    StringReader reader = new StringReader(xml.toString());
+    InputSource source = new InputSource(reader);
+    
+    DocumentBuilder parser = factory.newDocumentBuilder();
+    parser.setErrorHandler(new MyErrorHandler());
+    Document root = parser.parse(source);
+    assertNotNull(root);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_A.evaluate(source), aValue);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_B.evaluate(source), bValue);
+    
+    reader = new StringReader(xml.toString());
+    source = new InputSource(reader);
+    assertEquals(EXPR_ATTR.evaluate(source), attrValue);
+  }
+  
+  @Test
+  public void testIncorrectArgs() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      // Clear the values for BEGIN and STOP TOKEN
+      conf.set(AggregatingRecordReader.START_TOKEN, null);
+      conf.set(AggregatingRecordReader.END_TOKEN, null);
+      reader.initialize(split, ctx);
+      // If we got here, then the code didnt throw an exception
+      fail();
+    } catch (Exception e) {
+      // Do nothing, we succeeded
+      f = null;
+    }
+    reader.close();
+  }
+  
+  /**
+   * Expects three well-formed records whose a/b values are A/B, C/D and E/F (with no attr),
+   * followed by end of input. The reader is always closed so the test does not leak its input.
+   */
+  @Test
+  public void testCorrectXML() throws Exception {
+    File f = createFile(xml1);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader; close it even when an assertion fails
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      reader.initialize(split, ctx);
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "A", "B", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "C", "D", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "E", "F", "");
+      assertTrue(!reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+  
+  /**
+   * Expects exactly two records from the xml2 fixture (C/D, then E/F, with no attr) followed by
+   * end of input. The reader is always closed so the test does not leak its input.
+   */
+  @Test
+  public void testPartialXML() throws Exception {
+    File f = createFile(xml2);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader; close it even when an assertion fails
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      reader.initialize(split, ctx);
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "C", "D", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "E", "F", "");
+      assertTrue(!reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+  
+  /**
+   * Intended to check that, with RETURN_PARTIAL_MATCHES set to false, only the two complete
+   * records (A/B and C/D) are returned from xml3 and the trailing fragment is dropped.
+   * <p>
+   * NOTE(review): unlike its sibling tests this method has no {@code @Test} annotation, so JUnit
+   * never runs it. Before enabling it, confirm that changing {@code conf} after setUp actually
+   * reaches the reader through {@code ctx}; the context may hold its own copy of the
+   * configuration. TODO confirm.
+   */
+  public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
+    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    reader.initialize(split, ctx);
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "A", "B", "");
+    assertTrue(reader.nextKeyValue());
+    testXML(reader.getCurrentValue(), "C", "D", "");
+    assertTrue(!reader.nextKeyValue());
+  }
+  
+  /**
+   * Expects two complete records (A/B, then C/D) followed by a third record that is not
+   * well-formed XML and therefore fails parsing, then end of input. The reader is always closed
+   * so the test does not leak its input.
+   */
+  @Test
+  public void testPartialXML2() throws Exception {
+    File f = createFile(xml3);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader; close it even when an assertion fails
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      reader.initialize(split, ctx);
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "A", "B", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "C", "D", "");
+      assertTrue(reader.nextKeyValue());
+      try {
+        testXML(reader.getCurrentValue(), "E", "", "");
+        fail("Fragment returned, and it somehow passed XML parsing.");
+      } catch (SAXParseException e) {
+        // expected: the partial record is returned but is not well-formed XML
+      }
+      assertTrue(!reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+  
+  /**
+   * Expects three records from the xml4 fixture (A/B, C/D, E/F, with no attr) followed by end of
+   * input. The reader is always closed so the test does not leak its input.
+   */
+  @Test
+  public void testLineSplitting() throws Exception {
+    File f = createFile(xml4);
+    
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader; close it even when an assertion fails
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      reader.initialize(split, ctx);
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "A", "B", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "C", "D", "");
+      assertTrue(reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "E", "F", "");
+      assertTrue(!reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+  
+  /**
+   * Expects five records from the xml5 fixture, in order:
+   * <ul>
+   * <li>A/B with attr G</li>
+   * <li>C/D with no attr</li>
+   * <li>a record with only attr H</li>
+   * <li>E/F with no attr</li>
+   * <li>a record with only attr I</li>
+   * </ul>
+   * These are followed by end of input. The reader is always closed so the test does not leak
+   * its input.
+   */
+  @Test
+  public void testNoEndTokenHandling() throws Exception {
+    File f = createFile(xml5);
+    // Create FileSplit
+    Path p = new Path(f.toURI().toString());
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
+    
+    // Initialize the RecordReader; close it even when an assertion fails
+    AggregatingRecordReader reader = new AggregatingRecordReader();
+    try {
+      reader.initialize(split, ctx);
+      assertTrue("Not enough records returned.", reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "A", "B", "G");
+      assertTrue("Not enough records returned.", reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "C", "D", "");
+      assertTrue("Not enough records returned.", reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "", "", "H");
+      assertTrue("Not enough records returned.", reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "E", "F", "");
+      assertTrue("Not enough records returned.", reader.nextKeyValue());
+      testXML(reader.getCurrentValue(), "", "", "I");
+      assertTrue("Too many records returned.", !reader.nextKeyValue());
+    } finally {
+      reader.close();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java b/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
deleted file mode 100644
index f79221d..0000000
--- a/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.examples.wikisearch.reader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.StringReader;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathExpression;
-import javax.xml.xpath.XPathFactory;
-
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.junit.Before;
-import org.junit.Test;
-import org.w3c.dom.Document;
-import org.xml.sax.ErrorHandler;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-import org.xml.sax.SAXParseException;
-
-/**
- * Unit tests for {@link AggregatingRecordReader}. Each test writes a small XML fixture string to a
- * temporary file, reads it back through the record reader and checks each returned record.
- */
-public class AggregatingRecordReaderTest {
-  
-  /** SAX error handler whose callbacks are intentionally empty, suppressing parser error output. */
-  public static class MyErrorHandler implements ErrorHandler {
-    
-    @Override
-    public void error(SAXParseException exception) throws SAXException {
-      // System.out.println(exception.getMessage());
-    }
-    
-    @Override
-    public void fatalError(SAXParseException exception) throws SAXException {
-      // System.out.println(exception.getMessage());
-    }
-    
-    @Override
-    public void warning(SAXParseException exception) throws SAXException {
-      // System.out.println(exception.getMessage());
-    }
-    
-  }
-  
-  private static final String xml1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n"
-      + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
-      + "<doc>\n" + "  <a>E</a>\n" + "  <b>F</b>\n" + "</doc>\n";
-  
-  private static final String xml2 = "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>E</a>\n"
-      + "  <b>F</b>\n" + "</doc>\n";
-  
-  private static final String xml3 = "<doc>\n" + "  <a>A</a>\n" + "  <b>B</b>\n" + "</doc>\n" + "<doc>\n" + "  <a>C</a>\n" + "  <b>D</b>\n" + "</doc>\n"
-      + "<doc>\n" + "  <a>E</a>\n";
-  
-  private static final String xml4 = "<doc>" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>" + "<doc>"
-      + "  <a>E</a>" + "  <b>F</b>" + "</doc>";
-  
-  private static final String xml5 = "<doc attr=\"G\">" + "  <a>A</a>" + "  <b>B</b>" + "</doc>" + "<doc>" + "  <a>C</a>" + "  <b>D</b>" + "</doc>"
-      + "<doc attr=\"H\"/>" + "<doc>" + "  <a>E</a>" + "  <b>F</b>" + "</doc>" + "<doc attr=\"I\"/>";
-  
-  private Configuration conf = null;
-  private TaskAttemptContext ctx = null;
-  private static DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-  private XPathFactory xpFactory = XPathFactory.newInstance();
-  private XPathExpression EXPR_A = null;
-  private XPathExpression EXPR_B = null;
-  private XPathExpression EXPR_ATTR = null;
-  
-  /** Configures "<doc" / "</doc>" as start/end tokens with partial matches enabled, and compiles the XPaths. */
-  @Before
-  public void setUp() throws Exception {
-    conf = new Configuration();
-    conf.set(AggregatingRecordReader.START_TOKEN, "<doc");
-    conf.set(AggregatingRecordReader.END_TOKEN, "</doc>");
-    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(true));
-    TaskAttemptID id = new TaskAttemptID();
-    ctx = new TaskAttemptContext(conf, id);
-    XPath xp = xpFactory.newXPath();
-    EXPR_A = xp.compile("/doc/a");
-    EXPR_B = xp.compile("/doc/b");
-    EXPR_ATTR = xp.compile("/doc/@attr");
-  }
-  
-  /** Writes {@code data} to a temp file that is deleted on JVM exit and returns that file. */
-  public File createFile(String data) throws Exception {
-    // Write out test file
-    File f = File.createTempFile("aggReaderTest", ".xml");
-    f.deleteOnExit();
-    FileWriter writer = new FileWriter(f);
-    writer.write(data);
-    writer.flush();
-    writer.close();
-    return f;
-  }
-  
-  /**
-   * Parses {@code xml} as a DOM document, then checks the text of /doc/a, /doc/b and /doc/@attr.
-   * NOTE(review): assertEquals is called as (actual, expected), the reverse of JUnit's
-   * (expected, actual) convention, so failure messages will label the two values backwards.
-   */
-  private void testXML(Text xml, String aValue, String bValue, String attrValue) throws Exception {
-    StringReader reader = new StringReader(xml.toString());
-    InputSource source = new InputSource(reader);
-    
-    DocumentBuilder parser = factory.newDocumentBuilder();
-    parser.setErrorHandler(new MyErrorHandler());
-    Document root = parser.parse(source);
-    assertNotNull(root);
-    
-    reader = new StringReader(xml.toString());
-    source = new InputSource(reader);
-    assertEquals(EXPR_A.evaluate(source), aValue);
-    
-    reader = new StringReader(xml.toString());
-    source = new InputSource(reader);
-    assertEquals(EXPR_B.evaluate(source), bValue);
-    
-    reader = new StringReader(xml.toString());
-    source = new InputSource(reader);
-    assertEquals(EXPR_ATTR.evaluate(source), attrValue);
-  }
-  
-  @Test
-  public void testIncorrectArgs() throws Exception {
-    File f = createFile(xml1);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    try {
-      // Clear the values for BEGIN and STOP TOKEN
-      // NOTE(review): Configuration.set(name, null) may itself throw, and ctx may hold a copy of
-      // conf, so the caught exception might not come from reader.initialize(). TODO confirm.
-      conf.set(AggregatingRecordReader.START_TOKEN, null);
-      conf.set(AggregatingRecordReader.END_TOKEN, null);
-      reader.initialize(split, ctx);
-      // If we got here, then the code didn't throw an exception
-      fail();
-    } catch (Exception e) {
-      // Do nothing, we succeeded
-      f = null;
-    }
-    reader.close();
-  }
-  
-  @Test
-  public void testCorrectXML() throws Exception {
-    File f = createFile(xml1);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "A", "B", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "E", "F", "");
-    assertTrue(!reader.nextKeyValue());
-    
-  }
-  
-  @Test
-  public void testPartialXML() throws Exception {
-    File f = createFile(xml2);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "E", "F", "");
-    assertTrue(!reader.nextKeyValue());
-  }
-  
-  // NOTE(review): no @Test annotation, so JUnit never runs this method.
-  public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
-    conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
-    File f = createFile(xml3);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "A", "B", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue(!reader.nextKeyValue());
-  }
-  
-  @Test
-  public void testPartialXML2() throws Exception {
-    File f = createFile(xml3);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "A", "B", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue(reader.nextKeyValue());
-    try {
-      testXML(reader.getCurrentValue(), "E", "", "");
-      fail("Fragment returned, and it somehow passed XML parsing.");
-    } catch (SAXParseException e) {
-      // ignore
-    }
-    assertTrue(!reader.nextKeyValue());
-  }
-  
-  @Test
-  public void testLineSplitting() throws Exception {
-    File f = createFile(xml4);
-    
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "A", "B", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue(reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "E", "F", "");
-    assertTrue(!reader.nextKeyValue());
-  }
-  
-  @Test
-  public void testNoEndTokenHandling() throws Exception {
-    File f = createFile(xml5);
-    // Create FileSplit
-    Path p = new Path(f.toURI().toString());
-    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
-    
-    // Initialize the RecordReader
-    AggregatingRecordReader reader = new AggregatingRecordReader();
-    reader.initialize(split, ctx);
-    assertTrue("Not enough records returned.", reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "A", "B", "G");
-    assertTrue("Not enough records returned.", reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "C", "D", "");
-    assertTrue("Not enough records returned.", reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "", "", "H");
-    assertTrue("Not enough records returned.", reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "E", "F", "");
-    assertTrue("Not enough records returned.", reader.nextKeyValue());
-    testXML(reader.getCurrentValue(), "", "", "I");
-    assertTrue("Too many records returned.", !reader.nextKeyValue());
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 919b08b..894a132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,6 @@
   <artifactId>accumulo-wikisearch</artifactId>
   <packaging>pom</packaging>
   <name>accumulo-wikisearch</name>
-
   <modules>
     <module>ingest</module>
     <module>query</module>
@@ -38,10 +37,9 @@
     <version.commons-jexl>2.0.1</version.commons-jexl>
     <version.commons-lang>2.4</version.commons-lang>
     <version.ejb-spec-api>1.0.1.Final</version.ejb-spec-api>
-    <version.googlecollections>1.0</version.googlecollections>
+    <version.guava>11.0.2</version.guava>
     <version.jaxrs>2.1.0.GA</version.jaxrs>
     <version.kryo>1.04</version.kryo>
-    <version.libthrift>0.6.1</version.libthrift>
     <version.log4j>1.2.16</version.log4j>
     <version.log4j-extras>1.0</version.log4j-extras>
     <version.lucene>3.0.2</version.lucene>
@@ -49,14 +47,15 @@
     <version.lucene-wikipedia>3.0.2</version.lucene-wikipedia>
     <version.minlog>1.2</version.minlog>
     <version.protobuf>2.3.0</version.protobuf>
+    <version.thrift>0.9.0</version.thrift>
     <version.zookeeper>3.3.1</version.zookeeper>
   </properties>
   <dependencyManagement>
     <dependencies>
       <dependency>
-        <groupId>com.google.collections</groupId>
-        <artifactId>google-collections</artifactId>
-        <version>${version.googlecollections}</version>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>${version.guava}</version>
       </dependency>
       <dependency>
         <groupId>com.google.protobuf</groupId>
@@ -82,6 +81,12 @@
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-server</artifactId>
         <version>1.11</version>
+        <exclusions>
+          <exclusion>
+            <groupId>asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>commons-codec</groupId>
@@ -93,6 +98,28 @@
         <artifactId>commons-lang</artifactId>
         <version>${version.commons-lang}</version>
       </dependency>
+      <!-- XXX This is just to fix the dependency conflict in Hadoop 1 -->
+      <dependency>
+        <groupId>net.java.dev.jets3t</groupId>
+        <artifactId>jets3t</artifactId>
+        <version>0.7.1</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-core</artifactId>
+        <version>${version.accumulo}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-trace</artifactId>
+        <version>${version.accumulo}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-jexl</artifactId>
@@ -112,6 +139,24 @@
         <groupId>org.apache.lucene</groupId>
         <artifactId>lucene-wikipedia</artifactId>
         <version>${version.lucene-wikipedia}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-digester</groupId>
+            <artifactId>commons-digester</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.thrift</groupId>
+        <artifactId>libthrift</artifactId>
+        <version>${version.thrift}</version>
+        <exclusions>
+            <!-- excluded to make the enforcer plugin happy -->
+            <exclusion>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpcore</artifactId>
+            </exclusion>
+        </exclusions>
       </dependency>
     </dependencies>
   </dependencyManagement>
@@ -135,7 +180,6 @@
       <layout>default</layout>
     </repository>
   </repositories>
-
   <build>
     <defaultGoal>package</defaultGoal>
     <plugins>
@@ -148,6 +192,11 @@
             <goals>
               <goal>enforce</goal>
             </goals>
+            <configuration>
+              <rules>
+                <DependencyConvergence />
+              </rules>
+            </configuration>
           </execution>
         </executions>
       </plugin>
@@ -165,6 +214,10 @@
         </configuration>
       </plugin>
       <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+      </plugin>
+      <plugin>
         <artifactId>maven-jar-plugin</artifactId>
         <configuration>
           <outputDirectory>lib</outputDirectory>
@@ -218,11 +271,11 @@
             <goals>
               <goal>copy-dependencies</goal>
             </goals>
-            <phase>process-resources</phase>
+            <phase>prepare-package</phase>
             <configuration>
               <outputDirectory>../../lib</outputDirectory>
               <!-- just grab the non-provided runtime dependencies -->
-              <includeArtifactIds>commons-collections,commons-configuration,commons-io,commons-lang,jline,log4j,libthrift,commons-jci-core,commons-jci-fam,commons-logging,commons-logging-api,cloudtrace</includeArtifactIds>
+              <includeArtifactIds>commons-collections,commons-configuration,commons-io,commons-lang,jline,log4j,libthrift,commons-jci-core,commons-jci-fam,commons-logging,commons-logging-api</includeArtifactIds>
               <excludeGroupIds>accumulo</excludeGroupIds>
               <excludeTransitive>true</excludeTransitive>
             </configuration>
@@ -231,4 +284,73 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <!-- profile for building against Hadoop 1.0.x
+      Activate by not specifying hadoop.profile -->
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.8</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <phase>generate-test-sources</phase>
+                <configuration>
+                  <sources>
+                    <source>src/test/hadoop1</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <!-- profile for building against Hadoop 2.0.x
+      Activate using: mvn -Dhadoop.profile=2.0 -->
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2.0</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.8</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <phase>generate-test-sources</phase>
+                <configuration>
+                  <sources>
+                    <source>src/test/hadoop2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/query/pom.xml
----------------------------------------------------------------------
diff --git a/query/pom.xml b/query/pom.xml
index 6900919..be6f6b2 100644
--- a/query/pom.xml
+++ b/query/pom.xml
@@ -22,14 +22,13 @@
     <artifactId>accumulo-wikisearch</artifactId>
     <version>1.5.0</version>
   </parent>
-
   <artifactId>wikisearch-query</artifactId>
   <packaging>ejb</packaging>
   <name>wikisearch-query</name>
   <dependencies>
     <dependency>
-      <groupId>com.google.collections</groupId>
-      <artifactId>google-collections</artifactId>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
@@ -56,10 +55,6 @@
       <artifactId>commons-lang</artifactId>
     </dependency>
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
     </dependency>
@@ -83,6 +78,43 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>com.googlecode</groupId>
+      <artifactId>minlog</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+      <version>1.6</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.1</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-fate</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-trace</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -99,11 +131,12 @@
             <goals>
               <goal>copy-dependencies</goal>
             </goals>
-            <phase>process-resources</phase>
+            <phase>prepare-package</phase>
             <configuration>
               <outputDirectory>lib</outputDirectory>
               <!-- just grab the non-provided runtime dependencies -->
-              <includeArtifactIds>commons-lang,commons-codec,protobuf-java,libthrift,zookeeper,hadoop-core,commons-jexl,google-collections,kryo,asm,minlog,reflectasm,wikisearch-ingest,accumulo-core,cloudtrace</includeArtifactIds>
+              <!-- XXX we include guava at the same version as hadoop 2 provides so that we have it on hadoop 1 -->
+              <includeArtifactIds>commons-io,commons-configuration,commons-lang,commons-codec,protobuf-java,libthrift,zookeeper,hadoop-core,commons-jexl,guava,kryo,asm,minlog,reflectasm,wikisearch-ingest,accumulo-core,accumulo-fate,accumulo-trace</includeArtifactIds>
               <excludeTransitive>true</excludeTransitive>
             </configuration>
           </execution>
@@ -136,4 +169,70 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <!-- profile for building against Hadoop 1.0.x
+    Activate by not specifying hadoop.profile -->
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <!-- profile for building against Hadoop 2.0.x
+    Activate using: mvn -Dhadoop.profile=2.0 -->
+    <profile>
+      <id>hadoop-2.0</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2.0</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop.version}</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.codehaus.jackson</groupId>
+              <artifactId>jackson-mapper-asl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>copy-dependencies</id>
+                <goals>
+                  <goal>copy-dependencies</goal>
+                </goals>
+                <phase>prepare-package</phase>
+                <configuration>
+                  <outputDirectory>lib</outputDirectory>
+                  <!-- just grab the non-provided runtime dependencies -->
+                  <includeArtifactIds>commons-io,commons-configuration,commons-lang,commons-codec,protobuf-java,libthrift,zookeeper,hadoop-client,hadoop-common,hadoop-hdfs,commons-jexl,guava,kryo,asm,minlog,reflectasm,wikisearch-ingest,accumulo-core,accumulo-fate,accumulo-trace</includeArtifactIds>
+                  <excludeTransitive>false</excludeTransitive>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/query/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
----------------------------------------------------------------------
diff --git a/query/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/query/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
new file mode 100644
index 0000000..ac8241e
--- /dev/null
+++ b/query/src/test/hadoop1/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.logic;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.junit.Assert;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
+import org.apache.accumulo.examples.wikisearch.sample.Document;
+import org.apache.accumulo.examples.wikisearch.sample.Field;
+import org.apache.accumulo.examples.wikisearch.sample.Results;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Ingests the bundled enwiki-20110901-001.xml sample into a {@link MockInstance} by running {@link WikipediaMapper} through
+ * the Hadoop 1 mapreduce API, then checks that {@link QueryLogic} returns the expected documents.
+ */
+public class TestQueryLogic {
+  
+  private static final String METADATA_TABLE_NAME = "wikiMetadata";
+  
+  private static final String TABLE_NAME = "wiki";
+  
+  private static final String INDEX_TABLE_NAME = "wikiIndex";
+  
+  private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
+  
+  private static final String TABLE_NAMES[] = {METADATA_TABLE_NAME, TABLE_NAME, RINDEX_TABLE_NAME, INDEX_TABLE_NAME};
+  
+  /**
+   * RecordWriter handed to the mapper: each mutation is sent to the BatchWriter registered in {@link #writerMap} under the
+   * output key, which names the destination table.
+   */
+  private class MockAccumuloRecordWriter extends RecordWriter<Text,Mutation> {
+    @Override
+    public void write(Text key, Mutation value) throws IOException, InterruptedException {
+      BatchWriter writer = writerMap.get(key);
+      // Report the unknown table instead of failing with a bare NullPointerException.
+      if (writer == null) {
+        throw new IOException("No BatchWriter configured for table " + key);
+      }
+      try {
+        writer.addMutation(value);
+      } catch (MutationsRejectedException e) {
+        throw new IOException("Error adding mutation", e);
+      }
+    }
+    
+    /**
+     * Flushes and closes every BatchWriter, attempting all of them even if one fails; the first failure is rethrown.
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      MutationsRejectedException firstFailure = null;
+      for (BatchWriter w : writerMap.values()) {
+        try {
+          w.flush();
+          w.close();
+        } catch (MutationsRejectedException e) {
+          if (firstFailure == null) {
+            firstFailure = e;
+          }
+        }
+      }
+      if (firstFailure != null) {
+        throw new IOException("Error closing Batch Writer", firstFailure);
+      }
+    }
+    
+  }
+  
+  private Connector c = null;
+  private Configuration conf = new Configuration();
+  private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
+  private QueryLogic table = null;
+  
+  /**
+   * Creates the wiki tables in a fresh MockInstance, runs WikipediaMapper over enwiki-20110901-001.xml to populate them,
+   * and configures {@link #table} to query them.
+   */
+  @Before
+  public void setup() throws Exception {
+    
+    Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG);
+    Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG);
+    Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG);
+    
+    conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+    conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
+    conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
+    
+    MockInstance i = new MockInstance();
+    c = i.getConnector("root", new PasswordToken(""));
+    WikipediaIngester.createTables(c.tableOperations(), TABLE_NAME, false);
+    // Named tableName so the loop variable does not shadow the QueryLogic field "table".
+    for (String tableName : TABLE_NAMES) {
+      writerMap.put(new Text(tableName), c.createBatchWriter(tableName, 1000L, 1000L, 1));
+    }
+    
+    // With the Hadoop 1 API the task context can be constructed directly.
+    TaskAttemptID id = new TaskAttemptID();
+    TaskAttemptContext context = new TaskAttemptContext(conf, id);
+    
+    RawLocalFileSystem fs = new RawLocalFileSystem();
+    fs.setConf(conf);
+    
+    URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
+    Assert.assertNotNull(url);
+    File data = new File(url.toURI());
+    Path tmpFile = new Path(data.getAbsolutePath());
+    
+    // Setup the Mapper
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null), 0);
+    AggregatingRecordReader rr = new AggregatingRecordReader();
+    Path ocPath = new Path(tmpFile, "oc");
+    OutputCommitter oc = new FileOutputCommitter(ocPath, context);
+    fs.deleteOnExit(ocPath);
+    StandaloneStatusReporter sr = new StandaloneStatusReporter();
+    rr.initialize(split, context);
+    MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
+    WikipediaMapper mapper = new WikipediaMapper();
+    
+    // Load data into Mock Accumulo
+    Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context(conf, id, rr, rw, oc, sr, split);
+    mapper.run(con);
+    
+    // Flush and close record writers.
+    rw.close(context);
+    
+    table = new QueryLogic();
+    table.setMetadataTableName(METADATA_TABLE_NAME);
+    table.setTableName(TABLE_NAME);
+    table.setIndexTableName(INDEX_TABLE_NAME);
+    table.setReverseIndexTableName(RINDEX_TABLE_NAME);
+    table.setUseReadAheadIterator(false);
+    table.setUnevaluatedFields(Collections.singletonList("TEXT"));
+  }
+  
+  /**
+   * Debugging aid (not called by the tests): prints every entry of the given table visible with the "all" authorization.
+   */
+  void debugQuery(String tableName) throws Exception {
+    Scanner s = c.createScanner(tableName, new Authorizations("all"));
+    Range r = new Range();
+    s.setRange(r);
+    for (Entry<Key,Value> entry : s)
+      System.out.println(entry.getKey().toString() + " " + entry.getValue().toString());
+  }
+  
+  /**
+   * Expects four hits for a disjunction of four TITLE terms and one hit for TEXT == 'abacus', printing the fields of the
+   * latter.
+   */
+  @Test
+  public void testTitle() throws Exception {
+    Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.OFF);
+    Logger.getLogger(RangeCalculator.class).setLevel(Level.OFF);
+    List<String> auths = new ArrayList<String>();
+    auths.add("enwiki");
+    
+    Results results = table.runQuery(c, auths, "TITLE == 'asphalt' or TITLE == 'abacus' or TITLE == 'acid' or TITLE == 'acronym'", null, null, null);
+    List<Document> docs = results.getResults();
+    assertEquals(4, docs.size());
+    
+    results = table.runQuery(c, auths, "TEXT == 'abacus'", null, null, null);
+    docs = results.getResults();
+    assertEquals(1, docs.size());
+    for (Document doc : docs) {
+      System.out.println("id: " + doc.getId());
+      for (Field field : doc.getFields())
+        System.out.println(field.getFieldName() + " -> " + field.getFieldValue());
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/1d5c80be/query/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
----------------------------------------------------------------------
diff --git a/query/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/query/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
new file mode 100644
index 0000000..cbeefd9
--- /dev/null
+++ b/query/src/test/hadoop2/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.wikisearch.logic;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.junit.Assert;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaIngester;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
+import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
+import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
+import org.apache.accumulo.examples.wikisearch.sample.Document;
+import org.apache.accumulo.examples.wikisearch.sample.Field;
+import org.apache.accumulo.examples.wikisearch.sample.Results;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.Credentials;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Ingests the bundled enwiki-20110901-001.xml sample into a {@link MockInstance} by running {@link WikipediaMapper} through
+ * the Hadoop 2 mapreduce API, then checks that {@link QueryLogic} returns the expected documents.
+ */
+public class TestQueryLogic {
+  
+  private static final String METADATA_TABLE_NAME = "wikiMetadata";
+  
+  private static final String TABLE_NAME = "wiki";
+  
+  private static final String INDEX_TABLE_NAME = "wikiIndex";
+  
+  private static final String RINDEX_TABLE_NAME = "wikiReverseIndex";
+  
+  private static final String TABLE_NAMES[] = {METADATA_TABLE_NAME, TABLE_NAME, RINDEX_TABLE_NAME, INDEX_TABLE_NAME};
+  
+  /**
+   * RecordWriter handed to the mapper: each mutation is sent to the BatchWriter registered in {@link #writerMap} under the
+   * output key, which names the destination table.
+   */
+  private class MockAccumuloRecordWriter extends RecordWriter<Text,Mutation> {
+    @Override
+    public void write(Text key, Mutation value) throws IOException, InterruptedException {
+      BatchWriter writer = writerMap.get(key);
+      // Report the unknown table instead of failing with a bare NullPointerException.
+      if (writer == null) {
+        throw new IOException("No BatchWriter configured for table " + key);
+      }
+      try {
+        writer.addMutation(value);
+      } catch (MutationsRejectedException e) {
+        throw new IOException("Error adding mutation", e);
+      }
+    }
+    
+    /**
+     * Flushes and closes every BatchWriter, attempting all of them even if one fails; the first failure is rethrown.
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      MutationsRejectedException firstFailure = null;
+      for (BatchWriter w : writerMap.values()) {
+        try {
+          w.flush();
+          w.close();
+        } catch (MutationsRejectedException e) {
+          if (firstFailure == null) {
+            firstFailure = e;
+          }
+        }
+      }
+      if (firstFailure != null) {
+        throw new IOException("Error closing Batch Writer", firstFailure);
+      }
+    }
+    
+  }
+  
+  private Connector c = null;
+  private Configuration conf = new Configuration();
+  private HashMap<Text,BatchWriter> writerMap = new HashMap<Text,BatchWriter>();
+  private QueryLogic table = null;
+  
+  /**
+   * Creates the wiki tables in a fresh MockInstance, runs WikipediaMapper over enwiki-20110901-001.xml to populate them,
+   * and configures {@link #table} to query them.
+   */
+  @Before
+  public void setup() throws Exception {
+    
+    Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.DEBUG);
+    Logger.getLogger(QueryLogic.class).setLevel(Level.DEBUG);
+    Logger.getLogger(RangeCalculator.class).setLevel(Level.DEBUG);
+    
+    conf.set(AggregatingRecordReader.START_TOKEN, "<page>");
+    conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
+    conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
+    conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
+    
+    MockInstance i = new MockInstance();
+    c = i.getConnector("root", new PasswordToken(""));
+    WikipediaIngester.createTables(c.tableOperations(), TABLE_NAME, false);
+    // Named tableName so the loop variable does not shadow the QueryLogic field "table".
+    for (String tableName : TABLE_NAMES) {
+      writerMap.put(new Text(tableName), c.createBatchWriter(tableName, 1000L, 1000L, 1));
+    }
+    
+    // With the Hadoop 2 API the task context is built via TaskAttemptContextImpl.
+    TaskAttemptID id = new TaskAttemptID("fake", 1, TaskType.MAP, 1, 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, id);
+    
+    RawLocalFileSystem fs = new RawLocalFileSystem();
+    fs.setConf(conf);
+    
+    URL url = ClassLoader.getSystemResource("enwiki-20110901-001.xml");
+    Assert.assertNotNull(url);
+    File data = new File(url.toURI());
+    Path tmpFile = new Path(data.getAbsolutePath());
+    
+    // Setup the Mapper
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null), 0);
+    AggregatingRecordReader rr = new AggregatingRecordReader();
+    Path ocPath = new Path(tmpFile, "oc");
+    OutputCommitter oc = new FileOutputCommitter(ocPath, context);
+    fs.deleteOnExit(ocPath);
+    StandaloneStatusReporter sr = new StandaloneStatusReporter();
+    rr.initialize(split, context);
+    MockAccumuloRecordWriter rw = new MockAccumuloRecordWriter();
+    WikipediaMapper mapper = new WikipediaMapper();
+    
+    // Mapper.Context is implemented as an anonymous subclass that forwards every call to a MapContextImpl built
+    // from the reader, writer, committer, reporter and split above.
+    final MapContextImpl<LongWritable,Text,Text,Mutation> mapContext = new MapContextImpl<LongWritable,Text,Text,Mutation>(conf, id, rr, rw, oc, sr, split);
+    // Load data into Mock Accumulo
+    Mapper<LongWritable,Text,Text,Mutation>.Context con = mapper.new Context() {
+      /**
+       * Get the input split for this map.
+       */
+      @Override
+      public InputSplit getInputSplit() {
+        return mapContext.getInputSplit();
+      }
+
+      @Override
+      public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return mapContext.getCurrentKey();
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException, InterruptedException {
+        return mapContext.getCurrentValue();
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        return mapContext.nextKeyValue();
+      }
+
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return mapContext.getCounter(counterName);
+      }
+
+      @Override
+      public Counter getCounter(String groupName, String counterName) {
+        return mapContext.getCounter(groupName, counterName);
+      }
+
+      @Override
+      public OutputCommitter getOutputCommitter() {
+        return mapContext.getOutputCommitter();
+      }
+
+      @Override
+      public void write(Text key, Mutation value) throws IOException,
+          InterruptedException {
+        mapContext.write(key, value);
+      }
+
+      @Override
+      public String getStatus() {
+        return mapContext.getStatus();
+      }
+
+      @Override
+      public TaskAttemptID getTaskAttemptID() {
+        return mapContext.getTaskAttemptID();
+      }
+
+      @Override
+      public void setStatus(String msg) {
+        mapContext.setStatus(msg);
+      }
+
+      @Override
+      public Path[] getArchiveClassPaths() {
+        return mapContext.getArchiveClassPaths();
+      }
+
+      @Override
+      public String[] getArchiveTimestamps() {
+        return mapContext.getArchiveTimestamps();
+      }
+
+      @Override
+      public URI[] getCacheArchives() throws IOException {
+        return mapContext.getCacheArchives();
+      }
+
+      @Override
+      public URI[] getCacheFiles() throws IOException {
+        return mapContext.getCacheFiles();
+      }
+
+      @Override
+      public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+          throws ClassNotFoundException {
+        return mapContext.getCombinerClass();
+      }
+
+      @Override
+      public Configuration getConfiguration() {
+        return mapContext.getConfiguration();
+      }
+
+      @Override
+      public Path[] getFileClassPaths() {
+        return mapContext.getFileClassPaths();
+      }
+
+      @Override
+      public String[] getFileTimestamps() {
+        return mapContext.getFileTimestamps();
+      }
+
+      @Override
+      public RawComparator<?> getGroupingComparator() {
+        return mapContext.getGroupingComparator();
+      }
+
+      @Override
+      public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+          throws ClassNotFoundException {
+        return mapContext.getInputFormatClass();
+      }
+
+      @Override
+      public String getJar() {
+        return mapContext.getJar();
+      }
+
+      @Override
+      public JobID getJobID() {
+        return mapContext.getJobID();
+      }
+
+      @Override
+      public String getJobName() {
+        return mapContext.getJobName();
+      }
+
+      // NOTE(review): presumably left out because the targeted Hadoop 2 release does not declare this method.
+      /*@Override
+      public boolean userClassesTakesPrecedence() {
+        return mapContext.userClassesTakesPrecedence();
+      }*/
+
+      @Override
+      public boolean getJobSetupCleanupNeeded() {
+        return mapContext.getJobSetupCleanupNeeded();
+      }
+
+      @Override
+      public boolean getTaskCleanupNeeded() {
+        return mapContext.getTaskCleanupNeeded();
+      }
+
+      @Override
+      public Path[] getLocalCacheArchives() throws IOException {
+        return mapContext.getLocalCacheArchives();
+      }
+
+      @Override
+      public Path[] getLocalCacheFiles() throws IOException {
+        return mapContext.getLocalCacheFiles();
+      }
+
+      @Override
+      public Class<?> getMapOutputKeyClass() {
+        return mapContext.getMapOutputKeyClass();
+      }
+
+      @Override
+      public Class<?> getMapOutputValueClass() {
+        return mapContext.getMapOutputValueClass();
+      }
+
+      @Override
+      public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+          throws ClassNotFoundException {
+        return mapContext.getMapperClass();
+      }
+
+      @Override
+      public int getMaxMapAttempts() {
+        return mapContext.getMaxMapAttempts();
+      }
+
+      @Override
+      public int getMaxReduceAttempts() {
+        return mapContext.getMaxReduceAttempts();
+      }
+
+      @Override
+      public int getNumReduceTasks() {
+        return mapContext.getNumReduceTasks();
+      }
+
+      @Override
+      public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+          throws ClassNotFoundException {
+        return mapContext.getOutputFormatClass();
+      }
+
+      @Override
+      public Class<?> getOutputKeyClass() {
+        return mapContext.getOutputKeyClass();
+      }
+
+      @Override
+      public Class<?> getOutputValueClass() {
+        return mapContext.getOutputValueClass();
+      }
+
+      @Override
+      public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+          throws ClassNotFoundException {
+        return mapContext.getPartitionerClass();
+      }
+
+      @Override
+      public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+          throws ClassNotFoundException {
+        return mapContext.getReducerClass();
+      }
+
+      @Override
+      public RawComparator<?> getSortComparator() {
+        return mapContext.getSortComparator();
+      }
+
+      @Override
+      public boolean getSymlink() {
+        return mapContext.getSymlink();
+      }
+
+      @Override
+      public Path getWorkingDirectory() throws IOException {
+        return mapContext.getWorkingDirectory();
+      }
+
+      @Override
+      public void progress() {
+        mapContext.progress();
+      }
+
+      @Override
+      public boolean getProfileEnabled() {
+        return mapContext.getProfileEnabled();
+      }
+
+      @Override
+      public String getProfileParams() {
+        return mapContext.getProfileParams();
+      }
+
+      @Override
+      public IntegerRanges getProfileTaskRange(boolean isMap) {
+        return mapContext.getProfileTaskRange(isMap);
+      }
+
+      @Override
+      public String getUser() {
+        return mapContext.getUser();
+      }
+
+      @Override
+      public Credentials getCredentials() {
+        return mapContext.getCredentials();
+      }
+      
+      @Override
+      public float getProgress() {
+        return mapContext.getProgress();
+      }
+    };
+
+    mapper.run(con);
+    
+    // Flush and close record writers.
+    rw.close(context);
+    
+    table = new QueryLogic();
+    table.setMetadataTableName(METADATA_TABLE_NAME);
+    table.setTableName(TABLE_NAME);
+    table.setIndexTableName(INDEX_TABLE_NAME);
+    table.setReverseIndexTableName(RINDEX_TABLE_NAME);
+    table.setUseReadAheadIterator(false);
+    table.setUnevaluatedFields(Collections.singletonList("TEXT"));
+  }
+  
+  /**
+   * Debugging aid (not called by the tests): prints every entry of the given table visible with the "all" authorization.
+   */
+  void debugQuery(String tableName) throws Exception {
+    Scanner s = c.createScanner(tableName, new Authorizations("all"));
+    Range r = new Range();
+    s.setRange(r);
+    for (Entry<Key,Value> entry : s)
+      System.out.println(entry.getKey().toString() + " " + entry.getValue().toString());
+  }
+  
+  /**
+   * Expects four hits for a disjunction of four TITLE terms and one hit for TEXT == 'abacus', printing the fields of the
+   * latter.
+   */
+  @Test
+  public void testTitle() throws Exception {
+    Logger.getLogger(AbstractQueryLogic.class).setLevel(Level.OFF);
+    Logger.getLogger(RangeCalculator.class).setLevel(Level.OFF);
+    List<String> auths = new ArrayList<String>();
+    auths.add("enwiki");
+    
+    Results results = table.runQuery(c, auths, "TITLE == 'asphalt' or TITLE == 'abacus' or TITLE == 'acid' or TITLE == 'acronym'", null, null, null);
+    List<Document> docs = results.getResults();
+    assertEquals(4, docs.size());
+    
+    results = table.runQuery(c, auths, "TEXT == 'abacus'", null, null, null);
+    docs = results.getResults();
+    assertEquals(1, docs.size());
+    for (Document doc : docs) {
+      System.out.println("id: " + doc.getId());
+      for (Field field : doc.getFields())
+        System.out.println(field.getFieldName() + " -> " + field.getFieldValue());
+    }
+  }
+  
+}


Mime
View raw message