incubator-accumulo-commits mailing list archives

From vi...@apache.org
Subject svn commit: r1244743 - in /incubator/accumulo/trunk: ./ conf/ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/ src/examples/wikisearch/ingest/conf/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wik...
Date Wed, 15 Feb 2012 21:30:58 GMT
Author: vines
Date: Wed Feb 15 21:30:57 2012
New Revision: 1244743

URL: http://svn.apache.org/viewvc?rev=1244743&view=rev
Log:
merging ACCUMULO-390 and what the others have been working on


Added:
    incubator/accumulo/trunk/conf/accumulo-env.sh.1GBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.1GBstandalone-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.2GBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.2GBstandalone-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.3GBcluster-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.3GBcluster-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.512MBBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.512MBBstandalone-example
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/
      - copied from r1244689, incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/
Removed:
    incubator/accumulo/trunk/conf/accumulo-env.sh.example
Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/README
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 21:30:57 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1242521,1244690-1244693
+/incubator/accumulo/branches/1.4:1201902-1244742

Modified: incubator/accumulo/trunk/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/README?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/README (original)
+++ incubator/accumulo/trunk/README Wed Feb 15 21:30:57 2012
@@ -56,7 +56,7 @@ Create a "masters" file in $ACCUMULO_HOM
 machines where the master server will run. 
 
 Create conf/accumulo-env.sh following the template of
-conf/accumulo-env.sh.example.  Set JAVA_HOME, HADOOP_HOME, and ZOOKEEPER_HOME.
+conf/accumulo-env.sh.*-example.  Set JAVA_HOME, HADOOP_HOME, and ZOOKEEPER_HOME.  The example accumulo-env files are named based on the memory footprint for the Accumulo processes, and whether that footprint is for standalone or cluster use.  Please note that the footprints are for only the Accumulo system processes, so ample space should be left for other processes like Hadoop, ZooKeeper, and the Accumulo client code.
 These directories must be at the same location on every node in the cluster.
 Note that zookeeper must be installed on every machine, but it should not be 
 run on every machine.

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 21:30:57 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3.5rc/src/core:1209938
 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
-/incubator/accumulo/branches/1.4/src/core:1201902-1242521,1244690-1244693
+/incubator/accumulo/branches/1.3.5rc/src/core:1209938
+/incubator/accumulo/branches/1.4/src/core:1201902-1244742

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Wed Feb 15 21:30:57 2012
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
+import java.net.InetAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.ByteBuffer;
@@ -571,8 +572,17 @@ public abstract class InputFormatBase<K,
     if (!autoAdjust)
       splitsToAdd = new HashMap<Range,ArrayList<String>>();
     
+    HashMap<String,String> hostNameCache = new HashMap<String,String>();
+
     for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String location = tserverBin.getKey().split(":", 2)[0];
+      String ip = tserverBin.getKey().split(":", 2)[0];
+      String location = hostNameCache.get(ip);
+      if (location == null) {
+        InetAddress inetAddress = InetAddress.getByName(ip);
+        location = inetAddress.getHostName();
+        hostNameCache.put(ip, location);
+      }
+
       for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
         Range ke = extentRanges.getKey().toDataRange();
         for (Range r : extentRanges.getValue()) {
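
The InputFormatBase hunk above changes split-location assignment to resolve each tablet server's IP to a hostname (presumably so split locations match the hostnames the MapReduce framework uses for data locality), and it caches lookups so each distinct IP triggers at most one DNS round trip per getSplits call. A minimal standalone sketch of the same caching pattern; the HostNameResolver class is illustrative, not part of this commit:

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;

    // Illustrative sketch only: HostNameResolver is a hypothetical class,
    // not part of this commit.
    public class HostNameResolver {
      private final HashMap<String,String> hostNameCache = new HashMap<String,String>();

      // Resolves a "host:port" tablet server address to a hostname, caching
      // results so each distinct IP is looked up at most once.
      public String resolve(String tserverAddress) throws UnknownHostException {
        String ip = tserverAddress.split(":", 2)[0];
        String location = hostNameCache.get(ip);
        if (location == null) {
          location = InetAddress.getByName(ip).getHostName();
          hostNameCache.put(ip, location);
        }
        return location;
      }
    }

Without the cache, InetAddress.getByName could be called once per tablet server entry, which can mean a DNS lookup per bin on large clusters.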

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example Wed Feb 15 21:30:57 2012
@@ -56,4 +56,20 @@
     <name>wikipedia.run.ingest</name>
     <value><!--whether to run the ingest step --></value>
   </property>
+  <property>
+    <name>wikipedia.bulk.ingest</name>
+    <value><!--whether to use bulk ingest vice streaming ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.dir</name>
+    <value><!--the directory to store rfiles for bulk ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.failure.dir</name>
+    <value><!--the directory to store failed rfiles after bulk ingest --></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.buffer.size</name>
+    <value><!--the amount of memory to use for buffering and sorting key/value pairs in each mapper before writing rfiles --></value>
+  </property>
 </configuration>

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Wed Feb 15 21:30:57 2012
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {
   
   public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
   public final static String RUN_INGEST = "wikipedia.run.ingest";
+  public final static String BULK_INGEST = "wikipedia.bulk.ingest";
+  public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
+  public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
+  public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
   
   
   public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public class WikipediaConfiguration {
     return conf.getBoolean(RUN_INGEST, true);
   }
 
+  public static boolean bulkIngest(Configuration conf) {
+    return conf.getBoolean(BULK_INGEST, true);
+  }
+
+  public static String bulkIngestDir(Configuration conf) {
+    return conf.get(BULK_INGEST_DIR);
+  }
+
+  public static String bulkIngestFailureDir(Configuration conf) {
+    return conf.get(BULK_INGEST_FAILURE_DIR);
+  }
+  
+  public static long bulkIngestBufferSize(Configuration conf) {
+    return conf.getLong(BULK_INGEST_BUFFER_SIZE,1l<<28);
+  }
+
   /**
    * Helper method to get properties from Hadoop configuration
    * 
@@ -169,5 +189,5 @@ public class WikipediaConfiguration {
       throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
     
   }
-  
+
 }
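
The accessors above pair with the wikipedia.bulk.ingest.* properties added to wikipedia_parallel.xml.example; note that bulkIngest() defaults to true, and bulkIngestBufferSize() falls back to 1l << 28 bytes (256 MB) when the property is unset. A small usage sketch, with made-up values for the directory properties:

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
    import org.apache.hadoop.conf.Configuration;

    // Illustrative only; the paths below are made-up example values.
    public class BulkIngestConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(WikipediaConfiguration.BULK_INGEST, true);
        conf.set(WikipediaConfiguration.BULK_INGEST_DIR, "/wikisearch/bulk");           // hypothetical path
        conf.set(WikipediaConfiguration.BULK_INGEST_FAILURE_DIR, "/wikisearch/failed"); // hypothetical path

        // wikipedia.bulk.ingest.buffer.size is left unset here, so the accessor
        // returns its default of 1l << 28 = 268435456 bytes (256 MB).
        long bufferSize = WikipediaConfiguration.bulkIngestBufferSize(conf);
        System.out.println(bufferSize); // prints 268435456
      }
    }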

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Wed Feb 15 21:30:57 2012
@@ -39,9 +39,11 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -53,14 +55,16 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
 
 public class WikipediaPartitionedIngester extends Configured implements Tool {
-  
+
+  private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
+
   public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
   public final static String SPLIT_FILE = "wikipedia.split_file";
   public final static String TABLE_NAME = "wikipedia.table";
@@ -140,11 +144,17 @@ public class WikipediaPartitionedIngeste
         return result;
     }
     if(WikipediaConfiguration.runIngest(conf))
-      return runIngestJob();
+    {
+      int result = runIngestJob();
+      if(result != 0)
+        return result;
+      if(WikipediaConfiguration.bulkIngest(conf))
+        return loadBulkFiles();
+    }
     return 0;
   }
   
-  public int runPartitionerJob() throws Exception
+  private int runPartitionerJob() throws Exception
   {
     Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
     Configuration partitionerConf = partitionerJob.getConfiguration();
@@ -185,7 +195,7 @@ public class WikipediaPartitionedIngeste
     return partitionerJob.waitForCompletion(true) ? 0 : 1;
   }
   
-  public int runIngestJob() throws Exception
+  private int runIngestJob() throws Exception
   {
     Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
     Configuration ingestConf = ingestJob.getConfiguration();
@@ -195,11 +205,6 @@ public class WikipediaPartitionedIngeste
     
     String tablename = WikipediaConfiguration.getTableName(ingestConf);
     
-    String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
-    String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
-    
-    String user = WikipediaConfiguration.getUser(ingestConf);
-    byte[] password = WikipediaConfiguration.getPassword(ingestConf);
     Connector connector = WikipediaConfiguration.getConnector(ingestConf);
     
     TableOperations tops = connector.tableOperations();
@@ -217,13 +222,55 @@ public class WikipediaPartitionedIngeste
     // setup output format
     ingestJob.setMapOutputKeyClass(Text.class);
     ingestJob.setMapOutputValueClass(Mutation.class);
-    ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
-    AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    
+    if(WikipediaConfiguration.bulkIngest(ingestConf))
+    {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
+      if(bulkIngestDir == null)
+      {
+        log.error("Bulk ingest dir not set");
+        return 1;
+      }
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
+      ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+      String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+      String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+      String user = WikipediaConfiguration.getUser(ingestConf);
+      byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+      AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+      AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    }
     
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
   
+  private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  {
+    Configuration conf = getConf();
+
+    Connector connector = WikipediaConfiguration.getConnector(conf);
+    
+    FileSystem fs = FileSystem.get(conf);
+    String directory = WikipediaConfiguration.bulkIngestDir(conf);
+    
+    String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
+    
+    for(FileStatus status: fs.listStatus(new Path(directory)))
+    {
+      if(status.isDir() == false)
+        continue;
+      Path dir = status.getPath();
+      Path failPath = new Path(failureDirectory+"/"+dir.getName());
+      fs.mkdirs(failPath);
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
+    }
+    
+    return 0;
+  }
+  
   public final static PathFilter partFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -241,7 +288,6 @@ public class WikipediaPartitionedIngeste
 
   protected void configureIngestJob(Job job) {
     job.setJarByClass(WikipediaPartitionedIngester.class);
-    job.setInputFormatClass(WikipediaInputFormat.class);
   }
   
   protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
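
For orientation, the new loadBulkFiles() method replaces streaming writes with Accumulo's bulk import: after the ingest job writes sorted rfiles, each subdirectory of the bulk ingest directory is imported into the table named after that subdirectory, with a matching failure directory created first. A condensed, self-contained sketch of that loop; the class name and the one-subdirectory-per-table layout are assumptions implied by the code, not stated in the commit:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Condensed sketch of the loadBulkFiles() loop added above. It assumes the
    // ingest job leaves one subdirectory of rfiles per destination table under
    // the bulk ingest directory, as implied by the use of dir.getName() as the
    // table name.
    public class BulkLoadSketch {
      static void loadBulkFiles(Connector connector, Configuration conf,
          String directory, String failureDirectory) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path(directory))) {
          if (!status.isDir())
            continue; // plain files at the top level are skipped
          Path dir = status.getPath();
          // one failure directory per imported subdirectory, created up front
          Path failPath = new Path(failureDirectory + "/" + dir.getName());
          fs.mkdirs(failPath);
          // the subdirectory name doubles as the destination table name; the
          // trailing boolean is passed through exactly as in the commit
          connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
        }
      }
    }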


