incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From afu...@apache.org
Subject svn commit: r1241206 - in /incubator/accumulo/branches/1.4/src/examples/wikisearch: ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/ ingest/src/test/java/org/apach...
Date Mon, 06 Feb 2012 22:01:49 GMT
Author: afuchs
Date: Mon Feb  6 22:01:48 2012
New Revision: 1241206

URL: http://svn.apache.org/viewvc?rev=1241206&view=rev
Log:
ACCUMULO-375

Modified:
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Mon Feb  6 22:01:48 2012
@@ -45,6 +45,9 @@ public class WikipediaConfiguration {
   public final static String ANALYZER = "wikipedia.index.analyzer";
   
   public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+  public final static String NUM_GROUPS = "wikipedia.ingest.groups";
+
   
   public static String getUser(Configuration conf) {
     return conf.get(USER);
@@ -110,6 +113,10 @@ public class WikipediaConfiguration {
     return conf.getInt(NUM_PARTITIONS, 25);
   }
   
+  /**
+   * Reads the number of ingest groups from the Hadoop configuration.
+   *
+   * @param conf
+   *          configuration to read {@link #NUM_GROUPS} from
+   * @return the configured value of {@link #NUM_GROUPS}, or 1 when the property is not set
+   */
+  public static int getNumGroups(Configuration conf) {
+    return conf.getInt(NUM_GROUPS, 1);
+  }
+  
   /**
    * Helper method to get properties from Hadoop configuration
    * 

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Mon Feb  6 22:01:48 2012
@@ -16,20 +16,107 @@
  */
 package org.apache.accumulo.examples.wikisearch.ingest;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 
 public class WikipediaInputFormat extends TextInputFormat {
+
+  /**
+   * An input split that pairs a {@link FileSplit} with a group number, so the same file region
+   * can be described once per group.
+   *
+   * The serialized form is: path (UTF), start (long), length (long), host count (int), each host
+   * (UTF), partition (int). {@link #write(DataOutput)} and {@link #readFields(DataInput)} must
+   * stay in exactly that order.
+   */
+  public static class WikipediaInputSplit extends InputSplit implements Writable {
+
+    private FileSplit fileSplit = null;
+    private int partition = -1;
+
+    /**
+     * Creates an empty split; the fields stay at their defaults until
+     * {@link #readFields(DataInput)} populates them.
+     */
+    public WikipediaInputSplit(){}
+
+    /**
+     * @param fileSplit
+     *          the file region this split covers
+     * @param partition
+     *          the group number assigned to this split
+     */
+    public WikipediaInputSplit(FileSplit fileSplit, int partition)
+    {
+      this.fileSplit = fileSplit;
+      this.partition = partition;
+    }
+
+    /**
+     * @return the group number assigned to this split
+     */
+    public int getPartition()
+    {
+      return partition;
+    }
+
+    /**
+     * @return the wrapped file split
+     */
+    public FileSplit getFileSplit()
+    {
+      return fileSplit;
+    }
+
+    /**
+     * @return the length reported by the wrapped file split
+     */
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSplit.getLength();
+    }
+
+    /**
+     * @return the locations reported by the wrapped file split
+     */
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return fileSplit.getLocations();
+    }
+
+    /**
+     * Restores the split from the layout produced by {@link #write(DataOutput)}.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      Path file = new Path(in.readUTF());
+      long start = in.readLong();
+      long length = in.readLong();
+      int numHosts = in.readInt();
+      String[] hosts = new String[numHosts];
+      for(int i = 0; i < numHosts; i++)
+        hosts[i] = in.readUTF();
+      fileSplit = new FileSplit(file, start, length, hosts);
+      partition = in.readInt();
+    }
+
+    /**
+     * Serializes the wrapped file split field by field, followed by the partition.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(fileSplit.getPath().toString());
+      out.writeLong(fileSplit.getStart());
+      out.writeLong(fileSplit.getLength());
+      String [] hosts = fileSplit.getLocations();
+      out.writeInt(hosts.length);
+      for(String host:hosts)
+        out.writeUTF(host);
+      // The file split is already fully described by the fields above. Writing it a second
+      // time with fileSplit.write(out) would insert bytes that readFields() never consumes,
+      // so the partition would be read from the wrong position.
+      out.writeInt(partition);
+    }
+
+  }
   
   @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    // Expand every split produced by the parent input format into one WikipediaInputSplit
+    // per group: the same file region is listed numGroups times, each copy tagged with a
+    // different group number.
+    List<InputSplit> superSplits = super.getSplits(job);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    
+    int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration());
+
+    for(InputSplit split:superSplits)
+    {
+      FileSplit fileSplit = (FileSplit)split;
+      for(int group = 0; group < numGroups; group++)
+      {
+        splits.add(new WikipediaInputSplit(fileSplit,group));
+      }
+    }
+    // Return the expanded list, not a fresh super.getSplits(job): the mapper and the record
+    // reader cast their split to WikipediaInputSplit and would fail on a plain FileSplit.
+    return splits;
+  }
+
+  // Always hands back a new AggregatingRecordReader; neither the split nor the context is
+  // inspected here.
+  @Override
   public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     return new AggregatingRecordReader();
   }

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Mon Feb  6 22:01:48 2012
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
@@ -71,6 +72,9 @@ public class WikipediaMapper extends Map
   private String language;
   private int numPartitions = 0;
   private ColumnVisibility cv = null;
+
+  private int myGroup = -1;
+  private int numGroups = -1;
   
   private Text tablename = null;
   private Text indexTableName = null;
@@ -85,7 +89,11 @@ public class WikipediaMapper extends Map
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
-    FileSplit split = (FileSplit) context.getInputSplit();
+    WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit();
+    myGroup = wiSplit.getPartition();
+    numGroups = WikipediaConfiguration.getNumGroups(conf);
+    
+    FileSplit split = wiSplit.getFileSplit();
     String fileName = split.getPath().getName();
     Matcher matcher = languagePattern.matcher(fileName);
     if (matcher.matches()) {
@@ -118,6 +126,9 @@ public class WikipediaMapper extends Map
     String colfPrefix = language + NULL_BYTE;
     String indexPrefix = "fi" + NULL_BYTE;
     if (article != null) {
+      int groupId = WikipediaMapper.getPartitionId(article, numGroups);
+      if(groupId != myGroup)
+        return;
       Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
       
       // Create the mutations for the document.

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java Mon Feb  6 22:01:48 2012
@@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wik
 import java.io.IOException;
 
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.util.TextUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -60,7 +61,7 @@ public class AggregatingRecordReader ext
   
   @Override
   public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-    super.initialize(genericSplit, context);
+    super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context);
     this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class);
     this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class);
     this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false);

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java Mon Feb  6 22:01:48 2012
@@ -100,6 +100,7 @@ public class WikipediaMapperTest {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java Mon Feb  6 22:01:48 2012
@@ -28,6 +28,7 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathFactory;
 
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     AggregatingRecordReader reader = new AggregatingRecordReader();
     try {
       // Clear the values for BEGIN and STOP TOKEN
@@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest
     File f = createFile(xml5);
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java?rev=1241206&r1=1241205&r2=1241206&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java Mon Feb  6 22:01:48 2012
@@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
@@ -113,6 +115,7 @@ public class TestQueryLogic {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");
@@ -136,7 +139,7 @@ public class TestQueryLogic {
     Path tmpFile = new Path(data.getAbsolutePath());
     
     // Setup the Mapper
-    InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0);
     AggregatingRecordReader rr = new AggregatingRecordReader();
     Path ocPath = new Path(tmpFile, "oc");
     OutputCommitter oc = new FileOutputCommitter(ocPath, context);



Mime
View raw message