Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CAEAA9D0C for ; Mon, 6 Feb 2012 22:02:11 +0000 (UTC) Received: (qmail 7296 invoked by uid 500); 6 Feb 2012 22:02:11 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 7263 invoked by uid 500); 6 Feb 2012 22:02:11 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 7254 invoked by uid 99); 6 Feb 2012 22:02:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2012 22:02:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2012 22:02:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A57FF23888CD; Mon, 6 Feb 2012 22:01:49 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1241206 - in /incubator/accumulo/branches/1.4/src/examples/wikisearch: ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/ ingest/src/test/java/org/apach... Date: Mon, 06 Feb 2012 22:01:49 -0000 To: accumulo-commits@incubator.apache.org From: afuchs@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120206220149.A57FF23888CD@eris.apache.org> Author: afuchs Date: Mon Feb 6 22:01:48 2012 New Revision: 1241206 URL: http://svn.apache.org/viewvc?rev=1241206&view=rev Log: ACCUMULO-375 Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Mon Feb 6 22:01:48 2012 @@ -45,6 +45,9 @@ public class WikipediaConfiguration { public final static String ANALYZER = "wikipedia.index.analyzer"; public final static String NUM_PARTITIONS = "wikipedia.ingest.partitions"; + + public final static String NUM_GROUPS = "wikipedia.ingest.groups"; + public static String getUser(Configuration conf) { return conf.get(USER); @@ -110,6 +113,10 @@ public class WikipediaConfiguration { return conf.getInt(NUM_PARTITIONS, 25); } + public static int getNumGroups(Configuration conf) { + return conf.getInt(NUM_GROUPS, 1); + } + /** * Helper method to get properties from Hadoop configuration * Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java Mon Feb 6 22:01:48 2012 @@ -16,20 +16,107 @@ */ package org.apache.accumulo.examples.wikisearch.ingest; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class WikipediaInputFormat extends TextInputFormat { + + public static class WikipediaInputSplit extends InputSplit implements Writable { + + public WikipediaInputSplit(){} + + public WikipediaInputSplit(FileSplit fileSplit, int partition) + { + this.fileSplit = fileSplit; + this.partition = partition; + } + + private FileSplit fileSplit = null; + private int partition = -1; + + public int getPartition() + { + return partition; + } + + public FileSplit getFileSplit() + { + return fileSplit; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return fileSplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return fileSplit.getLocations(); + } + + @Override + public void readFields(DataInput in) throws IOException { + Path file = new Path(in.readUTF()); + long start = in.readLong(); + long length = in.readLong(); + int numHosts = in.readInt(); + String[] hosts = new String[numHosts]; + for(int i = 0; i < numHosts; i++) + hosts[i] = in.readUTF(); + fileSplit = new FileSplit(file, start, length, hosts); + partition = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(fileSplit.getPath().toString()); + out.writeLong(fileSplit.getStart()); + out.writeLong(fileSplit.getLength()); + String [] hosts = fileSplit.getLocations(); + out.writeInt(hosts.length); + for(String host:hosts) + out.writeUTF(host); + fileSplit.write(out); + out.writeInt(partition); + } + + } @Override + public List getSplits(JobContext job) throws IOException { + List superSplits = super.getSplits(job); + List splits = new ArrayList(); + + int numGroups = WikipediaConfiguration.getNumGroups(job.getConfiguration()); + + for(InputSplit split:superSplits) + { + FileSplit fileSplit = (FileSplit)split; + for(int group = 0; group < numGroups; group++) + { + splits.add(new WikipediaInputSplit(fileSplit,group)); + } + } + return super.getSplits(job); + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { return new AggregatingRecordReader(); } Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java Mon Feb 6 22:01:48 2012 @@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mut import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.protobuf.Uid; import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder; @@ -71,6 +72,9 @@ public class WikipediaMapper extends Map private String language; private int numPartitions = 0; private ColumnVisibility cv = null; + + private int myGroup = -1; + private int numGroups = -1; private Text tablename = null; private Text indexTableName = null; @@ -85,7 +89,11 @@ public class WikipediaMapper extends Map reverseIndexTableName = new Text(tablename + "ReverseIndex"); metadataTableName = new Text(tablename + "Metadata"); - FileSplit split = (FileSplit) context.getInputSplit(); + WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit(); + myGroup = wiSplit.getPartition(); + numGroups = WikipediaConfiguration.getNumGroups(conf); + + FileSplit split = wiSplit.getFileSplit(); String fileName = split.getPath().getName(); Matcher matcher = languagePattern.matcher(fileName); if (matcher.matches()) { @@ -118,6 +126,9 @@ public class WikipediaMapper extends Map String colfPrefix = language + NULL_BYTE; String indexPrefix = "fi" + NULL_BYTE; if (article != null) { + int groupId = WikipediaMapper.getPartitionId(article, numGroups); + if(groupId != myGroup) + return; Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); // Create the mutations for the document. Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReader.java Mon Feb 6 22:01:48 2012 @@ -20,6 +20,7 @@ package org.apache.accumulo.examples.wik import java.io.IOException; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.util.TextUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -60,7 +61,7 @@ public class AggregatingRecordReader ext @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { - super.initialize(genericSplit, context); + super.initialize(((WikipediaInputSplit)genericSplit).getFileSplit(), context); this.startToken = WikipediaConfiguration.isNull(context.getConfiguration(), START_TOKEN, String.class); this.endToken = WikipediaConfiguration.isNull(context.getConfiguration(), END_TOKEN, String.class); this.returnPartialMatches = context.getConfiguration().getBoolean(RETURN_PARTIAL_MATCHES, false); Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapperTest.java Mon Feb 6 22:01:48 2012 @@ -100,6 +100,7 @@ public class WikipediaMapperTest { conf.set(AggregatingRecordReader.END_TOKEN, ""); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/test/java/org/apache/accumulo/examples/wikisearch/reader/AggregatingRecordReaderTest.java Mon Feb 6 22:01:48 2012 @@ -28,6 +28,7 @@ import javax.xml.xpath.XPath; import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathFactory; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -140,7 +141,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); AggregatingRecordReader reader = new AggregatingRecordReader(); try { // Clear the values for BEGIN and STOP TOKEN @@ -162,7 +163,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -183,7 +184,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -201,7 +202,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -219,7 +220,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -244,7 +245,7 @@ public class AggregatingRecordReaderTest // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); @@ -263,7 +264,7 @@ public class AggregatingRecordReaderTest File f = createFile(xml5); // Create FileSplit Path p = new Path(f.toURI().toString()); - FileSplit split = new FileSplit(p, 0, f.length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null),0); // Initialize the RecordReader AggregatingRecordReader reader = new AggregatingRecordReader(); Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java?rev=1241206&r1=1241205&r2=1241206&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java (original) +++ incubator/accumulo/branches/1.4/src/examples/wikisearch/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java Mon Feb 6 22:01:48 2012 @@ -37,6 +37,8 @@ import org.apache.accumulo.core.data.Ran import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat; +import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper; import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; @@ -113,6 +115,7 @@ public class TestQueryLogic { conf.set(AggregatingRecordReader.END_TOKEN, ""); conf.set(WikipediaConfiguration.TABLE_NAME, TABLE_NAME); conf.set(WikipediaConfiguration.NUM_PARTITIONS, "1"); + conf.set(WikipediaConfiguration.NUM_GROUPS, "1"); MockInstance i = new MockInstance(); c = i.getConnector("root", "pass"); @@ -136,7 +139,7 @@ public class TestQueryLogic { Path tmpFile = new Path(data.getAbsolutePath()); // Setup the Mapper - InputSplit split = new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null); + WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(tmpFile, 0, fs.pathToFile(tmpFile).length(), null),0); AggregatingRecordReader rr = new AggregatingRecordReader(); Path ocPath = new Path(tmpFile, "oc"); OutputCommitter oc = new FileOutputCommitter(ocPath, context);