Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C9B809E52 for ; Wed, 8 Feb 2012 22:08:40 +0000 (UTC) Received: (qmail 65112 invoked by uid 500); 8 Feb 2012 22:08:40 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 65075 invoked by uid 500); 8 Feb 2012 22:08:40 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 65068 invoked by uid 99); 8 Feb 2012 22:08:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2012 22:08:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Feb 2012 22:08:35 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6D69C23888E7; Wed, 8 Feb 2012 22:08:14 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1242133 - in /incubator/accumulo/trunk: ./ src/core/ src/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/server/ Date: Wed, 08 Feb 2012 22:08:14 -0000 To: accumulo-commits@incubator.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120208220814.6D69C23888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Wed Feb 8 22:08:13 2012 New 
Revision: 1242133 URL: http://svn.apache.org/viewvc?rev=1242133&view=rev Log: ACCUMULO-381 merged from 1.4 Modified: incubator/accumulo/trunk/ (props changed) incubator/accumulo/trunk/src/core/ (props changed) incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java incubator/accumulo/trunk/src/server/ (props changed) Propchange: incubator/accumulo/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Feb 8 22:08:13 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873 /incubator/accumulo/branches/1.3.5rc:1209938 -/incubator/accumulo/branches/1.4:1201902-1242105 +/incubator/accumulo/branches/1.4:1201902-1242131 Propchange: incubator/accumulo/trunk/src/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Feb 8 22:08:13 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/core:1209938 
/incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 -/incubator/accumulo/branches/1.4/src/core:1201902-1242105 +/incubator/accumulo/branches/1.4/src/core:1201902-1242131 Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml?rev=1242133&r1=1242132&r2=1242133&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/pom.xml Wed Feb 8 22:08:13 2012 @@ -86,6 +86,10 @@ libthrift runtime + + commons-codec + commons-codec + Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1242133&r1=1242132&r2=1242133&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Wed Feb 8 22:08:13 2012 @@ -67,6 +67,8 @@ public class WikipediaInputFormat extend @Override public String[] getLocations() throws IOException, InterruptedException { + // for highly replicated files, returning all of the 
locations can lead to bunching + // TODO replace this with a subset of the locations return fileSplit.getLocations(); } Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1242133&r1=1242132&r2=1242133&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Wed Feb 8 22:08:13 2012 @@ -205,7 +205,7 @@ public class WikipediaMapper extends Map * @return * @throws IOException */ - private Set getTokens(Article article) throws IOException { + static Set getTokens(Article article) throws IOException { Set tokenList = new HashSet(); WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); TermAttribute term = tok.addAttribute(TermAttribute.class); Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1242133&r1=1242132&r2=1242133&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original) +++ 
incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Wed Feb 8 22:08:13 2012 @@ -21,7 +21,6 @@ package org.apache.accumulo.examples.wik import java.io.IOException; -import java.io.StringReader; import java.nio.charset.Charset; import java.util.HashSet; import java.util.Map.Entry; @@ -31,17 +30,15 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.protobuf.Uid; -import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; -import org.apache.lucene.analysis.tokenattributes.TermAttribute; -import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -66,17 +63,171 @@ public class WikipediaPartitionedMapper private Text reverseIndexTableName = null; private Text metadataTableName = null; + private static class MutationInfo { + final String row; + final String colfam; + final String colqual; + final ColumnVisibility cv; + final long timestamp; + + public MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) { + super(); + this.row = row; + this.colfam = colfam; + this.colqual = colqual; + this.cv = cv; + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object obj) { + MutationInfo other = 
(MutationInfo)obj; + return (row == other.row || row.equals(other.row)) && + (colfam == other.colfam || colfam.equals(other.colfam)) && + colqual.equals(other.colqual) && + (cv == other.cv || cv.equals(other.cv)) && + timestamp == other.timestamp; + } + + @Override + public int hashCode() { + return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ cv.hashCode() ^ (int)timestamp; + } + } + + private LRUOutputCombiner wikiIndexOutput; + private LRUOutputCombiner wikiReverseIndexOutput; + private LRUOutputCombiner wikiMetadataOutput; + + private static class CountAndSet + { + public int count; + public HashSet set; + + public CountAndSet(String entry) + { + set = new HashSet(); + set.add(entry); + count = 1; + } + } + + @Override - public void setup(Context context) { + public void setup(final Context context) { Configuration conf = context.getConfiguration(); tablename = new Text(WikipediaConfiguration.getTableName(conf)); indexTableName = new Text(tablename + "Index"); reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); + final Text metadataTableNameFinal = metadataTableName; + final Text indexTableNameFinal = indexTableName; + final Text reverseIndexTableNameFinal = reverseIndexTableName; + numPartitions = WikipediaConfiguration.getNumPartitions(conf); + + LRUOutputCombiner.Fold indexFold = + new LRUOutputCombiner.Fold() { + @Override + public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) { + oldValue.count += newValue.count; + if(oldValue.set == null || newValue.set == null) + { + oldValue.set = null; + return oldValue; + } + oldValue.set.addAll(newValue.set); + if(oldValue.set.size() > GlobalIndexUidCombiner.MAX) + oldValue.set = null; + return oldValue; + } + }; + LRUOutputCombiner.Output indexOutput = + new LRUOutputCombiner.Output() { + + @Override + public void output(MutationInfo key, CountAndSet value) + { + Uid.List.Builder builder = Uid.List.newBuilder(); + 
builder.setCOUNT(value.count); + if (value.set == null) { + builder.setIGNORE(true); + builder.clearUID(); + } else { + builder.setIGNORE(false); + builder.addAllUID(value.set); + } + Uid.List list = builder.build(); + Value val = new Value(list.toByteArray()); + Mutation m = new Mutation(key.row); + m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); + try { + context.write(indexTableNameFinal, m); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + LRUOutputCombiner.Output reverseIndexOutput = + new LRUOutputCombiner.Output() { + + @Override + public void output(MutationInfo key, CountAndSet value) + { + Uid.List.Builder builder = Uid.List.newBuilder(); + builder.setCOUNT(value.count); + if (value.set == null) { + builder.setIGNORE(true); + builder.clearUID(); + } else { + builder.setIGNORE(false); + builder.addAllUID(value.set); + } + Uid.List list = builder.build(); + Value val = new Value(list.toByteArray()); + Mutation m = new Mutation(key.row); + m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); + try { + context.write(reverseIndexTableNameFinal, m); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + wikiIndexOutput = new LRUOutputCombiner(10000,indexFold,indexOutput); + wikiReverseIndexOutput = new LRUOutputCombiner(10000, indexFold,reverseIndexOutput); + wikiMetadataOutput = new LRUOutputCombiner(10000, + new LRUOutputCombiner.Fold() { + @Override + public Value fold(Value oldValue, Value newValue) { + return oldValue; + }}, + new LRUOutputCombiner.Output() { + @Override + public void output(MutationInfo key, Value value) { + Mutation m = new Mutation(key.row); + m.put(key.colfam, key.colqual, key.cv, key.timestamp, value); + try { + context.write(metadataTableNameFinal, m); + } catch (Exception e) { + throw new RuntimeException(e); + } + }}); } + + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + wikiIndexOutput.flush(); + 
wikiMetadataOutput.flush(); + wikiReverseIndexOutput.flush(); + } + + + @Override protected void map(Text language, Article article, Context context) throws IOException, InterruptedException { String NULL_BYTE = "\u0000"; @@ -93,13 +244,12 @@ public class WikipediaPartitionedMapper for (Entry entry : article.getFieldValues().entrySet()) { m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE); // Create mutations for the metadata table. - Mutation mm = new Mutation(entry.getKey()); - mm.put(METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp(), NULL_VALUE); - context.write(metadataTableName, mm); + MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp()); + wikiMetadataOutput.put(mm, NULL_VALUE); } // Tokenize the content - Set tokens = getTokens(article); + Set tokens = WikipediaMapper.getTokens(article); // We are going to put the fields to be indexed into a multimap. This allows us to iterate // over the entire set once. 
@@ -118,30 +268,17 @@ public class WikipediaPartitionedMapper m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE); // Create mutations for the global index - // Create a UID object for the Value - Builder uidBuilder = Uid.List.newBuilder(); - uidBuilder.setIGNORE(false); - uidBuilder.setCOUNT(1); - uidBuilder.addUID(Integer.toString(article.getId())); - Uid.List uidList = uidBuilder.build(); - Value val = new Value(uidList.toByteArray()); - - // Create mutations for the global index // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object - Mutation gm = new Mutation(index.getValue()); - gm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); - context.write(indexTableName, gm); + MutationInfo gm = new MutationInfo(index.getValue(),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp()); + wikiIndexOutput.put(gm, new CountAndSet(Integer.toString(article.getId()))); // Create mutations for the global reverse index - Mutation grm = new Mutation(StringUtils.reverse(index.getValue())); - grm.put(index.getKey(), partitionId + NULL_BYTE + language, cv, article.getTimestamp(), val); - context.write(reverseIndexTableName, grm); + MutationInfo grm = new MutationInfo(StringUtils.reverse(index.getValue()),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp()); + wikiReverseIndexOutput.put(grm, new CountAndSet(Integer.toString(article.getId()))); // Create mutations for the metadata table. 
- Mutation mm = new Mutation(index.getKey()); - mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE); - context.write(metadataTableName, mm); - + MutationInfo mm = new MutationInfo(index.getKey(),METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp()); + wikiMetadataOutput.put(mm, NULL_VALUE); } // Add the entire text to the document section of the table. // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document @@ -153,40 +290,4 @@ public class WikipediaPartitionedMapper } context.progress(); } - - /** - * Tokenize the wikipedia content - * - * @param article - * @return - * @throws IOException - */ - private Set getTokens(Article article) throws IOException { - Set tokenList = new HashSet(); - WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); - TermAttribute term = tok.addAttribute(TermAttribute.class); - try { - while (tok.incrementToken()) { - String token = term.term(); - if (!StringUtils.isEmpty(token)) - tokenList.add(token); - } - } catch (IOException e) { - log.error("Error tokenizing text", e); - } finally { - try { - tok.end(); - } catch (IOException e) { - log.error("Error calling end()", e); - } finally { - try { - tok.close(); - } catch (IOException e) { - log.error("Error closing tokenizer", e); - } - } - } - return tokenList; - } - } Propchange: incubator/accumulo/trunk/src/server/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Feb 8 22:08:13 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/server:1209938 
/incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 -/incubator/accumulo/branches/1.4/src/server:1201902-1242105 +/incubator/accumulo/branches/1.4/src/server:1201902-1242131