accumulo-commits mailing list archives

From ujustgotbi...@apache.org
Subject [26/50] git commit: ACCUMULO-471 document the ability to run over uncompressed data; allow the input to be split; don't send millions of duplicate metadata table entries
Date Thu, 06 Feb 2014 05:40:05 GMT
ACCUMULO-471 document the ability to run over uncompressed data; allow the input to be
split; don't send millions of duplicate metadata table entries

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1302537 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/commit/2c1666fd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/tree/2c1666fd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/diff/2c1666fd

Branch: refs/heads/1.4.5-SNAPSHOT
Commit: 2c1666fd4fc9d696d612de2aac922f26e9d7116f
Parents: 66bb45c
Author: Eric C. Newton <ecn@apache.org>
Authored: Mon Mar 19 16:47:41 2012 +0000
Committer: Eric C. Newton <ecn@apache.org>
Committed: Mon Mar 19 16:47:41 2012 +0000

----------------------------------------------------------------------
 README                                          |  7 ++++--
 .../wikisearch/ingest/WikipediaInputFormat.java |  6 -----
 .../wikisearch/ingest/WikipediaMapper.java      | 23 ++++++++++++++------
 3 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/2c1666fd/README
----------------------------------------------------------------------
diff --git a/README b/README
index daec8e4..43077a7 100644
--- a/README
+++ b/README
@@ -11,7 +11,10 @@
  	1. Accumulo, Hadoop, and ZooKeeper must be installed and running
  	2. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
 	   You will want to grab the files with the link name of pages-articles.xml.bz2
- 
+        3. Though not strictly required, the ingest will go more quickly if the files are decompressed:
+
+            $ bunzip2 < enwiki-*-pages-articles.xml.bz2 | hadoop fs -put - /wikipedia/enwiki-pages-articles.xml
+
  
  	INSTRUCTIONS
  	------------
@@ -70,4 +73,4 @@
 	log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1
 	
 	This needs to be propagated to all the tablet server nodes, and accumulo needs to be restarted.
-	
\ No newline at end of file
+	

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/2c1666fd/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index e682f2f..c582cbf 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -133,10 +133,4 @@ public class WikipediaInputFormat extends TextInputFormat {
   public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     return new AggregatingRecordReader();
   }
-  
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    return false;
-  }
-  
 }
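
Note on the WikipediaInputFormat change: removing the isSplitable() override (which hard-coded "return false") lets the job fall back to the behavior inherited from Hadoop's TextInputFormat, which only refuses to split a file when its compression codec cannot be split, so decompressed dump files can be read by many mappers in parallel. The sketch below is a rough illustration of that inherited behavior, written for this note; the class name is made up, it is not a verbatim copy of the Hadoop source, and the exact codec check varies across Hadoop versions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Illustration only: roughly what the inherited default does once the
    // hard-coded "return false" override above is removed.
    public class CodecAwareInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        // No codec (e.g. a plain, decompressed .xml dump) means the file can be
        // split across many map tasks; compressed input is split only if the
        // codec itself supports splitting.
        return codec == null || codec instanceof SplittableCompressionCodec;
      }
    }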

http://git-wip-us.apache.org/repos/asf/accumulo-wikisearch/blob/2c1666fd/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index fc328cc..8565b09 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -119,6 +119,8 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
     return article.getId() % numPartitions;
   }
   
+  static HashSet<String> metadataSent = new HashSet<String>();
+
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
@@ -137,9 +139,13 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
       for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
         m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(entry.getKey());
-        mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
+        String metadataKey = entry.getKey() + METADATA_EVENT_COLUMN_FAMILY + language;
+        if (!metadataSent.contains(metadataKey)) {
+          Mutation mm = new Mutation(entry.getKey());
+          mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+          context.write(metadataTableName, mm);
+          metadataSent.add(metadataKey);
+        }
       }
       
       // Tokenize the content
@@ -182,10 +188,13 @@ public class WikipediaMapper extends Mapper<LongWritable,Text,Text,Mutation> {
         context.write(reverseIndexTableName, grm);
         
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(index.getKey());
-        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
-        
+        String metadataKey = index.getKey() + METADATA_INDEX_COLUMN_FAMILY + language;
+        if (!metadataSent.contains(metadataKey)) {
+          Mutation mm = new Mutation(index.getKey());
+          mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+          context.write(metadataTableName, mm);
+          metadataSent.add(metadataKey);
+        }
       }
       // Add the entire text to the document section of the table.
      // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
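
Note on the WikipediaMapper change: the duplicate suppression relies on a static HashSet, so it is scoped to a single mapper JVM rather than the whole job. Each map task may still emit a given metadata entry once, but that is enough to avoid the millions of duplicate metadata table entries mentioned in the commit message. Below is a minimal, self-contained sketch of the same pattern; the class and method names are illustrative only and do not appear in the project.

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of the per-JVM duplicate-suppression pattern used above: remember
    // which metadata keys have already been written and skip re-sending them.
    public class SeenOnceFilter {
      private static final Set<String> sent = new HashSet<String>();

      // Returns true the first time a key is seen in this JVM, false afterwards.
      public static boolean firstTime(String key) {
        return sent.add(key);
      }

      public static void main(String[] args) {
        // Illustrative keys only; the real mapper concatenates the field name,
        // column family, and language.
        System.out.println(firstTime("FIELD_NAME|event|enwiki")); // true
        System.out.println(firstTime("FIELD_NAME|event|enwiki")); // false (duplicate suppressed)
      }
    }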

