incubator-accumulo-commits mailing list archives

From: ktur...@apache.org
Subject: svn commit: r1242133 - in /incubator/accumulo/trunk: ./ src/core/ src/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/server/
Date: Wed, 08 Feb 2012 22:08:14 GMT
Author: kturner
Date: Wed Feb  8 22:08:13 2012
New Revision: 1242133

URL: http://svn.apache.org/viewvc?rev=1242133&view=rev
Log:
ACCUMULO-381 merged from 1.4

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
    incubator/accumulo/trunk/src/server/   (props changed)

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  8 22:08:13 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1242105
+/incubator/accumulo/branches/1.4:1201902-1242131

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  8 22:08:13 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
-/incubator/accumulo/branches/1.4/src/core:1201902-1242105
+/incubator/accumulo/branches/1.4/src/core:1201902-1242131

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml?rev=1242133&r1=1242132&r2=1242133&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml Wed Feb  8 22:08:13 2012
@@ -86,6 +86,10 @@
       <artifactId>libthrift</artifactId>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

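The commons-codec dependency added above backs the org.apache.commons.codec.binary.Base64 import that WikipediaPartitionedMapper uses below, where a document is stored as a Base64-encoded GZIP'd value. A minimal sketch of that encoding step, assuming a hypothetical helper (gzipAndEncode is not part of this commit):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    import org.apache.commons.codec.binary.Base64;

    // Hypothetical helper illustrating the "Base64 encoded GZIP'd document"
    // value format described in the mapper's comments.
    static byte[] gzipAndEncode(String text) throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      GZIPOutputStream gzip = new GZIPOutputStream(baos);
      gzip.write(text.getBytes("UTF-8"));
      gzip.close();
      return Base64.encodeBase64(baos.toByteArray());
    }
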
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1242133&r1=1242132&r2=1242133&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Wed Feb  8 22:08:13 2012
@@ -67,6 +67,8 @@ public class WikipediaInputFormat extend
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
+      // for highly replicated files, returning all of the locations can lead to bunching
+      // TODO replace this with a subset of the locations
       return fileSplit.getLocations();
     }
 
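The TODO above is left unresolved by this commit. A minimal sketch of what a subset-returning getLocations() might look like (the cap of three hosts is an assumed value, not anything this commit specifies):

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      // Hypothetical: report only a few of the split's hosts so that highly
      // replicated files do not bunch map tasks onto the same nodes.
      String[] locations = fileSplit.getLocations();
      int max = Math.min(3, locations.length); // assumed cap
      String[] subset = new String[max];
      System.arraycopy(locations, 0, subset, 0, max);
      return subset;
    }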

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1242133&r1=1242132&r2=1242133&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Wed Feb  8 22:08:13 2012
@@ -205,7 +205,7 @@ public class WikipediaMapper extends Map
    * @return
    * @throws IOException
    */
-  private Set<String> getTokens(Article article) throws IOException {
+  static Set<String> getTokens(Article article) throws IOException {
     Set<String> tokenList = new HashSet<String>();
     WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
     TermAttribute term = tok.addAttribute(TermAttribute.class);

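Dropping private here makes getTokens package-visible and static, so WikipediaPartitionedMapper (whose diff follows) can reuse the tokenizer instead of carrying its own copy:

    // WikipediaMapper and WikipediaPartitionedMapper now share one tokenizer
    Set<String> tokens = WikipediaMapper.getTokens(article);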
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1242133&r1=1242132&r2=1242133&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Wed Feb  8 22:08:13 2012
@@ -21,7 +21,6 @@ package org.apache.accumulo.examples.wik
 
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -31,17 +30,15 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -66,17 +63,171 @@ public class WikipediaPartitionedMapper 
   private Text reverseIndexTableName = null;
   private Text metadataTableName = null;
   
+  private static class MutationInfo {
+    final String row;
+    final String colfam;
+    final String colqual;
+    final ColumnVisibility cv;
+    final long timestamp;
+    
+    public MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) {
+      super();
+      this.row = row;
+      this.colfam = colfam;
+      this.colqual = colqual;
+      this.cv = cv;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      MutationInfo other = (MutationInfo)obj;
+      return (row == other.row || row.equals(other.row)) &&
+          (colfam == other.colfam || colfam.equals(other.colfam)) &&
+          colqual.equals(other.colqual) &&
+          (cv == other.cv || cv.equals(other.cv)) &&
+          timestamp == other.timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+      return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ cv.hashCode() ^ (int)timestamp;
+    }
+  }
+  
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiIndexOutput;
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiReverseIndexOutput;
+  private LRUOutputCombiner<MutationInfo,Value> wikiMetadataOutput;
+  
+  private static class CountAndSet
+  {
+    public int count;
+    public HashSet<String> set;
+    
+    public CountAndSet(String entry)
+    {
+      set = new HashSet<String>();
+      set.add(entry);
+      count = 1;
+    }
+  }
+  
+
   @Override
-  public void setup(Context context) {
+  public void setup(final Context context) {
     Configuration conf = context.getConfiguration();
     tablename = new Text(WikipediaConfiguration.getTableName(conf));
     indexTableName = new Text(tablename + "Index");
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
+    final Text metadataTableNameFinal = metadataTableName;
+    final Text indexTableNameFinal = indexTableName;
+    final Text reverseIndexTableNameFinal = reverseIndexTableName;
+    
     numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+
+    LRUOutputCombiner.Fold<CountAndSet> indexFold = 
+        new LRUOutputCombiner.Fold<CountAndSet>() {
+      @Override
+      public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) {
+        oldValue.count += newValue.count;
+        if(oldValue.set == null || newValue.set == null)
+        {
+          oldValue.set = null;
+          return oldValue;
+        }
+        oldValue.set.addAll(newValue.set);
+        if(oldValue.set.size() > GlobalIndexUidCombiner.MAX)
+          oldValue.set = null;
+        return oldValue;
+      }
+    };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> indexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+      
+      @Override
+      public void output(MutationInfo key, CountAndSet value)
+      {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(indexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+      }
+    };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> reverseIndexOutput =
+        new LRUOutputCombiner.Output<WikipediaPartitionedMapper.MutationInfo,CountAndSet>() {
+      
+      @Override
+      public void output(MutationInfo key, CountAndSet value)
+      {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(reverseIndexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+      }
+    };
+      
+    wikiIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000,indexFold,indexOutput);
+    wikiReverseIndexOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,CountAndSet>(10000, indexFold,reverseIndexOutput);
+    wikiMetadataOutput = new LRUOutputCombiner<WikipediaPartitionedMapper.MutationInfo,Value>(10000,
+        new LRUOutputCombiner.Fold<Value>() {
+          @Override
+          public Value fold(Value oldValue, Value newValue) {
+            return oldValue;
+          }},
+        new LRUOutputCombiner.Output<MutationInfo,Value>() {
+          @Override
+          public void output(MutationInfo key, Value value) {
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
+            try {
+              context.write(metadataTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }});
   }
   
+  
+  
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    wikiIndexOutput.flush();
+    wikiMetadataOutput.flush();
+    wikiReverseIndexOutput.flush();
+  }
+
+
+
   @Override
  protected void map(Text language, Article article, Context context) throws IOException, InterruptedException {
     String NULL_BYTE = "\u0000";
@@ -93,13 +244,12 @@ public class WikipediaPartitionedMapper 
       for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
        m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(entry.getKey());
-        mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
+        MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp());
+        wikiMetadataOutput.put(mm, NULL_VALUE);
       }
       
       // Tokenize the content
-      Set<String> tokens = getTokens(article);
+      Set<String> tokens = WikipediaMapper.getTokens(article);
       
      // We are going to put the fields to be indexed into a multimap. This allows us to iterate
       // over the entire set once.
@@ -118,30 +268,17 @@ public class WikipediaPartitionedMapper 
        m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE);
         
         // Create mutations for the global index
-        // Create a UID object for the Value
-        Builder uidBuilder = Uid.List.newBuilder();
-        uidBuilder.setIGNORE(false);
-        uidBuilder.setCOUNT(1);
-        uidBuilder.addUID(Integer.toString(article.getId()));
-        Uid.List uidList = uidBuilder.build();
-        Value val = new Value(uidList.toByteArray());
-        
-        // Create mutations for the global index
         // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object
-        Mutation gm = new Mutation(index.getValue());
-        gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-        context.write(indexTableName, gm);
+        MutationInfo gm = new MutationInfo(index.getValue(),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+        wikiIndexOutput.put(gm, new CountAndSet(Integer.toString(article.getId())));
         
         // Create mutations for the global reverse index
-        Mutation grm = new Mutation(StringUtils.reverse(index.getValue()));
-        grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val);
-        context.write(reverseIndexTableName, grm);
+        MutationInfo grm = new MutationInfo(StringUtils.reverse(index.getValue()),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp());
+        wikiReverseIndexOutput.put(grm, new CountAndSet(Integer.toString(article.getId())));
         
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(index.getKey());
-        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
-        
+        MutationInfo mm = new MutationInfo(index.getKey(),METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp());
+        wikiMetadataOutput.put(mm, NULL_VALUE);
       }
       // Add the entire text to the document section of the table.
      // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
@@ -153,40 +290,4 @@ public class WikipediaPartitionedMapper 
     }
     context.progress();
   }
-  
-  /**
-   * Tokenize the wikipedia content
-   * 
-   * @param article
-   * @return
-   * @throws IOException
-   */
-  private Set<String> getTokens(Article article) throws IOException {
-    Set<String> tokenList = new HashSet<String>();
-    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
-    TermAttribute term = tok.addAttribute(TermAttribute.class);
-    try {
-      while (tok.incrementToken()) {
-        String token = term.term();
-        if (!StringUtils.isEmpty(token))
-          tokenList.add(token);
-      }
-    } catch (IOException e) {
-      log.error("Error tokenizing text", e);
-    } finally {
-      try {
-        tok.end();
-      } catch (IOException e) {
-        log.error("Error calling end()", e);
-      } finally {
-        try {
-          tok.close();
-        } catch (IOException e) {
-          log.error("Error closing tokenizer", e);
-        }
-      }
-    }
-    return tokenList;
-  }
-  
 }

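LRUOutputCombiner itself is not included in this diff. From the way it is used above (constructed with a capacity, a Fold that merges values for a repeated key, and an Output invoked when an entry leaves the cache; fed with put() and drained by flush() in cleanup()), a plausible sketch is an access-ordered LinkedHashMap that folds duplicate keys and emits entries on eviction. Everything below beyond that observed call pattern is an assumption, not the committed implementation:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch only: combine values for recently seen keys in memory, and
    // push an entry downstream when it falls out of the LRU window.
    public class LRUOutputCombiner<K,V> extends LinkedHashMap<K,V> {

      public static abstract class Fold<V> {
        public abstract V fold(V oldValue, V newValue);
      }

      public static abstract class Output<K,V> {
        public abstract void output(K key, V value);
      }

      private final int capacity;
      private final Fold<V> fold;
      private final Output<K,V> output;

      public LRUOutputCombiner(int capacity, Fold<V> fold, Output<K,V> output) {
        super(capacity + 1, 1.1f, true); // access order: hot keys stay cached
        this.capacity = capacity;
        this.fold = fold;
        this.output = output;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        if (size() > capacity) {
          output.output(eldest.getKey(), eldest.getValue()); // emit on eviction
          return true;
        }
        return false;
      }

      @Override
      public V put(K key, V value) {
        V old = get(key);
        if (old != null)
          value = fold.fold(old, value); // merge with the cached value
        return super.put(key, value);
      }

      public void flush() {
        // Drain whatever is still cached; the mapper calls this in cleanup().
        for (Map.Entry<K,V> entry : entrySet())
          output.output(entry.getKey(), entry.getValue());
        clear();
      }
    }

This also shows why the mapper's cleanup() must flush all three combiners: any entry that was never evicted would otherwise be lost when the task exits.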
Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb  8 22:08:13 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
 /incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
-/incubator/accumulo/branches/1.4/src/server:1201902-1242105
+/incubator/accumulo/branches/1.4/src/server:1201902-1242131


