From: ecn@apache.org
To: commits@accumulo.apache.org
Date: Tue, 26 Nov 2013 15:18:11 -0000
Subject: [16/37] ACCUMULO-600 removed wikisearch from trunk

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java deleted file mode 100644 index 90b8308..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.examples.wikisearch.ingest; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.IteratorSetting.Column; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.user.SummingCombiner; -import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args; -import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; -import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; -import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner; -import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat; -import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; - -public class WikipediaPartitionedIngester extends Configured implements Tool { - - private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class); - - public final static String INGEST_LANGUAGE = "wikipedia.ingest_language"; - public final static String SPLIT_FILE = "wikipedia.split_file"; - public final static String TABLE_NAME = "wikipedia.table"; - - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(new Configuration(), new WikipediaPartitionedIngester(), args); - System.exit(res); - } - - private void createTables(TableOperations tops, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - TableExistsException { - // Create the shard table - String indexTableName = tableName + "Index"; - String reverseIndexTableName = tableName + "ReverseIndex"; - String metadataTableName = tableName + "Metadata"; - - // create the shard table - if (!tops.exists(tableName)) { - // Set a text index combiner on the given field names. 
No combiner is set if the option is not supplied - String textIndexFamilies = WikipediaMapper.TOKENS_FIELD_NAME; - - tops.create(tableName); - if (textIndexFamilies.length() > 0) { - System.out.println("Adding content combiner on the fields: " + textIndexFamilies); - - IteratorSetting setting = new IteratorSetting(10, TextIndexCombiner.class); - List columns = new ArrayList(); - for (String family : StringUtils.split(textIndexFamilies, ',')) { - columns.add(new Column("fi\0" + family)); - } - TextIndexCombiner.setColumns(setting, columns); - TextIndexCombiner.setLossyness(setting, true); - - tops.attachIterator(tableName, setting, EnumSet.allOf(IteratorScope.class)); - } - - // Set the locality group for the full content column family - tops.setLocalityGroups(tableName, Collections.singletonMap("WikipediaDocuments", Collections.singleton(new Text(WikipediaMapper.DOCUMENT_COLUMN_FAMILY)))); - - } - - if (!tops.exists(indexTableName)) { - tops.create(indexTableName); - // Add the UID combiner - IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class); - GlobalIndexUidCombiner.setCombineAllColumns(setting, true); - GlobalIndexUidCombiner.setLossyness(setting, true); - tops.attachIterator(indexTableName, setting, EnumSet.allOf(IteratorScope.class)); - } - - if (!tops.exists(reverseIndexTableName)) { - tops.create(reverseIndexTableName); - // Add the UID combiner - IteratorSetting setting = new IteratorSetting(19, "UIDAggregator", GlobalIndexUidCombiner.class); - GlobalIndexUidCombiner.setCombineAllColumns(setting, true); - GlobalIndexUidCombiner.setLossyness(setting, true); - tops.attachIterator(reverseIndexTableName, setting, EnumSet.allOf(IteratorScope.class)); - } - - if (!tops.exists(metadataTableName)) { - // Add the SummingCombiner with VARLEN encoding for the frequency column - tops.create(metadataTableName); - IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class); - SummingCombiner.setColumns(setting, Collections.singletonList(new Column("f"))); - SummingCombiner.setEncodingType(setting, SummingCombiner.Type.VARLEN); - tops.attachIterator(metadataTableName, setting, EnumSet.allOf(IteratorScope.class)); - } - } - - @Override - public int run(String[] args) throws Exception { - Configuration conf = getConf(); - if(WikipediaConfiguration.runPartitioner(conf)) - { - int result = runPartitionerJob(); - if(result != 0) - return result; - } - if(WikipediaConfiguration.runIngest(conf)) - { - int result = runIngestJob(); - if(result != 0) - return result; - if(WikipediaConfiguration.bulkIngest(conf)) - return loadBulkFiles(); - } - return 0; - } - - private int runPartitionerJob() throws Exception - { - Job partitionerJob = new Job(getConf(), "Partition Wikipedia"); - Configuration partitionerConf = partitionerJob.getConfiguration(); - partitionerConf.set("mapred.map.tasks.speculative.execution", "false"); - - configurePartitionerJob(partitionerJob); - - List inputPaths = new ArrayList(); - SortedSet languages = new TreeSet(); - FileSystem fs = FileSystem.get(partitionerConf); - Path parent = new Path(partitionerConf.get("wikipedia.input")); - listFiles(parent, fs, inputPaths, languages); - - System.out.println("Input files in " + parent + ":" + inputPaths.size()); - Path[] inputPathsArray = new Path[inputPaths.size()]; - inputPaths.toArray(inputPathsArray); - - System.out.println("Languages:" + languages.size()); - - // setup input format - - WikipediaInputFormat.setInputPaths(partitionerJob, inputPathsArray); - - 
partitionerJob.setMapperClass(WikipediaPartitioner.class); - partitionerJob.setNumReduceTasks(0); - - // setup output format - partitionerJob.setMapOutputKeyClass(Text.class); - partitionerJob.setMapOutputValueClass(Article.class); - partitionerJob.setOutputKeyClass(Text.class); - partitionerJob.setOutputValueClass(Article.class); - partitionerJob.setOutputFormatClass(SequenceFileOutputFormat.class); - Path outputDir = WikipediaConfiguration.getPartitionedArticlesPath(partitionerConf); - SequenceFileOutputFormat.setOutputPath(partitionerJob, outputDir); - SequenceFileOutputFormat.setCompressOutput(partitionerJob, true); - SequenceFileOutputFormat.setOutputCompressionType(partitionerJob, CompressionType.RECORD); - - return partitionerJob.waitForCompletion(true) ? 0 : 1; - } - - private int runIngestJob() throws Exception - { - Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia"); - Configuration ingestConf = ingestJob.getConfiguration(); - ingestConf.set("mapred.map.tasks.speculative.execution", "false"); - - configureIngestJob(ingestJob); - - String tablename = WikipediaConfiguration.getTableName(ingestConf); - - Connector connector = WikipediaConfiguration.getConnector(ingestConf); - - TableOperations tops = connector.tableOperations(); - - createTables(tops, tablename); - - ingestJob.setMapperClass(WikipediaPartitionedMapper.class); - ingestJob.setNumReduceTasks(0); - - // setup input format - ingestJob.setInputFormatClass(SequenceFileInputFormat.class); - SequenceFileInputFormat.setInputPaths(ingestJob, WikipediaConfiguration.getPartitionedArticlesPath(ingestConf)); - // TODO make split size configurable - SequenceFileInputFormat.setMinInputSplitSize(ingestJob, WikipediaConfiguration.getMinInputSplitSize(ingestConf)); - - // setup output format - ingestJob.setMapOutputKeyClass(Text.class); - ingestJob.setMapOutputValueClass(Mutation.class); - - if(WikipediaConfiguration.bulkIngest(ingestConf)) - { - ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class); - SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf)); - String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf); - if(bulkIngestDir == null) - { - log.error("Bulk ingest dir not set"); - return 1; - } - SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf)); - } else { - ingestJob.setOutputFormatClass(AccumuloOutputFormat.class); - String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf); - String instanceName = WikipediaConfiguration.getInstanceName(ingestConf); - String user = WikipediaConfiguration.getUser(ingestConf); - byte[] password = WikipediaConfiguration.getPassword(ingestConf); - AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename); - AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers); - } - - return ingestJob.waitForCompletion(true) ? 
0 : 1; - } - - private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException - { - Configuration conf = getConf(); - - Connector connector = WikipediaConfiguration.getConnector(conf); - - FileSystem fs = FileSystem.get(conf); - String directory = WikipediaConfiguration.bulkIngestDir(conf); - - String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf); - - for(FileStatus status: fs.listStatus(new Path(directory))) - { - if(status.isDir() == false) - continue; - Path dir = status.getPath(); - Path failPath = new Path(failureDirectory+"/"+dir.getName()); - fs.mkdirs(failPath); - connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true); - } - - return 0; - } - - public final static PathFilter partFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().startsWith("part"); - }; - }; - - protected void configurePartitionerJob(Job job) { - Configuration conf = job.getConfiguration(); - job.setJarByClass(WikipediaPartitionedIngester.class); - job.setInputFormatClass(WikipediaInputFormat.class); - conf.set(AggregatingRecordReader.START_TOKEN, ""); - conf.set(AggregatingRecordReader.END_TOKEN, ""); - } - - protected void configureIngestJob(Job job) { - job.setJarByClass(WikipediaPartitionedIngester.class); - } - - protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); - - protected void listFiles(Path path, FileSystem fs, List files, Set languages) throws IOException { - for (FileStatus status : fs.listStatus(path)) { - if (status.isDir()) { - listFiles(status.getPath(), fs, files, languages); - } else { - Path p = status.getPath(); - Matcher matcher = filePattern.matcher(p.getName()); - if (matcher.matches()) { - languages.add(matcher.group(1)); - files.add(p); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java deleted file mode 100644 index bb4ae64..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/** - * - */ -package org.apache.accumulo.examples.wikisearch.ingest; - - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; -import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner; -import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; -import org.apache.accumulo.examples.wikisearch.protobuf.Uid; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; - -public class WikipediaPartitionedMapper extends Mapper { - - // private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class); - - public final static Charset UTF8 = Charset.forName("UTF-8"); - public static final String DOCUMENT_COLUMN_FAMILY = "d"; - public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; - public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; - public static final String TOKENS_FIELD_NAME = "TEXT"; - - private static final Value NULL_VALUE = new Value(new byte[0]); - private static final String cvPrefix = "all|"; - - private int numPartitions = 0; - - private Text tablename = null; - private Text indexTableName = null; - private Text reverseIndexTableName = null; - private Text metadataTableName = null; - - private static class MutationInfo { - final String row; - final String colfam; - final String colqual; - final ColumnVisibility cv; - final long timestamp; - - public MutationInfo(String row, String colfam, String colqual, ColumnVisibility cv, long timestamp) { - super(); - this.row = row; - this.colfam = colfam; - this.colqual = colqual; - this.cv = cv; - this.timestamp = timestamp; - } - - @Override - public boolean equals(Object obj) { - MutationInfo other = (MutationInfo)obj; - return (row == other.row || row.equals(other.row)) && - (colfam == other.colfam || colfam.equals(other.colfam)) && - colqual.equals(other.colqual) && - (cv == other.cv || cv.equals(other.cv)) && - timestamp == other.timestamp; - } - - @Override - public int hashCode() { - return row.hashCode() ^ colfam.hashCode() ^ colqual.hashCode() ^ cv.hashCode() ^ (int)timestamp; - } - } - - private LRUOutputCombiner wikiIndexOutput; - private LRUOutputCombiner wikiReverseIndexOutput; - private LRUOutputCombiner wikiMetadataOutput; - - private static class CountAndSet - { - public int count; - public HashSet set; - - public CountAndSet(String entry) - { - set = new HashSet(); - set.add(entry); - count = 1; - } - } - - MultiTableBatchWriter mtbw; - - @Override - public void setup(final Context context) { - Configuration conf = context.getConfiguration(); - tablename = new Text(WikipediaConfiguration.getTableName(conf)); - indexTableName = new Text(tablename + "Index"); - reverseIndexTableName = new Text(tablename + "ReverseIndex"); 
- metadataTableName = new Text(tablename + "Metadata"); - - try { - mtbw = WikipediaConfiguration.getConnector(conf).createMultiTableBatchWriter(10000000, 1000, 10); - } catch (AccumuloException e) { - throw new RuntimeException(e); - } catch (AccumuloSecurityException e) { - throw new RuntimeException(e); - } - - final Text metadataTableNameFinal = metadataTableName; - final Text indexTableNameFinal = indexTableName; - final Text reverseIndexTableNameFinal = reverseIndexTableName; - - numPartitions = WikipediaConfiguration.getNumPartitions(conf); - - LRUOutputCombiner.Fold indexFold = - new LRUOutputCombiner.Fold() { - @Override - public CountAndSet fold(CountAndSet oldValue, CountAndSet newValue) { - oldValue.count += newValue.count; - if(oldValue.set == null || newValue.set == null) - { - oldValue.set = null; - return oldValue; - } - oldValue.set.addAll(newValue.set); - if(oldValue.set.size() > GlobalIndexUidCombiner.MAX) - oldValue.set = null; - return oldValue; - } - }; - LRUOutputCombiner.Output indexOutput = - new LRUOutputCombiner.Output() { - - @Override - public void output(MutationInfo key, CountAndSet value) - { - Uid.List.Builder builder = Uid.List.newBuilder(); - builder.setCOUNT(value.count); - if (value.set == null) { - builder.setIGNORE(true); - builder.clearUID(); - } else { - builder.setIGNORE(false); - builder.addAllUID(value.set); - } - Uid.List list = builder.build(); - Value val = new Value(list.toByteArray()); - Mutation m = new Mutation(key.row); - m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); - try { - mtbw.getBatchWriter(indexTableNameFinal.toString()).addMutation(m); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - LRUOutputCombiner.Output reverseIndexOutput = - new LRUOutputCombiner.Output() { - - @Override - public void output(MutationInfo key, CountAndSet value) - { - Uid.List.Builder builder = Uid.List.newBuilder(); - builder.setCOUNT(value.count); - if (value.set == null) { - builder.setIGNORE(true); - builder.clearUID(); - } else { - builder.setIGNORE(false); - builder.addAllUID(value.set); - } - Uid.List list = builder.build(); - Value val = new Value(list.toByteArray()); - Mutation m = new Mutation(key.row); - m.put(key.colfam, key.colqual, key.cv, key.timestamp, val); - try { - mtbw.getBatchWriter(reverseIndexTableNameFinal.toString()).addMutation(m); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - wikiIndexOutput = new LRUOutputCombiner(10000,indexFold,indexOutput); - wikiReverseIndexOutput = new LRUOutputCombiner(10000, indexFold,reverseIndexOutput); - wikiMetadataOutput = new LRUOutputCombiner(10000, - new LRUOutputCombiner.Fold() { - @Override - public Value fold(Value oldValue, Value newValue) { - return oldValue; - }}, - new LRUOutputCombiner.Output() { - @Override - public void output(MutationInfo key, Value value) { - Mutation m = new Mutation(key.row); - m.put(key.colfam, key.colqual, key.cv, key.timestamp, value); - try { - mtbw.getBatchWriter(metadataTableNameFinal.toString()).addMutation(m); - } catch (Exception e) { - throw new RuntimeException(e); - } - }}); - } - - - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - wikiIndexOutput.flush(); - wikiMetadataOutput.flush(); - wikiReverseIndexOutput.flush(); - try { - mtbw.close(); - } catch (MutationsRejectedException e) { - throw new RuntimeException(e); - } - } - - - - @Override - protected void map(Text language, Article article, Context context) throws IOException, 
InterruptedException { - String NULL_BYTE = "\u0000"; - String colfPrefix = language.toString() + NULL_BYTE; - String indexPrefix = "fi" + NULL_BYTE; - ColumnVisibility cv = new ColumnVisibility(cvPrefix + language); - - if (article != null) { - Text partitionId = new Text(Integer.toString(WikipediaMapper.getPartitionId(article, numPartitions))); - - // Create the mutations for the document. - // Row is partition id, colf is language0articleid, colq is fieldName\0fieldValue - Mutation m = new Mutation(partitionId); - for (Entry entry : article.getFieldValues().entrySet()) { - m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE); - // Create mutations for the metadata table. - MutationInfo mm = new MutationInfo(entry.getKey(), METADATA_EVENT_COLUMN_FAMILY, language.toString(), cv, article.getTimestamp()); - wikiMetadataOutput.put(mm, NULL_VALUE); - } - - // Tokenize the content - Set tokens = WikipediaMapper.getTokens(article); - - // We are going to put the fields to be indexed into a multimap. This allows us to iterate - // over the entire set once. - Multimap indexFields = HashMultimap.create(); - // Add the normalized field values - LcNoDiacriticsNormalizer normalizer = new LcNoDiacriticsNormalizer(); - for (Entry index : article.getNormalizedFieldValues().entrySet()) - indexFields.put(index.getKey(), index.getValue()); - // Add the tokens - for (String token : tokens) - indexFields.put(TOKENS_FIELD_NAME, normalizer.normalizeFieldValue("", token)); - - for (Entry index : indexFields.entries()) { - // Create mutations for the in partition index - // Row is partition id, colf is 'fi'\0fieldName, colq is fieldValue\0language\0article id - m.put(indexPrefix + index.getKey(), index.getValue() + NULL_BYTE + colfPrefix + article.getId(), cv, article.getTimestamp(), NULL_VALUE); - - // Create mutations for the global index - // Row is field value, colf is field name, colq is partitionid\0language, value is Uid.List object - MutationInfo gm = new MutationInfo(index.getValue(),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp()); - wikiIndexOutput.put(gm, new CountAndSet(Integer.toString(article.getId()))); - - // Create mutations for the global reverse index - MutationInfo grm = new MutationInfo(StringUtils.reverse(index.getValue()),index.getKey(),partitionId + NULL_BYTE + language, cv, article.getTimestamp()); - wikiReverseIndexOutput.put(grm, new CountAndSet(Integer.toString(article.getId()))); - - // Create mutations for the metadata table. - MutationInfo mm = new MutationInfo(index.getKey(),METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp()); - wikiMetadataOutput.put(mm, NULL_VALUE); - } - // Add the entire text to the document section of the table. 
- // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document - m.put(DOCUMENT_COLUMN_FAMILY, colfPrefix + article.getId(), cv, article.getTimestamp(), new Value(Base64.encodeBase64(article.getText().getBytes()))); - context.write(tablename, m); - - } else { - context.getCounter("wikipedia", "invalid articles").increment(1); - } - context.progress(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java deleted file mode 100644 index 3507108..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -/** - * - */ -package org.apache.accumulo.examples.wikisearch.ingest; - - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article; -import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -public class WikipediaPartitioner extends Mapper { - - // private static final Logger log = Logger.getLogger(WikipediaPartitioner.class); - - public final static Charset UTF8 = Charset.forName("UTF-8"); - public static final String DOCUMENT_COLUMN_FAMILY = "d"; - public static final String METADATA_EVENT_COLUMN_FAMILY = "e"; - public static final String METADATA_INDEX_COLUMN_FAMILY = "i"; - public static final String TOKENS_FIELD_NAME = "TEXT"; - - private final static Pattern languagePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?"); - - private ArticleExtractor extractor; - private String language; - - private int myGroup = -1; - private int numGroups = -1; - - @Override - public void setup(Context context) { - Configuration conf = context.getConfiguration(); - - WikipediaInputSplit wiSplit = (WikipediaInputSplit)context.getInputSplit(); - myGroup = wiSplit.getPartition(); - numGroups = WikipediaConfiguration.getNumGroups(conf); - - FileSplit split = wiSplit.getFileSplit(); - String fileName = split.getPath().getName(); - Matcher matcher = languagePattern.matcher(fileName); - if (matcher.matches()) { - language = matcher.group(1).replace('_', '-').toLowerCase(); - } else { - throw new RuntimeException("Unknown ingest language! " + fileName); - } - extractor = new ArticleExtractor(); - } - - @Override - protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8)); - if (article != null) { - int groupId = WikipediaMapper.getPartitionId(article, numGroups); - if(groupId != myGroup) - return; - context.write(new Text(language), article); - } else { - context.getCounter("wikipedia", "invalid articles").increment(1); - context.progress(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/GlobalIndexUidCombiner.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/GlobalIndexUidCombiner.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/GlobalIndexUidCombiner.java deleted file mode 100644 index 4702521..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/GlobalIndexUidCombiner.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.wikisearch.iterator; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.TypedValueCombiner; -import org.apache.accumulo.core.iterators.ValueFormatException; -import org.apache.accumulo.examples.wikisearch.protobuf.Uid; - -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * - */ -public class GlobalIndexUidCombiner extends TypedValueCombiner { - public static final Encoder UID_LIST_ENCODER = new UidListEncoder(); - public static final int MAX = 20; - - @Override - public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - setEncoder(UID_LIST_ENCODER); - } - - @Override - public Uid.List typedReduce(Key key, Iterator iter) { - Uid.List.Builder builder = Uid.List.newBuilder(); - HashSet uids = new HashSet(); - boolean seenIgnore = false; - long count = 0; - while (iter.hasNext()) { - Uid.List v = iter.next(); - if (null == v) - continue; - count = count + v.getCOUNT(); - if (v.getIGNORE()) { - seenIgnore = true; - } - uids.addAll(v.getUIDList()); - } - // Special case logic - // If we have aggregated more than MAX UIDs, then null out the UID list and set IGNORE to true - // However, always maintain the count - builder.setCOUNT(count); - if (uids.size() > MAX || seenIgnore) { - builder.setIGNORE(true); - builder.clearUID(); - } else { - builder.setIGNORE(false); - builder.addAllUID(uids); - } - return builder.build(); - } - - public static class UidListEncoder implements Encoder { - @Override - public byte[] encode(Uid.List v) { - return v.toByteArray(); - } - - @Override - public Uid.List decode(byte[] b) { - if (b.length == 0) - return null; - try { - return Uid.List.parseFrom(b); - } catch (InvalidProtocolBufferException e) { - throw new ValueFormatException("Value passed to aggregator was not of type Uid.List"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/TextIndexCombiner.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/TextIndexCombiner.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/TextIndexCombiner.java deleted file mode 100644 index 85f3e1e..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/TextIndexCombiner.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.wikisearch.iterator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.TypedValueCombiner; -import org.apache.accumulo.core.iterators.ValueFormatException; -import org.apache.accumulo.examples.wikisearch.protobuf.TermWeight; - -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * - */ -public class TextIndexCombiner extends TypedValueCombiner { - public static final Encoder TERMWEIGHT_INFO_ENCODER = new TermWeightInfoEncoder(); - - @Override - public TermWeight.Info typedReduce(Key key, Iterator iter) { - TermWeight.Info.Builder builder = TermWeight.Info.newBuilder(); - List offsets = new ArrayList(); - float normalizedTermFrequency = 0f; - - while (iter.hasNext()) { - TermWeight.Info info = iter.next(); - if (null == info) - continue; - - // Add each offset into the list maintaining sorted order - for (int offset : info.getWordOffsetList()) { - int pos = Collections.binarySearch(offsets, offset); - - if (pos < 0) { - // Undo the transform on the insertion point - offsets.add((-1 * pos) - 1, offset); - } else { - offsets.add(pos, offset); - } - } - - if (info.getNormalizedTermFrequency() > 0) { - normalizedTermFrequency += info.getNormalizedTermFrequency(); - } - } - - // Keep the sorted order we tried to maintain - for (int i = 0; i < offsets.size(); ++i) { - builder.addWordOffset(offsets.get(i)); - } - - builder.setNormalizedTermFrequency(normalizedTermFrequency); - return builder.build(); - } - - @Override - public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - setEncoder(TERMWEIGHT_INFO_ENCODER); - } - - public static class TermWeightInfoEncoder implements Encoder { - @Override - public byte[] encode(TermWeight.Info v) { - return v.toByteArray(); - } - - @Override - public TermWeight.Info decode(byte[] b) { - if (b.length == 0) - return null; - try { - return TermWeight.Info.parseFrom(b); - } catch (InvalidProtocolBufferException e) { - throw new ValueFormatException("Value passed to aggregator was not of type TermWeight.Info"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/LcNoDiacriticsNormalizer.java ---------------------------------------------------------------------- diff 
--git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/LcNoDiacriticsNormalizer.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/LcNoDiacriticsNormalizer.java deleted file mode 100644 index b15a5ee..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/LcNoDiacriticsNormalizer.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.wikisearch.normalizer; - -import java.text.Normalizer; -import java.text.Normalizer.Form; -import java.util.Locale; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * An {@link Normalizer} which performs the following steps: - *
    - *
  1. Unicode canonical decomposition ({@link Form#NFD})
  - *   2. Removal of diacritical marks
  - *   3. Unicode canonical composition ({@link Form#NFC})
  - *   4. lower casing in the {@link Locale#ENGLISH English locale}
- */ -public class LcNoDiacriticsNormalizer implements org.apache.accumulo.examples.wikisearch.normalizer.Normalizer { - private static final Pattern diacriticals = Pattern.compile("\\p{InCombiningDiacriticalMarks}"); - - public String normalizeFieldValue(String fieldName, Object fieldValue) { - String decomposed = Normalizer.normalize(fieldValue.toString(), Form.NFD); - String noDiacriticals = removeDiacriticalMarks(decomposed); - String recomposed = Normalizer.normalize(noDiacriticals, Form.NFC); - return recomposed.toLowerCase(Locale.ENGLISH); - } - - private String removeDiacriticalMarks(String str) { - Matcher matcher = diacriticals.matcher(str); - return matcher.replaceAll(""); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NoOpNormalizer.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NoOpNormalizer.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NoOpNormalizer.java deleted file mode 100644 index 7498f76..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NoOpNormalizer.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.wikisearch.normalizer; - -public class NoOpNormalizer implements Normalizer { - public String normalizeFieldValue(String field, Object value) { - return value.toString(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/Normalizer.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/Normalizer.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/Normalizer.java deleted file mode 100644 index ba3632a..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/Normalizer.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.wikisearch.normalizer; - -public interface Normalizer { - - /** - * Creates normalized content for ingest based upon implemented logic. - * - * @param field - * The field being normalized - * @param value - * The value to normalize - * @return a normalized value - */ - public String normalizeFieldValue(String field, Object value); - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NumberNormalizer.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NumberNormalizer.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NumberNormalizer.java deleted file mode 100644 index e0a5cc8..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/normalizer/NumberNormalizer.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.examples.wikisearch.normalizer; - -import org.apache.commons.lang.math.NumberUtils; -import org.apache.lucene.util.NumericUtils; - -public class NumberNormalizer implements Normalizer { - - public String normalizeFieldValue(String field, Object value) { - if (NumberUtils.isNumber(value.toString())) { - Number n = NumberUtils.createNumber(value.toString()); - if (n instanceof Integer) - return NumericUtils.intToPrefixCoded((Integer) n); - else if (n instanceof Long) - return NumericUtils.longToPrefixCoded((Long) n); - else if (n instanceof Float) - return NumericUtils.floatToPrefixCoded((Float) n); - else if (n instanceof Double) - return NumericUtils.doubleToPrefixCoded((Double) n); - else - throw new IllegalArgumentException("Unhandled numeric type: " + n.getClass()); - } else { - throw new IllegalArgumentException("Value is not a number: " + value); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java deleted file mode 100644 index 9b663de..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.examples.wikisearch.output; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.TreeMap; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.ColumnUpdate; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.file.FileSKVWriter; -import org.apache.accumulo.core.file.rfile.RFileOperations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -final class BufferingRFileRecordWriter extends RecordWriter { - private final long maxSize; - private final AccumuloConfiguration acuconf; - private final Configuration conf; - private final String filenamePrefix; - private final String taskID; - private final FileSystem fs; - private int fileCount = 0; - private long size; - - private Map> buffers = new HashMap>(); - private Map bufferSizes = new HashMap(); - - private TreeMap getBuffer(Text tablename) { - TreeMap buffer = buffers.get(tablename); - if (buffer == null) { - buffer = new TreeMap(); - buffers.put(tablename, buffer); - bufferSizes.put(tablename, 0l); - } - return buffer; - } - - private Text getLargestTablename() { - long max = 0; - Text table = null; - for (Entry e : bufferSizes.entrySet()) { - if (e.getValue() > max) { - max = e.getValue(); - table = e.getKey(); - } - } - return table; - } - - private void flushLargestTable() throws IOException { - Text tablename = getLargestTablename(); - if (tablename == null) - return; - long bufferSize = bufferSizes.get(tablename); - TreeMap buffer = buffers.get(tablename); - if (buffer.size() == 0) - return; - - String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf"; - // TODO get the table configuration for the given table? 
- FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf); - - // forget locality groups for now, just write everything to the default - writer.startDefaultLocalityGroup(); - - for (Entry e : buffer.entrySet()) { - writer.append(e.getKey(), e.getValue()); - } - - writer.close(); - - size -= bufferSize; - buffer.clear(); - bufferSizes.put(tablename, 0l); - } - - BufferingRFileRecordWriter(long maxSize, AccumuloConfiguration acuconf, Configuration conf, String filenamePrefix, String taskID, FileSystem fs) { - this.maxSize = maxSize; - this.acuconf = acuconf; - this.conf = conf; - this.filenamePrefix = filenamePrefix; - this.taskID = taskID; - this.fs = fs; - } - - @Override - public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { - while (size > 0) - flushLargestTable(); - } - - @Override - public void write(Text table, Mutation mutation) throws IOException, InterruptedException { - TreeMap buffer = getBuffer(table); - int mutationSize = 0; - for (ColumnUpdate update : mutation.getUpdates()) { - Key k = new Key(mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), - update.isDeleted()); - Value v = new Value(update.getValue()); - // TODO account for object overhead - mutationSize += k.getSize(); - mutationSize += v.getSize(); - buffer.put(k, v); - } - size += mutationSize; - long bufferSize = bufferSizes.get(table); - - // TODO use a MutableLong instead - bufferSize += mutationSize; - bufferSizes.put(table, bufferSize); - - while (size >= maxSize) { - flushLargestTable(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java deleted file mode 100644 index 1fa8fdc..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.examples.wikisearch.output; - -import java.io.IOException; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -public class SortingRFileOutputFormat extends OutputFormat { - - // private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class); - - public static final String PATH_NAME = "sortingrfileoutputformat.path"; - public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size"; - - public static void setPathName(Configuration conf, String path) { - conf.set(PATH_NAME, path); - } - - public static String getPathName(Configuration conf) { - return conf.get(PATH_NAME); - } - - public static void setMaxBufferSize(Configuration conf, long maxBufferSize) { - conf.setLong(MAX_BUFFER_SIZE, maxBufferSize); - } - - public static long getMaxBufferSize(Configuration conf) { - return conf.getLong(MAX_BUFFER_SIZE, -1); - } - - @Override - public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException { - // TODO make sure the path is writable? - // TODO make sure the max buffer size is set and is reasonable - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException { - return new OutputCommitter() { - - @Override - public void setupTask(TaskAttemptContext arg0) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void setupJob(JobContext arg0) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public void commitTask(TaskAttemptContext arg0) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void cleanupJob(JobContext arg0) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void abortTask(TaskAttemptContext arg0) throws IOException { - // TODO Auto-generated method stub - - } - }; - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext attempt) throws IOException, InterruptedException { - - // grab the configuration - final Configuration conf = attempt.getConfiguration(); - // create a filename - final String filenamePrefix = getPathName(conf); - final String taskID = attempt.getTaskAttemptID().toString(); - // grab the max size - final long maxSize = getMaxBufferSize(conf); - // grab the FileSystem - final FileSystem fs = FileSystem.get(conf); - // create a default AccumuloConfiguration - final AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration(); - - return new BufferingRFileRecordWriter(maxSize, acuconf, conf, filenamePrefix, taskID, fs); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/8db62992/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/protobuf/TermWeight.java ---------------------------------------------------------------------- diff --git 
a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/protobuf/TermWeight.java b/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/protobuf/TermWeight.java deleted file mode 100644 index bf5133f..0000000 --- a/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/protobuf/TermWeight.java +++ /dev/null @@ -1,424 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: TermWeight.proto - -package org.apache.accumulo.examples.wikisearch.protobuf; - -public final class TermWeight { - private TermWeight() {} - - public static void registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) {} - - public static final class Info extends com.google.protobuf.GeneratedMessage { - // Use Info.newBuilder() to construct. - private Info() { - initFields(); - } - - private Info(boolean noInit) {} - - private static final Info defaultInstance; - - public static Info getDefaultInstance() { - return defaultInstance; - } - - public Info getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.internal_static_protobuf_Info_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.internal_static_protobuf_Info_fieldAccessorTable; - } - - // required float normalizedTermFrequency = 1; - public static final int NORMALIZEDTERMFREQUENCY_FIELD_NUMBER = 1; - private boolean hasNormalizedTermFrequency; - private float normalizedTermFrequency_ = 0F; - - public boolean hasNormalizedTermFrequency() { - return hasNormalizedTermFrequency; - } - - public float getNormalizedTermFrequency() { - return normalizedTermFrequency_; - } - - // repeated uint32 wordOffset = 2; - public static final int WORDOFFSET_FIELD_NUMBER = 2; - private java.util.List wordOffset_ = java.util.Collections.emptyList(); - - public java.util.List getWordOffsetList() { - return wordOffset_; - } - - public int getWordOffsetCount() { - return wordOffset_.size(); - } - - public int getWordOffset(int index) { - return wordOffset_.get(index); - } - - private void initFields() {} - - public final boolean isInitialized() { - if (!hasNormalizedTermFrequency) - return false; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { - getSerializedSize(); - if (hasNormalizedTermFrequency()) { - output.writeFloat(1, getNormalizedTermFrequency()); - } - for (int element : getWordOffsetList()) { - 
output.writeUInt32(2, element); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) - return size; - - size = 0; - if (hasNormalizedTermFrequency()) { - size += com.google.protobuf.CodedOutputStream.computeFloatSize(1, getNormalizedTermFrequency()); - } - { - int dataSize = 0; - for (int element : getWordOffsetList()) { - dataSize += com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(element); - } - size += dataSize; - size += 1 * getWordOffsetList().size(); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(java.io.InputStream input) throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseDelimitedFrom(java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(com.google.protobuf.CodedInputStream input) throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - - public static org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info parseFrom(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry).buildParsed(); - } - - public static Builder 
newBuilder() { - return Builder.create(); - } - - public Builder newBuilderForType() { - return newBuilder(); - } - - public static Builder newBuilder(org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info prototype) { - return newBuilder().mergeFrom(prototype); - } - - public Builder toBuilder() { - return newBuilder(this); - } - - public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info result; - - // Construct using protobuf.TermWeight.Info.newBuilder() - private Builder() {} - - private static Builder create() { - Builder builder = new Builder(); - builder.result = new org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info(); - return builder; - } - - protected org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info internalGetResult() { - return result; - } - - public Builder clear() { - if (result == null) { - throw new IllegalStateException("Cannot call clear() after build()."); - } - result = new org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info(); - return this; - } - - public Builder clone() { - return create().mergeFrom(result); - } - - public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info.getDescriptor(); - } - - public org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info getDefaultInstanceForType() { - return org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info.getDefaultInstance(); - } - - public boolean isInitialized() { - return result.isInitialized(); - } - - public org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info build() { - if (result != null && !isInitialized()) { - throw newUninitializedMessageException(result); - } - return buildPartial(); - } - - private org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { - if (!isInitialized()) { - throw newUninitializedMessageException(result).asInvalidProtocolBufferException(); - } - return buildPartial(); - } - - public org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info buildPartial() { - if (result == null) { - throw new IllegalStateException("build() has already been called on this Builder."); - } - if (result.wordOffset_ != java.util.Collections.EMPTY_LIST) { - result.wordOffset_ = java.util.Collections.unmodifiableList(result.wordOffset_); - } - org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info returnMe = result; - result = null; - return returnMe; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info) { - return mergeFrom((org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info) other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info other) { - if (other == org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info.getDefaultInstance()) - return this; - if (other.hasNormalizedTermFrequency()) { - setNormalizedTermFrequency(other.getNormalizedTermFrequency()); - } - if (!other.wordOffset_.isEmpty()) { - if (result.wordOffset_.isEmpty()) { - result.wordOffset_ = new java.util.ArrayList(); - } - result.wordOffset_.addAll(other.wordOffset_); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - 
} - - public Builder mergeFrom(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - return this; - default: { - if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - return this; - } - break; - } - case 13: { - setNormalizedTermFrequency(input.readFloat()); - break; - } - case 16: { - addWordOffset(input.readUInt32()); - break; - } - case 18: { - int length = input.readRawVarint32(); - int limit = input.pushLimit(length); - while (input.getBytesUntilLimit() > 0) { - addWordOffset(input.readUInt32()); - } - input.popLimit(limit); - break; - } - } - } - } - - // required float normalizedTermFrequency = 1; - public boolean hasNormalizedTermFrequency() { - return result.hasNormalizedTermFrequency(); - } - - public float getNormalizedTermFrequency() { - return result.getNormalizedTermFrequency(); - } - - public Builder setNormalizedTermFrequency(float value) { - result.hasNormalizedTermFrequency = true; - result.normalizedTermFrequency_ = value; - return this; - } - - public Builder clearNormalizedTermFrequency() { - result.hasNormalizedTermFrequency = false; - result.normalizedTermFrequency_ = 0F; - return this; - } - - // repeated uint32 wordOffset = 2; - public java.util.List getWordOffsetList() { - return java.util.Collections.unmodifiableList(result.wordOffset_); - } - - public int getWordOffsetCount() { - return result.getWordOffsetCount(); - } - - public int getWordOffset(int index) { - return result.getWordOffset(index); - } - - public Builder setWordOffset(int index, int value) { - result.wordOffset_.set(index, value); - return this; - } - - public Builder addWordOffset(int value) { - if (result.wordOffset_.isEmpty()) { - result.wordOffset_ = new java.util.ArrayList(); - } - result.wordOffset_.add(value); - return this; - } - - public Builder addAllWordOffset(java.lang.Iterable values) { - if (result.wordOffset_.isEmpty()) { - result.wordOffset_ = new java.util.ArrayList(); - } - super.addAll(values, result.wordOffset_); - return this; - } - - public Builder clearWordOffset() { - result.wordOffset_ = java.util.Collections.emptyList(); - return this; - } - - // @@protoc_insertion_point(builder_scope:protobuf.Info) - } - - static { - defaultInstance = new Info(true); - org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.internalForceInit(); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:protobuf.Info) - } - - private static com.google.protobuf.Descriptors.Descriptor internal_static_protobuf_Info_descriptor; - private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_protobuf_Info_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { - return descriptor; - } - - private static com.google.protobuf.Descriptors.FileDescriptor descriptor; - static { - java.lang.String[] descriptorData = {"\n\020TermWeight.proto\022\010protobuf\";\n\004Info\022\037\n\027" - + "normalizedTermFrequency\030\001 \002(\002\022\022\n\nwordOff" + "set\030\002 \003(\rB\014\n\010protobufH\001"}; - com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { - public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) { - descriptor = root; - internal_static_protobuf_Info_descriptor = getDescriptor().getMessageTypes().get(0); - internal_static_protobuf_Info_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_protobuf_Info_descriptor, new java.lang.String[] {"NormalizedTermFrequency", "WordOffset",}, org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info.class, - org.apache.accumulo.examples.wikisearch.protobuf.TermWeight.Info.Builder.class); - return null; - } - }; - com.google.protobuf.Descriptors.FileDescriptor.internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] {}, - assigner); - } - - public static void internalForceInit() {} - - // @@protoc_insertion_point(outer_class_scope) -}
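
For anyone still depending on the removed wikisearch example, the generated TermWeight.Info message deleted above can be exercised directly with its builder and parse methods. The sketch below is illustrative only: the class name TermWeightRoundTrip, the sample frequency 0.75f, and the offsets 11 and 42 are not part of the original code, and it assumes the protobuf 2.x runtime this generated class targets plus the Accumulo core Value class.

import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.examples.wikisearch.protobuf.TermWeight;

public class TermWeightRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build an Info message: the required normalized term frequency plus the
    // repeated word offsets (sample values only).
    TermWeight.Info info = TermWeight.Info.newBuilder()
        .setNormalizedTermFrequency(0.75f)
        .addWordOffset(11)
        .addWordOffset(42)
        .build();

    // Serialize into an Accumulo Value, as an ingest path might store it.
    Value value = new Value(info.toByteArray());

    // Parse it back out of the Value's backing bytes.
    TermWeight.Info parsed = TermWeight.Info.parseFrom(value.get());
    System.out.println(parsed.getNormalizedTermFrequency()); // 0.75
    System.out.println(parsed.getWordOffsetList());          // [11, 42]
  }
}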