Subject: svn commit: r1243506 - in /incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest: conf/ src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/main/java/org/apache/accumulo/examples/wikisearch/output/
Date: Mon, 13 Feb 2012 13:49:13 -0000
To: accumulo-commits@incubator.apache.org
From: afuchs@apache.org

Author: afuchs
Date: Mon Feb 13 13:49:12 2012
New Revision: 1243506

URL: http://svn.apache.org/viewvc?rev=1243506&view=rev
Log:
ACCUMULO-381 added a bulk ingest option for wikisearch ingest

Added:
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
Modified:
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example?rev=1243506&r1=1243505&r2=1243506&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example Mon Feb 13 13:49:12 2012
@@ -56,4 +56,20 @@
     <name>wikipedia.run.ingest</name>
     <value></value>
   </property>
+  <property>
+    <name>wikipedia.bulk.ingest</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.dir</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.failure.dir</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.buffer.size</name>
+    <value></value>
+  </property>
 </configuration>
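The example file ships the four new properties with empty values. They can equally be set programmatically before launching the ingester; a minimal sketch, where the paths and buffer size are hypothetical placeholders rather than values from this commit:

    import org.apache.hadoop.conf.Configuration;

    public class BulkIngestConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("wikipedia.bulk.ingest", true);                        // write RFiles instead of live mutations
        conf.set("wikipedia.bulk.ingest.dir", "/wikisearch/bulk");             // hypothetical HDFS directory for RFiles
        conf.set("wikipedia.bulk.ingest.failure.dir", "/wikisearch/failures"); // hypothetical directory for failed imports
        conf.setLong("wikipedia.bulk.ingest.buffer.size", 1L << 28);           // 256 MiB, matching the code default
      }
    }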
Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1243506&r1=1243505&r2=1243506&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Mon Feb 13 13:49:12 2012
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {
   public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
   public final static String RUN_INGEST = "wikipedia.run.ingest";
+  public final static String BULK_INGEST = "wikipedia.bulk.ingest";
+  public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
+  public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
+  public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
   
   
   public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public class WikipediaConfiguration {
     return conf.getBoolean(RUN_INGEST, true);
   }
   
+  public static boolean bulkIngest(Configuration conf) {
+    return conf.getBoolean(BULK_INGEST, true);
+  }
+  
+  public static String bulkIngestDir(Configuration conf) {
+    return conf.get(BULK_INGEST_DIR);
+  }
+  
+  public static String bulkIngestFailureDir(Configuration conf) {
+    return conf.get(BULK_INGEST_FAILURE_DIR);
+  }
+  
+  public static long bulkIngestBufferSize(Configuration conf) {
+    return conf.getLong(BULK_INGEST_BUFFER_SIZE, 1L << 28);
+  }
+  
   /**
    * Helper method to get properties from Hadoop configuration
    *
@@ -169,5 +189,5 @@ public class WikipediaConfiguration {
     throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
   }
-  
+
 }
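For reference, the defaults these accessors yield when none of the new properties are set, derived directly from the code above; a sketch:

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class BulkIngestDefaultsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();                              // new properties left unset
        System.out.println(WikipediaConfiguration.bulkIngest(conf));           // true: bulk ingest is on by default
        System.out.println(WikipediaConfiguration.bulkIngestBufferSize(conf)); // 268435456, i.e. 1L << 28 (256 MiB)
        System.out.println(WikipediaConfiguration.bulkIngestDir(conf));        // null: must be set for bulk ingest
        System.out.println(WikipediaConfiguration.bulkIngestFailureDir(conf)); // null: must be set for bulk ingest
      }
    }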
Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1243506&r1=1243505&r2=1243506&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Mon Feb 13 13:49:12 2012
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -53,7 +54,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -140,7 +140,13 @@ public class WikipediaPartitionedIngeste
       return result;
     }
     if(WikipediaConfiguration.runIngest(conf))
-      return runIngestJob();
+    {
+      int result = runIngestJob();
+      if(result != 0)
+        return result;
+      if(WikipediaConfiguration.bulkIngest(conf))
+        return loadBulkFiles();
+    }
     
     return 0;
   }
@@ -195,11 +201,6 @@ public class WikipediaPartitionedIngeste
     String tablename = WikipediaConfiguration.getTableName(ingestConf);
     
-    String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
-    String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
-    
-    String user = WikipediaConfiguration.getUser(ingestConf);
-    byte[] password = WikipediaConfiguration.getPassword(ingestConf);
     
     Connector connector = WikipediaConfiguration.getConnector(ingestConf);
     
     TableOperations tops = connector.tableOperations();
@@ -217,13 +218,47 @@ public class WikipediaPartitionedIngeste
     // setup output format
     ingestJob.setMapOutputKeyClass(Text.class);
     ingestJob.setMapOutputValueClass(Mutation.class);
-    ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
-    AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    
+    if(WikipediaConfiguration.bulkIngest(ingestConf))
+    {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
+      ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+      String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+      String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+      String user = WikipediaConfiguration.getUser(ingestConf);
+      byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+      AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+      AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    }
     
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
   
+  public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  {
+    Configuration conf = getConf();
+    
+    Connector connector = WikipediaConfiguration.getConnector(conf);
+    
+    FileSystem fs = FileSystem.get(conf);
+    String directory = WikipediaConfiguration.bulkIngestDir(conf);
+    
+    String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
+    
+    for(FileStatus status: fs.listStatus(new Path(directory)))
+    {
+      if(status.isDir() == false)
+        continue;
+      Path dir = status.getPath();
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory + "/" + dir.getName(), true);
+    }
+    
+    return 0;
+  }
+  
   public final static PathFilter partFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -241,7 +276,6 @@ public class WikipediaPartitionedIngeste
   
   protected void configureIngestJob(Job job) {
     job.setJarByClass(WikipediaPartitionedIngester.class);
-    job.setInputFormatClass(WikipediaInputFormat.class);
   }
   
   protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
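loadBulkFiles() expects one subdirectory per destination table under wikipedia.bulk.ingest.dir, which is the layout BufferingRFileRecordWriter (added below) produces. A sketch of the resulting call; the table and path names here are hypothetical:

    import org.apache.accumulo.core.client.Connector;

    public class BulkImportSketch {
      // Hypothetical layout after the ingest job runs:
      //   /wikisearch/bulk/wikiIndex/<taskID>_0.rf      (RFiles destined for table "wikiIndex")
      //   /wikisearch/bulk/wikiMetadata/<taskID>_0.rf   (RFiles destined for table "wikiMetadata")
      // loadBulkFiles() makes one importDirectory call per subdirectory:
      static void importOneTable(Connector connector) throws Exception {
        connector.tableOperations().importDirectory(
            "wikiIndex",                      // table name, taken from the subdirectory name
            "/wikisearch/bulk/wikiIndex",     // directory of RFiles to load
            "/wikisearch/failures/wikiIndex", // files that fail to load are moved here, per table
            true);                            // same flag loadBulkFiles() passes
      }
    }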
Added: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java?rev=1243506&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java (added)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java Mon Feb 13 13:49:12 2012
@@ -0,0 +1,129 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
+  private final long maxSize;
+  private final AccumuloConfiguration acuconf;
+  private final Configuration conf;
+  private final String filenamePrefix;
+  private final String taskID;
+  private final FileSystem fs;
+  private int fileCount = 0;
+  private long size;
+  
+  private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
+  private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();
+  
+  private TreeMap<Key,Value> getBuffer(Text tablename)
+  {
+    TreeMap<Key,Value> buffer = buffers.get(tablename);
+    if(buffer == null)
+    {
+      buffer = new TreeMap<Key,Value>();
+      buffers.put(tablename, buffer);
+      bufferSizes.put(tablename, 0L);
+    }
+    return buffer;
+  }
+  
+  private Text getLargestTablename()
+  {
+    long max = 0;
+    Text table = null;
+    for(Entry<Text,Long> e : bufferSizes.entrySet())
+    {
+      if(e.getValue() > max)
+      {
+        max = e.getValue();
+        table = e.getKey();
+      }
+    }
+    return table;
+  }
+  
+  private void flushLargestTable() throws IOException
+  {
+    Text tablename = getLargestTablename();
+    if(tablename == null)
+      return;
+    long bufferSize = bufferSizes.get(tablename);
+    TreeMap<Key,Value> buffer = buffers.get(tablename);
+    if (buffer.size() == 0)
+      return;
+    
+    // TODO fix the filename
+    String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
+    FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);
+    
+    // forget locality groups for now, just write everything to the default
+    writer.startDefaultLocalityGroup();
+    
+    for (Entry<Key,Value> e : buffer.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    
+    writer.close();
+    
+    size -= bufferSize;
+    buffer.clear();
+    bufferSizes.put(tablename, 0L);
+  }
+  
+  BufferingRFileRecordWriter(long maxSize, AccumuloConfiguration acuconf, Configuration conf, String filenamePrefix, String taskID, FileSystem fs) {
+    this.maxSize = maxSize;
+    this.acuconf = acuconf;
+    this.conf = conf;
+    this.filenamePrefix = filenamePrefix;
+    this.taskID = taskID;
+    this.fs = fs;
+  }
+  
+  @Override
+  public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
+    while(size > 0)
+      flushLargestTable();
+  }
+  
+  @Override
+  public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
+    TreeMap<Key,Value> buffer = getBuffer(table);
+    int mutationSize = 0;
+    for(ColumnUpdate update : mutation.getUpdates())
+    {
+      Key k = new Key(mutation.getRow(), update.getColumnFamily(), update.getColumnQualifier(), update.getColumnVisibility(), update.getTimestamp(), update.isDeleted());
+      Value v = new Value(update.getValue());
+      mutationSize += k.getSize();
+      mutationSize += v.getSize();
+      buffer.put(k, v);
+    }
+    size += mutationSize;
+    long bufferSize = bufferSizes.get(table);
+    bufferSize += mutationSize;
+    bufferSizes.put(table, bufferSize);
+    
+    // TODO add object overhead size
+    
+    while (size >= maxSize) {
+      flushLargestTable();
+    }
+  }
+  
+}
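The writer's core move is converting each buffered Mutation back into sorted Key/Value pairs before appending them to an RFile. A self-contained illustration of that conversion, using only accumulo-core classes; the row and column values are made up:

    import org.apache.accumulo.core.data.ColumnUpdate;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    public class MutationToKeyValueSketch {
      public static void main(String[] args) {
        Mutation m = new Mutation(new Text("row1"));
        m.put(new Text("colf"), new Text("colq"), new Value("v".getBytes()));
        // The same conversion BufferingRFileRecordWriter.write() performs:
        for (ColumnUpdate u : m.getUpdates()) {
          Key k = new Key(m.getRow(), u.getColumnFamily(), u.getColumnQualifier(),
              u.getColumnVisibility(), u.getTimestamp(), u.isDeleted());
          System.out.println(k + " -> " + new Value(u.getValue()));
        }
      }
    }

Note that flushing always targets the table with the largest buffer, so one oversized table cannot starve the others of memory, and close() drains every remaining buffer the same way.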
Added: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1243506&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java (added)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java Mon Feb 13 13:49:12 2012
@@ -0,0 +1,103 @@
+package org.apache.accumulo.examples.wikisearch.output;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
+
+  public static final String PATH_NAME = "sortingrfileoutputformat.path";
+  public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";
+
+  public static void setPathName(Configuration conf, String path) {
+    conf.set(PATH_NAME, path);
+  }
+
+  public static String getPathName(Configuration conf) {
+    return conf.get(PATH_NAME);
+  }
+
+  public static void setMaxBufferSize(Configuration conf, long maxBufferSize) {
+    conf.setLong(MAX_BUFFER_SIZE, maxBufferSize);
+  }
+
+  public static long getMaxBufferSize(Configuration conf) {
+    return conf.getLong(MAX_BUFFER_SIZE, -1);
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
+    // TODO make sure the path is writable?
+    // TODO make sure the max buffer size is set and is reasonable
+  }
"sortingrfileoutputformat.path"; + public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size"; + + public static void setPathName(Configuration conf, String path) { + conf.set(PATH_NAME, path); + } + + public static String getPathName(Configuration conf) { + return conf.get(PATH_NAME); + } + + public static void setMaxBufferSize(Configuration conf, long maxBufferSize) { + conf.setLong(MAX_BUFFER_SIZE, maxBufferSize); + } + + public static long getMaxBufferSize(Configuration conf) { + return conf.getLong(MAX_BUFFER_SIZE, -1); + } + + @Override + public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException { + // TODO make sure the path is writable? + // TODO make sure the max buffer size is set and is reasonable + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException { + return new OutputCommitter() { + + @Override + public void setupTask(TaskAttemptContext arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void setupJob(JobContext arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void commitTask(TaskAttemptContext arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void cleanupJob(JobContext arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void abortTask(TaskAttemptContext arg0) throws IOException { + // TODO Auto-generated method stub + + } + }; + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext attempt) throws IOException, InterruptedException { + + // grab the configuration + final Configuration conf = attempt.getConfiguration(); + // create a filename + final String filenamePrefix = getPathName(conf); + final String taskID = attempt.getTaskAttemptID().toString(); + // grab the max size + final long maxSize = getMaxBufferSize(conf); + // grab the FileSystem + final FileSystem fs = FileSystem.get(conf); + // create a default AccumuloConfiguration + final AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration(); + + return new BufferingRFileRecordWriter(maxSize, acuconf, conf, filenamePrefix, taskID, fs); + } + +}