accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From afu...@apache.org
Subject svn commit: r1243961 - in /incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch: ingest/WikipediaPartitionedIngester.java output/SortingRFileOutputFormat.java
Date Tue, 14 Feb 2012 14:46:37 GMT
Author: afuchs
Date: Tue Feb 14 14:46:37 2012
New Revision: 1243961

URL: http://svn.apache.org/viewvc?rev=1243961&view=rev
Log:
ACCUMULO-375

Modified:
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1243961&r1=1243960&r2=1243961&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Tue Feb 14 14:46:37 2012
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
@@ -58,9 +59,12 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
 
 public class WikipediaPartitionedIngester extends Configured implements Tool {
-  
+
+  private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
+
   public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
   public final static String SPLIT_FILE = "wikipedia.split_file";
   public final static String TABLE_NAME = "wikipedia.table";
@@ -150,7 +154,7 @@ public class WikipediaPartitionedIngeste
     return 0;
   }
   
-  public int runPartitionerJob() throws Exception
+  private int runPartitionerJob() throws Exception
   {
     Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
     Configuration partitionerConf = partitionerJob.getConfiguration();
@@ -191,7 +195,7 @@ public class WikipediaPartitionedIngeste
     return partitionerJob.waitForCompletion(true) ? 0 : 1;
   }
   
-  public int runIngestJob() throws Exception
+  private int runIngestJob() throws Exception
   {
     Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
     Configuration ingestConf = ingestJob.getConfiguration();
@@ -221,6 +225,16 @@ public class WikipediaPartitionedIngeste
     
     if(WikipediaConfiguration.bulkIngest(ingestConf))
     {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
+      if(bulkIngestDir == null)
+      {
+        log.error("Bulk ingest dir not set");
+        return 1;
+      }
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
       ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
       String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
       String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
@@ -228,16 +242,12 @@ public class WikipediaPartitionedIngeste
       byte[] password = WikipediaConfiguration.getPassword(ingestConf);
       AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
       AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
-    } else {
-      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
-      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
-      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
     }
     
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
   
-  public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
   {
     Configuration conf = getConf();
 
@@ -253,7 +263,9 @@ public class WikipediaPartitionedIngeste
       if(status.isDir() == false)
         continue;
       Path dir = status.getPath();
-      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
+      Path failPath = new Path(failureDirectory+"/"+dir.getName());
+      fs.mkdirs(failPath);
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
     }
     
     return 0;

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1243961&r1=1243960&r2=1243961&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
(original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
Tue Feb 14 14:46:37 2012
@@ -4,6 +4,7 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -12,9 +13,12 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
 
 public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
-  
+
+  private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+
   public static final String PATH_NAME = "sortingrfileoutputformat.path";
   public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";
   



Mime
View raw message