From: ujustgotbilld@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm
Date: Wed, 19 Mar 2014 16:08:14 -0000
Message-Id: <609a48b47a6c4754a29f5a9982386902@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [04/50] [abbrv] git commit: ACCUMULO-375 added an LRUOutputCombiner to combine and reduce the number of mutations sent to Accumulo

ACCUMULO-375 added an LRUOutputCombiner to combine and reduce the number of mutations sent to Accumulo

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1242117 13f79535-47bb-0310-9956-ffa450edef68

Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/842696ee
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/842696ee
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/842696ee

Branch: refs/heads/master
Commit: 842696eebe9a10dc63731c99160d1e1ef23e7787
Parents: fa35931
Author: Adam Fuchs
Authored: Wed Feb 8 21:43:35 2012 +0000
Committer: Adam Fuchs
Committed: Wed Feb 8 21:43:35 2012 +0000

----------------------------------------------------------------------
 .../wikisearch/ingest/LRUOutputCombiner.java    |  61 +++++
 .../wikisearch/ingest/WikipediaInputFormat.java |   2 +
 .../wikisearch/ingest/WikipediaMapper.java      |   2 +-
 .../ingest/WikipediaPartitionedMapper.java      | 229 +++++++++++++------
 4 files changed, 229 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
new file mode 100644
index 0000000..e641f36
--- /dev/null
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/LRUOutputCombiner.java
@@ -0,0 +1,61 @@
+package org.apache.accumulo.examples.wikisearch.ingest;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUOutputCombiner<Key,Value> extends LinkedHashMap<Key,Value> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static abstract class Fold<Value>
+  {
+    public abstract Value fold(Value oldValue, Value newValue);
+  }
+
+  public static abstract class Output<Key,Value>
+  {
+    public abstract void output(Key key, Value value);
+  }
+
+  private final int capacity;
+  private final Fold<Value> fold;
+  private final Output<Key,Value> output;
+
+  private long cacheHits = 0;
+  private long cacheMisses = 0;
+
+  public LRUOutputCombiner(int capacity, Fold<Value> fold, Output<Key,Value> output) {
+    super(capacity + 1, 1.1f, true);
+    this.capacity = capacity;
+    this.fold = fold;
+    this.output = output;
+  }
+
+  protected boolean removeEldestEntry(Map.Entry<Key,Value> eldest) {
+    if (size() > capacity) {
+      output.output(eldest.getKey(), eldest.getValue());
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Value put(Key key, Value value) {
+    Value val = get(key);
+    if (val != null) {
+      value = fold.fold(val, value);
+      cacheHits++;
+    } else {
+      cacheMisses++;
+    }
+    super.put(key, value);
+    return null;
+  }
+
+  public void flush() {
+    for (Map.Entry<Key,Value> e : entrySet()) {
+      output.output(e.getKey(), e.getValue());
+    }
+    clear();
+  }
+}
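
Not part of the commit, but useful when reading the class above in the archive: a minimal, hypothetical usage sketch. The class name LRUOutputCombinerSketch, the String/Integer key and value types, the capacity of 2, and the summing fold are all invented for illustration; the real keys, folds, and outputs appear in the WikipediaPartitionedMapper changes below.

  package org.apache.accumulo.examples.wikisearch.ingest;

  // Hypothetical sketch (not in the repository): fold per-word counts in the LRU cache and
  // emit a line whenever an entry is evicted or flush() is called.
  public class LRUOutputCombinerSketch {
    public static void main(String[] args) {
      LRUOutputCombiner.Fold<Integer> sum = new LRUOutputCombiner.Fold<Integer>() {
        @Override
        public Integer fold(Integer oldValue, Integer newValue) {
          return oldValue + newValue;
        }
      };
      LRUOutputCombiner.Output<String,Integer> print = new LRUOutputCombiner.Output<String,Integer>() {
        @Override
        public void output(String key, Integer value) {
          System.out.println(key + " -> " + value); // a real caller would build and write a Mutation here
        }
      };
      LRUOutputCombiner<String,Integer> combiner = new LRUOutputCombiner<String,Integer>(2, sum, print);
      combiner.put("apple", 1);
      combiner.put("apple", 1); // folded with the cached value; nothing is emitted yet
      combiner.put("pear", 1);
      combiner.put("plum", 1);  // capacity (2) exceeded: least-recently-used entry ("apple" -> 2) is emitted
      combiner.flush();         // emits whatever is still cached ("pear" -> 1, "plum" -> 1)
    }
  }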

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index 731d02c..e682f2f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -67,6 +67,8 @@ public class WikipediaInputFormat extends TextInputFormat {
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
+      // for highly replicated files, returning all of the locations can lead to bunching
+      // TODO replace this with a subset of the locations
       return fileSplit.getLocations();
     }
 
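
The two comment lines added above describe the problem but leave the fix as a TODO. Purely as a sketch of one way that TODO could go, and not something this commit implements: cap the number of replica locations reported per split so that tasks for highly replicated files are not all scheduled onto the same few hosts. The cap of 3 is an arbitrary illustrative value.

  // Hypothetical sketch only -- not part of this commit.
  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    String[] all = fileSplit.getLocations();
    int max = 3; // illustrative cap; a real value would need tuning
    if (all.length <= max)
      return all;
    String[] subset = new String[max];
    System.arraycopy(all, 0, subset, 0, max);
    return subset;
  }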

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index 1ec531b..a06c57f 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -205,7 +205,7 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
    * @return
    * @throws IOException
    */
-  private Set<String> getTokens(Article article) throws IOException {
+  static Set<String> getTokens(Article article) throws IOException {
     Set<String> tokenList = new HashSet<String>();
     WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
     TermAttribute term = tok.addAttribute(TermAttribute.class);


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/842696ee/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
index 4d94c24..25bf572 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.examples.wikisearch.ingest;
 
 import java.io.IOException;
-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -31,17 +30,15 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -66,17 +63,171 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiIndexOutput;
+  private LRUOutputCombiner<MutationInfo,CountAndSet> wikiReverseIndexOutput;
+  private LRUOutputCombiner<MutationInfo,Value> wikiMetadataOutput;
+
+  private static class CountAndSet
+  {
+    public int count;
+    public HashSet<String> set;
+
+    public CountAndSet(String entry)
+    {
+      set = new HashSet<String>();
+      set.add(entry);
+      count = 1;
+    }
+  }
+
   @Override
-  public void setup(Context context) {
+  public void setup(final Context context) {
     Configuration conf = context.getConfiguration();
     tablename = new Text(WikipediaConfiguration.getTableName(conf));
     indexTableName = new Text(tablename + "Index");
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
+    final Text metadataTableNameFinal = metadataTableName;
+    final Text indexTableNameFinal = indexTableName;
+    final Text reverseIndexTableNameFinal = reverseIndexTableName;
     numPartitions = WikipediaConfiguration.getNumPartitions(conf);
+
+    LRUOutputCombiner.Fold<CountAndSet> indexFold =
+      new LRUOutputCombiner.Fold<CountAndSet>() {
+        @Override
+        public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) {
+          oldValue.count += newValue.count;
+          if(oldValue.set == null || newValue.set == null)
+          {
+            oldValue.set = null;
+            return oldValue;
+          }
+          oldValue.set.addAll(newValue.set);
+          if(oldValue.set.size() > GlobalIndexUidCombiner.MAX)
+            oldValue.set = null;
+          return oldValue;
+        }
+      };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> indexOutput =
+      new LRUOutputCombiner.Output<MutationInfo,CountAndSet>() {
+
+        @Override
+        public void output(MutationInfo key, CountAndSet value)
+        {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(indexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+    LRUOutputCombiner.Output<MutationInfo,CountAndSet> reverseIndexOutput =
+      new LRUOutputCombiner.Output<MutationInfo,CountAndSet>() {
+
+        @Override
+        public void output(MutationInfo key, CountAndSet value)
+        {
+          Uid.List.Builder builder = Uid.List.newBuilder();
+          builder.setCOUNT(value.count);
+          if (value.set == null) {
+            builder.setIGNORE(true);
+            builder.clearUID();
+          } else {
+            builder.setIGNORE(false);
+            builder.addAllUID(value.set);
+          }
+          Uid.List list = builder.build();
+          Value val = new Value(list.toByteArray());
+          Mutation m = new Mutation(key.row);
+          m.put(key.colfam, key.colqual, key.cv, key.timestamp, val);
+          try {
+            context.write(reverseIndexTableNameFinal, m);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+    wikiIndexOutput = new LRUOutputCombiner<MutationInfo,CountAndSet>(10000,indexFold,indexOutput);
+    wikiReverseIndexOutput = new LRUOutputCombiner<MutationInfo,CountAndSet>(10000, indexFold,reverseIndexOutput);
+    wikiMetadataOutput = new LRUOutputCombiner<MutationInfo,Value>(10000,
+        new LRUOutputCombiner.Fold<Value>() {
+          @Override
+          public Value fold(Value oldValue, Value newValue) {
+            return oldValue;
+          }},
+        new LRUOutputCombiner.Output<MutationInfo,Value>() {
+          @Override
+          public void output(MutationInfo key, Value value) {
+            Mutation m = new Mutation(key.row);
+            m.put(key.colfam, key.colqual, key.cv, key.timestamp, value);
+            try {
+              context.write(metadataTableNameFinal, m);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }});
   }
+
+
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    wikiIndexOutput.flush();
+    wikiMetadataOutput.flush();
+    wikiReverseIndexOutput.flush();
+  }
+
+
   @Override
   protected void map(Text language, Article article, Context context) throws IOException, InterruptedException {
     String NULL_BYTE = "\u0000";
@@ -93,13 +244,12 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {
       for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
         m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(entry.getKey());
-        mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
+        MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp());
+        wikiMetadataOutput.put(mm, NULL_VALUE);
       }
 
       // Tokenize the content
-      Set<String> tokens = getTokens(article);
+      Set<String> tokens = WikipediaMapper.getTokens(article);
 
       // We are going to put the fields to be indexed into a multimap. This allows us to iterate
       // over the entire set once.
@@ -118,30 +268,17 @@ public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {
-  private Set<String> getTokens(Article article) throws IOException {
-    Set<String> tokenList = new HashSet<String>();
-    WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
-    TermAttribute term = tok.addAttribute(TermAttribute.class);
-    try {
-      while (tok.incrementToken()) {
-        String token = term.term();
-        if (!StringUtils.isEmpty(token))
-          tokenList.add(token);
-      }
-    } catch (IOException e) {
-      log.error("Error tokenizing text", e);
-    } finally {
-      try {
-        tok.end();
-      } catch (IOException e) {
-        log.error("Error calling end()", e);
-      } finally {
-        try {
-          tok.close();
-        } catch (IOException e) {
-          log.error("Error closing tokenizer", e);
-        }
-      }
-    }
-    return tokenList;
-  }
- }
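
The setup() and map() changes above key the new combiners on a MutationInfo object, but the hunk that defines MutationInfo is not legible in this archived copy of the diff (see the repository links at the top of the message for the full change). Purely as a hypothetical sketch of what such a key has to provide, not a copy of the committed class: the constructor must capture the row/column/visibility/timestamp fields used by the Output callbacks, and equals()/hashCode() must be value-based so that LRUOutputCombiner's LinkedHashMap can actually collapse repeated index entries onto one cached CountAndSet.

  // Hypothetical sketch only -- the MutationInfo class actually committed may differ.
  private static class MutationInfo {
    final String row;
    final String colfam;
    final String colqual;
    final ColumnVisibility cv;
    final long timestamp;

    MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) {
      this.row = row;
      this.colfam = colfam;
      this.colqual = colqual;
      this.cv = cv;
      this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof MutationInfo))
        return false;
      MutationInfo o = (MutationInfo) obj;
      return row.equals(o.row) && colfam.equals(o.colfam) && colqual.equals(o.colqual)
          && cv.equals(o.cv) && timestamp == o.timestamp;
    }

    @Override
    public int hashCode() {
      // cv is deliberately left out of the hash; equal objects still hash equally.
      return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ (int) timestamp;
    }
  }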