incubator-accumulo-commits mailing list archives

From: ktur...@apache.org
Subject: svn commit: r1241244 - in /incubator/accumulo/trunk: ./ src/core/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/ ...
Date: Mon, 06 Feb 2012 22:53:29 GMT
Author: kturner
Date: Mon Feb  6 22:53:29 2012
New Revision: 1241244

URL: http://svn.apache.org/viewvc?rev=1241244&view=rev
Log:
ACCUMULO-373 ACCUMULO-374 ACCUMULO-375 merged from 1.4

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
    incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
    incubator/accumulo/trunk/src/server/   (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb  6 22:53:29 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1241123
+/incubator/accumulo/branches/1.4:1201902-1241241

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb  6 22:53:29 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
-/incubator/accumulo/branches/1.4/src/core:1201902-1241123
+/incubator/accumulo/branches/1.4/src/core:1201902-1241241

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Mon Feb  6 22:53:29 2012
@@ -45,6 +45,9 @@ public class WikipediaConfiguration {
   public final static String ANALYZER = "wikipedia.index.analyzer";
   
   public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions";
+
+  public final static String NUM_GROUPS = "wikipedia.ingest.groups";
+
   
   public static String getUser(Configuration conf) {
     return conf.get(USER);
@@ -110,6 +113,10 @@ public class WikipediaConfiguration {
     return conf.getInt(NUM_PARTITIONS, 25);
   }
   
+  public static int getNumGroups(Configuration conf) {
+    return conf.getInt(NUM_GROUPS, 1);
+  }
+  
   /**
    * Helper method to get properties from Hadoop configuration
    * 

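For context, a minimal usage sketch (not part of this commit) of the new knob: it is set and read through Hadoop's Configuration exactly as getNumGroups above does, defaulting to 1 when unset.

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
import org.apache.hadoop.conf.Configuration;

public class NumGroupsUsage {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // NUM_GROUPS resolves to "wikipedia.ingest.groups", per the diff above.
    conf.setInt(WikipediaConfiguration.NUM_GROUPS, 4);
    // Falls back to the default of 1 when the key is unset.
    System.out.println("ingest groups: " + WikipediaConfiguration.getNumGroups(conf));
  }
}
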
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Mon Feb  6 22:53:29 2012
@@ -16,20 +16,106 @@
  */
 package org.apache.accumulo.examples.wikisearch.ingest;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 
 public class WikipediaInputFormat extends TextInputFormat {
+
+  public static class WikipediaInputSplit extends InputSplit implements Writable {
+
+    public WikipediaInputSplit(){}
+    
+    public WikipediaInputSplit(FileSplit fileSplit, int partition)
+    {
+      this.fileSplit = fileSplit;
+      this.partition = partition;
+    }
+    
+    private FileSplit fileSplit = null;
+    private int partition = -1;
+
+    public int getPartition()
+    {
+      return partition;
+    }
+    
+    public FileSplit getFileSplit()
+    {
+      return fileSplit;
+    }
+    
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSplit.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return fileSplit.getLocations();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      Path file = new Path(in.readUTF());
+      long start = in.readLong();
+      long length = in.readLong();
+      int numHosts = in.readInt();
+      String[] hosts = new String[numHosts];
+      for(int i = 0; i < numHosts; i++)
+        hosts[i] = in.readUTF();
+      fileSplit = new FileSplit(file, start, length, hosts);
+      partition = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(fileSplit.getPath().toString());
+      out.writeLong(fileSplit.getStart());
+      out.writeLong(fileSplit.getLength());
+      String [] hosts = fileSplit.getLocations();
+      out.writeInt(hosts.length);
+      for(String host:hosts)
+        out.writeUTF(host);
+      out.writeInt(partition);
+    }
+    
+  }
   
   @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    List<InputSplit> superSplits = super.getSplits(job);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    
+    int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration());
+
+    for(InputSplit split:superSplits)
+    {
+      FileSplit fileSplit = (FileSplit)split;
+      for(int group = 0; group < numGroups; group++)
+      {
+        splits.add(new WikipediaInputSplit(fileSplit,group));
+      }
+    }
+    return splits;
+  }
+
+  @Override
  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     return new AggregatingRecordReader();
   }

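The new getSplits fans the work out: every FileSplit produced by TextInputFormat is wrapped once per group, so N file splits with G groups yield N*G map tasks, each of which later ingests only its own group's articles. A hedged round-trip sketch of the Writable (file name and sizes are made up), relying on write and readFields being symmetric:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitRoundTrip {
  public static void main(String[] args) throws Exception {
    FileSplit fileSplit = new FileSplit(new Path("/wiki/enwiki-pages.xml"), 0L, 1024L, new String[] {"host1"});
    WikipediaInputSplit out = new WikipediaInputSplit(fileSplit, 2);

    // Serialize, then deserialize into a fresh instance, as the framework would.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    out.write(new DataOutputStream(buf));
    WikipediaInputSplit in = new WikipediaInputSplit();
    in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

    System.out.println(in.getPartition());             // 2
    System.out.println(in.getFileSplit().getLength()); // 1024
  }
}
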
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Mon Feb  6 22:53:29 2012
@@ -32,11 +32,11 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
 import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
@@ -48,20 +48,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.StopAnalyzer;
-import org.apache.lucene.analysis.StopFilter;
-import org.apache.lucene.analysis.ar.ArabicAnalyzer;
-import org.apache.lucene.analysis.br.BrazilianAnalyzer;
-import org.apache.lucene.analysis.cjk.CJKAnalyzer;
-import org.apache.lucene.analysis.de.GermanAnalyzer;
-import org.apache.lucene.analysis.el.GreekAnalyzer;
-import org.apache.lucene.analysis.fa.PersianAnalyzer;
-import org.apache.lucene.analysis.fr.FrenchAnalyzer;
-import org.apache.lucene.analysis.nl.DutchAnalyzer;
 import org.apache.lucene.analysis.tokenattributes.TermAttribute;
 import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
 
-
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
@@ -82,8 +71,10 @@ public class WikipediaMapper extends Map
   private ArticleExtractor extractor;
   private String language;
   private int numPartitions = 0;
-  private Set<?> stopwords = null;
   private ColumnVisibility cv = null;
+
+  private int myGroup = -1;
+  private int numGroups = -1;
   
   private Text tablename = null;
   private Text indexTableName = null;
@@ -98,30 +89,15 @@ public class WikipediaMapper extends Map
     reverseIndexTableName = new Text(tablename + "ReverseIndex");
     metadataTableName = new Text(tablename + "Metadata");
     
-    FileSplit split = (FileSplit) context.getInputSplit();
+    WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit();
+    myGroup = wiSplit.getPartition();
+    numGroups = WikipediaConfiguration.getNumGroups(conf);
+    
+    FileSplit split = wiSplit.getFileSplit();
     String fileName = split.getPath().getName();
     Matcher matcher = languagePattern.matcher(fileName);
     if (matcher.matches()) {
       language = matcher.group(1).replace('_', '-').toLowerCase();
-      if (language.equals("arwiki"))
-        stopwords = ArabicAnalyzer.getDefaultStopSet();
-      else if (language.equals("brwiki"))
-        stopwords = BrazilianAnalyzer.getDefaultStopSet();
-      else if (language.startsWith("zh"))
-        stopwords = CJKAnalyzer.getDefaultStopSet();
-      else if (language.equals("dewiki"))
-        stopwords = GermanAnalyzer.getDefaultStopSet();
-      else if (language.equals("elwiki"))
-        stopwords = GreekAnalyzer.getDefaultStopSet();
-      else if (language.equals("fawiki"))
-        stopwords = PersianAnalyzer.getDefaultStopSet();
-      else if (language.equals("frwiki"))
-        stopwords = FrenchAnalyzer.getDefaultStopSet();
-      else if (language.equals("nlwiki"))
-        stopwords = DutchAnalyzer.getDefaultStopSet();
-      else
-        stopwords = StopAnalyzer.ENGLISH_STOP_WORDS_SET;
-      
     } else {
       throw new RuntimeException("Unknown ingest language! " + fileName);
     }
@@ -150,6 +126,9 @@ public class WikipediaMapper extends Map
     String colfPrefix = language + NULL_BYTE;
     String indexPrefix = "fi" + NULL_BYTE;
     if (article != null) {
+      int groupId = WikipediaMapper.getPartitionId(article, numGroups);
+      if(groupId != myGroup)
+        return;
+      Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions)));
       
       // Create the mutations for the document.
@@ -230,9 +209,8 @@ public class WikipediaMapper extends Map
     Set<String> tokenList = new HashSet<String>();
     WikipediaTokenizer tok = new WikipediaTokenizer(new StringReader(article.getText()));
     TermAttribute term = tok.addAttribute(TermAttribute.class);
-    StopFilter filter = new StopFilter(false, tok, stopwords, true);
     try {
-      while (filter.incrementToken()) {
+      while (tok.incrementToken()) {
         String token = term.term();
         if (!StringUtils.isEmpty(token))
           tokenList.add(token);

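The early return added to map() is the filtering half of the fan-out: every mapper reads the whole split, but ingests only articles whose partition id, taken against the group count, matches its own group. A standalone sketch of the idea, with a hypothetical stand-in for getPartitionId:

public class GroupFilterSketch {
  // Hypothetical stand-in for WikipediaMapper.getPartitionId: any stable,
  // non-negative hash works as long as every mapper computes the same one.
  static int partitionId(String articleTitle, int numBuckets) {
    return (articleTitle.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    String[] articles = {"Accumulo", "Hadoop", "ZooKeeper", "Thrift"};
    int numGroups = 2;
    for (int myGroup = 0; myGroup < numGroups; myGroup++)
      for (String article : articles)
        if (partitionId(article, numGroups) == myGroup) // other groups skip it
          System.out.println("group " + myGroup + " ingests " + article);
  }
}
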
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java Mon Feb  6 22:53:29 2012
@@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wik
 import java.io.IOException;
 
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.util.TextUtil;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -60,7 +61,7 @@ public class AggregatingRecordReader ext
   
   @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
-    super.initialize(genericSplit, context);
+    super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context);
     this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class);
     this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class);
     this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false);

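The one-line change above is the unwrap side of the new split type: the superclass only understands FileSplits, so the reader peels the wrapper off before delegating. A tiny sketch of that cast (file name is made up; null hosts, as in the tests below):

import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class UnwrapSketch {
  public static void main(String[] args) {
    FileSplit fileSplit = new FileSplit(new Path("/tmp/pages.xml"), 0, 10, null);
    InputSplit generic = new WikipediaInputSplit(fileSplit, 0);
    // Same cast initialize() now performs before calling super.initialize().
    System.out.println(((WikipediaInputSplit) generic).getFileSplit() == fileSplit); // true
  }
}
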
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java Mon Feb  6 22:53:29 2012
@@ -100,6 +100,7 @@ public class WikipediaMapperTest {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java Mon Feb  6 22:53:29 2012
@@ -28,6 +28,7 @@ import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathFactory;
 
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     AggregatingRecordReader reader = new AggregatingRecordReader();
     try {
       // Clear the values for BEGIN and STOP TOKEN
@@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest
     
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();
@@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest
     File f = createFile(xml5);
     // Create FileSplit
     Path p = new Path(f.toURI().toString());
-    FileSplit split = new FileSplit(p, 0, f.length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0);
     
     // Initialize the RecordReader
     AggregatingRecordReader reader = new AggregatingRecordReader();

Modified: incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java Mon Feb  6 22:53:29 2012
@@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
@@ -113,6 +115,7 @@ public class TestQueryLogic {
     conf.set(AggregatingRecordReader.END_TOKEN, "</page>");
     conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME);
     conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1");
+    conf.set(WikipediaConfiguration.NUM_GROUPS, "1");
     
     MockInstance i = new MockInstance();
     c = i.getConnector("root", "pass");
@@ -136,7 +139,7 @@ public class TestQueryLogic {
     Path tmpFile = new Path(data.getAbsolutePath());
     
     // Setup the Mapper
-    InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null);
+    WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0);
     AggregatingRecordReader rr = new AggregatingRecordReader();
     Path ocPath = new Path(tmpFile, "oc");
     OutputCommitter oc = new FileOutputCommitter(ocPath, context);

Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb  6 22:53:29 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
 /incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
-/incubator/accumulo/branches/1.4/src/server:1201902-1241123
+/incubator/accumulo/branches/1.4/src/server:1201902-1241241

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Mon Feb  6 22:53:29 2012
@@ -41,7 +41,6 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -370,7 +369,7 @@ public class SimpleGarbageCollector impl
   /**
   * This method gets a set of candidates for deletion by scanning the METADATA table deleted flag keyspace
    */
-  private SortedSet<String> getCandidates() {
+  SortedSet<String> getCandidates() throws Exception {
     TreeSet<String> candidates = new TreeSet<String>();
     
     if (offline) {
@@ -392,8 +391,8 @@ public class SimpleGarbageCollector impl
       return candidates;
     }
     
-    Scanner scanner = new ScannerImpl(instance, credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
-    
+    Scanner scanner = instance.getConnector(credentials).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+
     if (continueKey != null) {
      // want to ensure GC makes progress... if the 1st N deletes are stable and we keep processing them, then will never inspect deletes after N
      scanner.setRange(new Range(continueKey, true, Constants.METADATA_DELETES_KEYSPACE.getEndKey(), Constants.METADATA_DELETES_KEYSPACE.isEndKeyInclusive()));

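The ScannerImpl-to-Connector swap above moves the garbage collector onto the public client API. A hedged sketch of the resulting scan over the deletes keyspace, assuming an Instance and AuthInfo are already in hand:

import java.util.Map.Entry;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.thrift.AuthInfo;

public class CandidateScanSketch {
  // Mirrors the + line above: Connector.createScanner instead of ScannerImpl.
  static void printCandidates(Instance instance, AuthInfo credentials) throws Exception {
    Scanner scanner = instance.getConnector(credentials).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
    scanner.setRange(Constants.METADATA_DELETES_KEYSPACE);
    for (Entry<Key,Value> entry : scanner)
      System.out.println(entry.getKey().getRow()); // each ~del row is a candidate
  }
}
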
Modified: incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java?rev=1241244&r1=1241243&r2=1241244&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java (original)
+++ incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java Mon Feb  6 22:53:29 2012
@@ -17,6 +17,8 @@
 package org.apache.accumulo.server.gc;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
@@ -26,9 +28,11 @@ import org.apache.accumulo.core.Constant
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
@@ -45,9 +49,8 @@ public class TestConfirmDeletes {
   
   AuthInfo auth = new AuthInfo("root", ByteBuffer.wrap("secret".getBytes()), "instance");
 
-  SortedSet<String> singletonSet(String s) {
-    SortedSet<String> result = new TreeSet<String>();
-    result.add(s);
+  SortedSet<String> newSet(String... s) {
+    SortedSet<String> result = new TreeSet<String>(Arrays.asList(s));
     return result;
   }
 
@@ -56,47 +59,66 @@ public class TestConfirmDeletes {
     
     // have a directory reference
     String metadata[] = {"1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet", "1636< srv:flush 2",
-        "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023", "1636< ~tab:~pr \0",};
-    
-    SortedSet<String> candidates = singletonSet("/1636/default_tablet");
-    test1(metadata, candidates);
-    Assert.assertEquals(0, candidates.size());
+        "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb",
"1636< srv:time M1328505870023", "1636< ~tab:~pr \0"};
+    String deletes[] = {"~del/1636/default_tablet"};
     
+    test1(metadata, deletes, 1, 0);
+      
     // have no file reference
-    candidates = singletonSet("/1636/default_tablet/someFile");
-    test1(metadata, candidates);
-    Assert.assertEquals(1, candidates.size());
-    
+    deletes = new String[] {"~del/1636/default_tablet/someFile"};
+    test1(metadata, deletes, 1, 1);
+
     // have a file reference
     metadata = new String[] {"1636< file:/default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet",
        "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023",
-        "1636< ~tab:~pr \0",};
-    test1(metadata, candidates);
-    Assert.assertEquals(0, candidates.size());
-    
-    // have an indirect file reference
-    candidates = singletonSet("/1636/default_tablet/someFile");
-    metadata = new String[] {"1636< file:../default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997", "1636< srv:dir /default_tablet",
-        "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb", "1636< srv:time M1328505870023",
-        "1636< ~tab:~pr \0",};
-    test1(metadata, candidates);
-    Assert.assertEquals(0, candidates.size());
+        "1636< ~tab:~pr \0"};
+    test1(metadata, deletes, 1, 0);
 
+    // have an indirect file reference
+    deletes = new String[] {"~del/9/default_tablet/someFile"};
+    metadata = new String[] {"1636< file:../9/default_tablet/someFile 10,100", "1636< last:3353986642a66eb 192.168.117.9:9997",
+        "1636< srv:dir /default_tablet", "1636< srv:flush 2", "1636< srv:lock tservers/192.168.117.9:9997/zlock-0000000000$3353986642a66eb",
+        "1636< srv:time M1328505870023", "1636< ~tab:~pr \0"};
+    
+    test1(metadata, deletes, 1, 0);
+    
+    // have an indirect file reference and a directory candidate
+    deletes = new String[] {"~del/9/default_tablet"};
+    test1(metadata, deletes, 1, 0);
+     
+    deletes = new String[] {"~del/9/default_tablet", "~del/9/default_tablet/someFile"};
+    test1(metadata, deletes, 2, 0);
+    
+    deletes = new String[] {"~blip/1636/b-0001", "~del/1636/b-0001/I0000"};
+    test1(metadata, deletes, 1, 0);
   }
   
-  private void test1(String[] metadata, SortedSet<String> candidates) throws Exception {
-    Instance instance = new MockInstance("mockabyebaby");
+  private void test1(String[] metadata, String[] deletes, int expectedInitial, int expected) throws Exception {
+    Instance instance = new MockInstance();
     FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
     AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
     
-    load(instance, metadata);
+    load(instance, metadata, deletes);
 
     SimpleGarbageCollector gc = new SimpleGarbageCollector(new String[] {});
     gc.init(fs, instance, auth, aconf);
+    SortedSet<String> candidates = gc.getCandidates();
+    Assert.assertEquals(expectedInitial, candidates.size());
     gc.confirmDeletes(candidates);
+    Assert.assertEquals(expected, candidates.size());
   }
   
-  private void load(Instance instance, String[] metadata) throws Exception {
+  private void load(Instance instance, String[] metadata, String[] deletes) throws Exception
{
+    Scanner scanner = instance.getConnector(auth).createScanner(Constants.METADATA_TABLE_NAME,
Constants.NO_AUTHS);
+    int count = 0;
+    for (@SuppressWarnings("unused")
+    Entry<Key,Value> entry : scanner) {
+      count++;
+    }
+    
+    // ensure there is no data from previous test
+    Assert.assertEquals(0, count);
+
     Connector conn = instance.getConnector(auth);
     BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000, 1000, 1);
     for (String line : metadata) {
@@ -106,6 +128,12 @@ public class TestConfirmDeletes {
       m.put(new Text(columnParts[0]), new Text(columnParts[1]), new Value(parts[2].getBytes()));
       bw.addMutation(m);
     }
+    
+    for (String line : deletes) {
+      Mutation m = new Mutation(line);
+      m.put("", "", "");
+      bw.addMutation(m);
+    }
     bw.close();
   }
 }
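
Finally, a self-contained sketch (row values invented) of the MockInstance load pattern the reworked test relies on: write metadata rows and ~del marker rows with a BatchWriter, then let getCandidates/confirmDeletes do the rest.

import java.nio.ByteBuffer;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.thrift.AuthInfo;

public class MockLoadSketch {
  public static void main(String[] args) throws Exception {
    AuthInfo auth = new AuthInfo("root", ByteBuffer.wrap("secret".getBytes()), "instance");
    Connector conn = new MockInstance().getConnector(auth);
    BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000, 1000, 1);
    // A delete candidate row, in the same ~del keyspace the test populates.
    Mutation m = new Mutation("~del/1636/default_tablet");
    m.put("", "", "");
    bw.addMutation(m);
    bw.close();
  }
}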


