Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ED4319F9A for ; Mon, 6 Feb 2012 22:53:55 +0000 (UTC) Received: (qmail 17379 invoked by uid 500); 6 Feb 2012 22:53:53 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 17070 invoked by uid 500); 6 Feb 2012 22:53:52 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 16883 invoked by uid 99); 6 Feb 2012 22:53:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2012 22:53:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,LOTS_OF_MONEY X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2012 22:53:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6BE322388860; Mon, 6 Feb 2012 22:53:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1241244 - in /incubator/accumulo/trunk: ./ src/core/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/ ... Date: Mon, 06 Feb 2012 22:53:29 -0000 To: accumulo-commits@incubator.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120206225330.6BE322388860@eris.apache.org> Author: kturner Date: Mon Feb 6 22:53:29 2012 New Revision: 1241244 URL: http://svn.apache.org/viewvc?rev=1241244&view=rev Log: ACCUMULO-373 ACCUMULO-374 ACCUMULO-375 merged from 1.4 Modified: incubator/accumulo/trunk/ (props changed) incubator/accumulo/trunk/src/core/ (props changed) incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java incubator/accumulo/trunk/src/server/ (props changed) incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java Propchange: incubator/accumulo/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 6 22:53:29 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873 /incubator/accumulo/branches/1.3.5rc:1209938 -/incubator/accumulo/branches/1.4:1201902-1241123 +/incubator/accumulo/branches/1.4:1201902-1241241 Propchange: incubator/accumulo/trunk/src/core/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 6 22:53:29 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/core:1209938 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215 -/incubator/accumulo/branches/1.4/src/core:1201902-1241123 +/incubator/accumulo/branches/1.4/src/core:1201902-1241241 Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Mon Feb 6 22:53:29 2012 @@ -45,6 +45,9 @@ public class WikipediaConfiguration { public final static String ANALYZER = "wikipedia.index.analyzer"; public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions"; + + public final static String NUM_GROUPS = "wikipedia.ingest.groups"; + public static String getUser(Configuration conf) { return conf.get(USER); @@ -110,6 +113,10 @@ public class WikipediaConfiguration { return conf.getInt(NUM_PARTITIONS, 25); } + public static int getNumGroups(Configuration conf) { + return conf.getInt(NUM_GROUPS, 1); + } + /** * Helper method to get properties from Hadoop configuration * Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Mon Feb 6 22:53:29 2012 @@ -16,20 +16,107 @@ */ package org.apache.accumulo.examples.wikisearch.ingest; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class WikipediaInputFormat extends TextInputFormat { + + public static class WikipediaInputSplit extends InputSplit implements Writable { + + public WikipediaInputSplit(){} + + public WikipediaInputSplit(FileSplit fileSplit, int partition) + { + this.fileSplit = fileSplit; + this.partition = partition; + } + + private FileSplit fileSplit = null; + private int partition = -1; + + public int getPartition() + { + return partition; + } + + public FileSplit getFileSplit() + { + return fileSplit; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return fileSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return fileSplit.getLocations(); + } + + @Override + public void readFields(DataInput in) throws IOException { + Path file = new Path(in.readUTF()); + long start = in.readLong(); + long length = in.readLong(); + int numHosts = in.readInt(); + String[] hosts = new String[numHosts]; + for(int i = 0; i < numHosts; i++) + hosts[i] = in.readUTF(); + fileSplit = new FileSplit(file, start, length, hosts); + partition = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(fileSplit.getPath().toString()); + out.writeLong(fileSplit.getStart()); + out.writeLong(fileSplit.getLength()); + String [] hosts = fileSplit.getLocations(); + out.writeInt(hosts.length); + for(String host:hosts) + out.writeUTF(host); + fileSplit.write(out); + out.writeInt(partition); + } + + } @Override + public List getSplits(JobContext job) throws IOException { + List superSplits = super.getSplits(job); + List splits = new ArrayList(); + + int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration()); + + for(InputSplit split:superSplits) + { + FileSplit fileSplit = (FileSplit)split; + for(int group = 0; group < numGroups; group++) + { + splits.add(new WikipediaInputSplit(fileSplit,group)); + } + } + return splits; + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new AggregatingRecordReader(); } Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Mon Feb 6 22:53:29 2012 @@ -32,11 +32,11 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.protobuf.Uid; import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; @@ -48,20 +48,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.log4j.Logger; -import org.apache.lucene.analysis.StopAnalyzer; -import org.apache.lucene.analysis.StopFilter; -import org.apache.lucene.analysis.ar.ArabicAnalyzer; -import org.apache.lucene.analysis.br.BrazilianAnalyzer; -import org.apache.lucene.analysis.cjk.CJKAnalyzer; -import org.apache.lucene.analysis.de.GermanAnalyzer; -import org.apache.lucene.analysis.el.GreekAnalyzer; -import org.apache.lucene.analysis.fa.PersianAnalyzer; -import org.apache.lucene.analysis.fr.FrenchAnalyzer; -import org.apache.lucene.analysis.nl.DutchAnalyzer; import org.apache.lucene.analysis.tokenattributes.TermAttribute; import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer; - import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -82,8 +71,10 @@ public class WikipediaMapper extends Map private ArticleExtractor extractor; private String language; private int numPartitions = 0; - private Set stopwords = null; private ColumnVisibility cv = null; + + private int myGroup = -1; + private int numGroups = -1; private Text tablename = null; private Text indexTableName = null; @@ -98,30 +89,15 @@ public class WikipediaMapper extends Map reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); - FileSplit split = (FileSplit) context.getInputSplit(); + WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit(); + myGroup = wiSplit.getPartition(); + numGroups = WikipediaConfiguration.getNumGroups(conf); + + FileSplit split = wiSplit.getFileSplit(); String fileName = split.getPath().getName(); Matcher matcher = languagePattern.matcher(fileName); if (matcher.matches()) { language = matcher.group(1).replace('_', '-').toLowerCase(); - if (language.equals("arwiki")) - stopwords = ArabicAnalyzer.getDefaultStopSet(); - else if (language.equals("brwiki")) - stopwords = BrazilianAnalyzer.getDefaultStopSet(); - else if (language.startsWith("zh")) - stopwords = CJKAnalyzer.getDefaultStopSet(); - else if (language.equals("dewiki")) - stopwords = GermanAnalyzer.getDefaultStopSet(); - else if (language.equals("elwiki")) - stopwords = GreekAnalyzer.getDefaultStopSet(); - else if (language.equals("fawiki")) - stopwords = PersianAnalyzer.getDefaultStopSet(); - else if (language.equals("frwiki")) - stopwords = FrenchAnalyzer.getDefaultStopSet(); - else if (language.equals("nlwiki")) - stopwords = DutchAnalyzer.getDefaultStopSet(); - else - stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET; - } else { throw new RuntimeException("Unknown ingest language! " + fileName); } @@ -150,6 +126,9 @@ public class WikipediaMapper extends Map String colfPrefix = language + NULL_BYTE; String indexPrefix = "fi" + NULL_BYTE; if (article != null) { + int groupId = WikipediaMapper.getPartitionId(article, numGroups); + if(groupId != myGroup) + return; Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); // Create the mutations for the document. @@ -230,9 +209,8 @@ public class WikipediaMapper extends Map Set tokenList = new HashSet(); WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText())); TermAttribute term = tok.addAttribute(TermAttribute.class); - StopFilter filter = new StopFilter(false, tok, stopwords, true); try { - while (filter.incrementToken()) { + while (tok.incrementToken()) { String token = term.term(); if (!StringUtils.isEmpty(token)) tokenList.add(token); Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java Mon Feb 6 22:53:29 2012 @@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wik import java.io.IOException; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.util.TextUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -60,7 +61,7 @@ public class AggregatingRecordReader ext @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { - super.initialize(genericSplit, context); + super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context); this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class); this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class); this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false); Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java Mon Feb 6 22:53:29 2012 @@ -100,6 +100,7 @@ public class WikipediaMapperTest { conf.set(AggregatingRecordReader.END_TOKEN, ""); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java Mon Feb 6 22:53:29 2012 @@ -28,6 +28,7 @@ import javax.xml.xpath.XPath; import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathFactory; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); AggregatingRecordReader reader = new AggregatingRecordReader(); try { // Clear the values for BEGIN and STOP TOKEN @@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest File f = createFile(xml5); // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); Modified: incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java (original) +++ incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java Mon Feb 6 22:53:29 2012 @@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper; import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; @@ -113,6 +115,7 @@ public class TestQueryLogic { conf.set(AggregatingRecordReader.END_TOKEN, ""); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); @@ -136,7 +139,7 @@ public class TestQueryLogic { Path tmpFile = new Path(data.getAbsolutePath()); // Setup the Mapper - InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0); AggregatingRecordReader rr = new AggregatingRecordReader(); Path ocPath = new Path(tmpFile, "oc"); OutputCommitter oc = new FileOutputCommitter(ocPath, context); Propchange: incubator/accumulo/trunk/src/server/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Feb 6 22:53:29 2012 @@ -1,3 +1,3 @@ /incubator/accumulo/branches/1.3.5rc/src/server:1209938 /incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611 -/incubator/accumulo/branches/1.4/src/server:1201902-1241123 +/incubator/accumulo/branches/1.4/src/server:1201902-1241241 Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Mon Feb 6 22:53:29 2012 @@ -41,7 +41,6 @@ import org.apache.accumulo.core.client.I import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -370,7 +369,7 @@ public class SimpleGarbageCollector impl /** * This method gets a set of candidates for deletion by scanning the METADATA table deleted flag keyspace */ - private SortedSet getCandidates() { + SortedSet getCandidates() throws Exception { TreeSet candidates = new TreeSet(); if (offline) { @@ -392,8 +391,8 @@ public class SimpleGarbageCollector impl return candidates; } - Scanner scanner = new ScannerImpl(instance, credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS); - + Scanner scanner = instance.getConnector(credentials).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + if (continueKey != null) { // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them, then will never inspect deletes after N scanner.setRange(new Range(continueKey, true, Constants.METADATA_DELETES_KEYSPACE.getEndKey(), Constants.METADATA_DELETES_KEYSPACE.isEndKeyInclusive())); Modified: incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java?rev=1241244&r1=1241243&r2=1241244&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java (original) +++ incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java Mon Feb 6 22:53:29 2012 @@ -17,6 +17,8 @@ package org.apache.accumulo.server.gc; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map.Entry; import java.util.SortedSet; import java.util.TreeSet; @@ -26,9 +28,11 @@ import org.apache.accumulo.core.Constant import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.thrift.AuthInfo; @@ -45,9 +49,8 @@ public class TestConfirmDeletes { AuthInfo auth = new AuthInfo("root", ByteBuffer.wrap("secret".getBytes()), "instance"); - SortedSet singletonSet(String s) { - SortedSet result = new TreeSet(); - result.add(s); + SortedSet newSet(String... s) { + SortedSet result = new TreeSet(Arrays.asList(s)); return result; } @@ -56,47 +59,66 @@ public class TestConfirmDeletes { // have a directory reference String metadata[] = {"1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet", "1636< srv:flush 2", - "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023", "1636< ~tab:~pr \0",}; - - SortedSet candidates = singletonSet("/1636/default_tablet"); - test1(metadata, candidates); - Assert.assertEquals(0, candidates.size()); + "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023", "1636< ~tab:~pr \0"}; + String deletes[] = {"~del/1636/default_tablet"}; + test1(metadata, deletes, 1, 0); + // have no file reference - candidates = singletonSet("/1636/default_tablet/someFile"); - test1(metadata, candidates); - Assert.assertEquals(1, candidates.size()); - + deletes = new String[] {"~del/1636/default_tablet/someFile"}; + test1(metadata, deletes, 1, 1); + // have a file reference metadata = new String[] {"1636< file:/default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet", "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023", - "1636< ~tab:~pr \0",}; - test1(metadata, candidates); - Assert.assertEquals(0, candidates.size()); - - // have an indirect file reference - candidates = singletonSet("/1636/default_tablet/someFile"); - metadata = new String[] {"1636< file:../default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet", - "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023", - "1636< ~tab:~pr \0",}; - test1(metadata, candidates); - Assert.assertEquals(0, candidates.size()); + "1636< ~tab:~pr \0"}; + test1(metadata, deletes, 1, 0); + // have an indirect file reference + deletes = new String[] {"~del/9/default_tablet/someFile"}; + metadata = new String[] {"1636< file:../9/default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997", + "1636< srv:dir /default_tablet", "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", + "1636< srv:time M1328505870023", "1636< ~tab:~pr \0"}; + + test1(metadata, deletes, 1, 0); + + // have an indirect file reference and a directory candidate + deletes = new String[] {"~del/9/default_tablet"}; + test1(metadata, deletes, 1, 0); + + deletes = new String[] {"~del/9/default_tablet", "~del/9/default_tablet/someFile"}; + test1(metadata, deletes, 2, 0); + + deletes = new String[] {"~blip/1636/b-0001", "~del/1636/b-0001/I0000"}; + test1(metadata, deletes, 1, 0); } - private void test1(String[] metadata, SortedSet candidates) throws Exception { - Instance instance = new MockInstance("mockabyebaby"); + private void test1(String[] metadata, String[] deletes, int expectedInitial, int expected) throws Exception { + Instance instance = new MockInstance(); FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance()); AccumuloConfiguration aconf = DefaultConfiguration.getInstance(); - load(instance, metadata); + load(instance, metadata, deletes); SimpleGarbageCollector gc = new SimpleGarbageCollector(new String[] {}); gc.init(fs, instance, auth, aconf); + SortedSet candidates = gc.getCandidates(); + Assert.assertEquals(expectedInitial, candidates.size()); gc.confirmDeletes(candidates); + Assert.assertEquals(expected, candidates.size()); } - private void load(Instance instance, String[] metadata) throws Exception { + private void load(Instance instance, String[] metadata, String[] deletes) throws Exception { + Scanner scanner = instance.getConnector(auth).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + int count = 0; + for (@SuppressWarnings("unused") + Entry entry : scanner) { + count++; + } + + // ensure there is no data from previous test + Assert.assertEquals(0, count); + Connector conn = instance.getConnector(auth); BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000, 1000, 1); for (String line : metadata) { @@ -106,6 +128,12 @@ public class TestConfirmDeletes { m.put(new Text(columnParts[0]), new Text(columnParts[1]), new Value(parts[2].getBytes())); bw.addMutation(m); } + + for (String line : deletes) { + Mutation m = new Mutation(line); + m.put("", "", ""); + bw.addMutation(m); + } bw.close(); } }