Subject: svn commit: r1244743 - in /incubator/accumulo/trunk: ./ conf/ src/core/ src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/ src/examples/wikisearch/ingest/conf/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wik...
Date: Wed, 15 Feb 2012 21:30:58 -0000
To: accumulo-commits@incubator.apache.org
From: vines@apache.org
Reply-To: accumulo-dev@incubator.apache.org

Author: vines
Date: Wed Feb 15 21:30:57 2012
New Revision: 1244743

URL: http://svn.apache.org/viewvc?rev=1244743&view=rev
Log:
merging ACCUMULO-390 and what the others have been working on

Added:
    incubator/accumulo/trunk/conf/accumulo-env.sh.1GBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.1GBstandalone-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.2GBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.2GBstandalone-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.3GBcluster-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.3GBcluster-example
    incubator/accumulo/trunk/conf/accumulo-env.sh.512MBBstandalone-example
      - copied unchanged from r1244742, incubator/accumulo/branches/1.4/conf/accumulo-env.sh.512MBBstandalone-example
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/
      - copied from r1244689, incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/

Removed:
    incubator/accumulo/trunk/conf/accumulo-env.sh.example

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/README
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java

Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 21:30:57 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1242521,1244690-1244693
+/incubator/accumulo/branches/1.4:1201902-1244742

Modified: incubator/accumulo/trunk/README
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/README?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/README (original)
+++ incubator/accumulo/trunk/README Wed Feb 15 21:30:57 2012
@@ -56,7 +56,7 @@ Create a "masters" file in $ACCUMULO_HOM
 machines where the master server will run.
 
 Create conf/accumulo-env.sh following the template of
-conf/accumulo-env.sh.example. Set JAVA_HOME, HADOOP_HOME, and ZOOKEEPER_HOME.
+conf/accumulo-env.sh.*-example. Set JAVA_HOME, HADOOP_HOME, and ZOOKEEPER_HOME. The example accumulo-env files are named for the memory footprint of the Accumulo processes and for whether that footprint is meant for standalone or cluster use. Note that the footprints cover only the Accumulo system processes, so ample space should be left for other processes like hadoop, zookeeper, and the accumulo client code.
 These directories must be at the same location on every node in the cluster.
 Note that zookeeper must be installed on every machine, but it should not be
 run on every machine.
Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 15 21:30:57 2012
@@ -1,3 +1,3 @@
-/incubator/accumulo/branches/1.3.5rc/src/core:1209938
 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
-/incubator/accumulo/branches/1.4/src/core:1201902-1242521,1244690-1244693
+/incubator/accumulo/branches/1.3.5rc/src/core:1209938
+/incubator/accumulo/branches/1.4/src/core:1201902-1244742

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/client/mapreduce/InputFormatBase.java Wed Feb 15 21:30:57 2012
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
+import java.net.InetAddress;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.ByteBuffer;
@@ -571,8 +572,17 @@ public abstract class InputFormatBase<K,V> extends InputFormat<K,V> {
       splitsToAdd = new HashMap<Range,ArrayList<String>>();
 
+    HashMap<String,String> hostNameCache = new HashMap<String,String>();
+
     for (Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String location = tserverBin.getKey().split(":", 2)[0];
+      String ip = tserverBin.getKey().split(":", 2)[0];
+      String location = hostNameCache.get(ip);
+      if (location == null) {
+        InetAddress inetAddress = InetAddress.getByName(ip);
+        location = inetAddress.getHostName();
+        hostNameCache.put(ip, location);
+      }
+
       for (Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
         Range ke = extentRanges.getKey().toDataRange();
         for (Range r : extentRanges.getValue()) {

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/conf/wikipedia_parallel.xml.example Wed Feb 15 21:30:57 2012
@@ -56,4 +56,20 @@
     <name>wikipedia.run.ingest</name>
     <value></value>
   </property>
+  <property>
+    <name>wikipedia.bulk.ingest</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.dir</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.failure.dir</name>
+    <value></value>
+  </property>
+  <property>
+    <name>wikipedia.bulk.ingest.buffer.size</name>
+    <value></value>
+  </property>
 </configuration>
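The InputFormatBase change above caches reverse-DNS lookups while getSplits() bins ranges: many ranges usually map to the same tablet server, so each distinct IP is resolved once instead of once per range. A minimal standalone sketch of the same pattern, assuming nothing beyond the JDK (the class name HostNameCache is hypothetical; the commit inlines this logic in getSplits()):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;

    public class HostNameCache {
      // ip -> resolved host name; one InetAddress lookup per distinct tablet server
      private final Map<String,String> cache = new HashMap<String,String>();

      public String hostNameFor(String ip) throws UnknownHostException {
        String name = cache.get(ip);
        if (name == null) {
          name = InetAddress.getByName(ip).getHostName();
          cache.put(ip, name);
        }
        return name;
      }
    }
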
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java Wed Feb 15 21:30:57 2012
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {
   public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
   public final static String RUN_INGEST = "wikipedia.run.ingest";
+  public final static String BULK_INGEST = "wikipedia.bulk.ingest";
+  public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
+  public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
+  public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";
 
   public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public class WikipediaConfiguration {
     return conf.getBoolean(RUN_INGEST, true);
   }
 
+  public static boolean bulkIngest(Configuration conf) {
+    return conf.getBoolean(BULK_INGEST, true);
+  }
+
+  public static String bulkIngestDir(Configuration conf) {
+    return conf.get(BULK_INGEST_DIR);
+  }
+
+  public static String bulkIngestFailureDir(Configuration conf) {
+    return conf.get(BULK_INGEST_FAILURE_DIR);
+  }
+
+  public static long bulkIngestBufferSize(Configuration conf) {
+    return conf.getLong(BULK_INGEST_BUFFER_SIZE,1l<<28);
+  }
+
   /**
    * Helper method to get properties from Hadoop configuration
    *
@@ -169,5 +189,5 @@ public class WikipediaConfiguration {
     throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");
   }
-  
+
 }
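The getters above fix the defaults for the new bulk-ingest settings: BULK_INGEST defaults to true, the two directory properties default to null (runIngestJob() in the next file rejects a null bulk-ingest dir), and the buffer size defaults to 1l << 28 = 268435456 bytes (256 MB). A small sketch exercising those defaults against an empty Hadoop Configuration (class name hypothetical; no cluster required):

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class BulkIngestDefaults {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With nothing set, bulk ingest is enabled and the buffer is 256 MB...
        System.out.println(WikipediaConfiguration.bulkIngest(conf));           // true
        System.out.println(WikipediaConfiguration.bulkIngestBufferSize(conf)); // 268435456
        // ...but the directories are unset, which the ingest job treats as an error.
        System.out.println(WikipediaConfiguration.bulkIngestDir(conf));        // null
      }
    }
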
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1244743&r1=1244742&r2=1244743&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Wed Feb 15 21:30:57 2012
@@ -39,9 +39,11 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
+import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
 import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -53,14 +55,16 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
 
 public class WikipediaPartitionedIngester extends Configured implements Tool {
-  
+
+  private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);
+
   public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
   public final static String SPLIT_FILE = "wikipedia.split_file";
   public final static String TABLE_NAME = "wikipedia.table";
@@ -140,11 +144,17 @@ public class WikipediaPartitionedIngeste
       return result;
     }
     if(WikipediaConfiguration.runIngest(conf))
-      return runIngestJob();
+    {
+      int result = runIngestJob();
+      if(result != 0)
+        return result;
+      if(WikipediaConfiguration.bulkIngest(conf))
+        return loadBulkFiles();
+    }
     return 0;
   }
 
-  public int runPartitionerJob() throws Exception
+  private int runPartitionerJob() throws Exception
   {
     Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
     Configuration partitionerConf = partitionerJob.getConfiguration();
@@ -185,7 +195,7 @@ public class WikipediaPartitionedIngeste
     return partitionerJob.waitForCompletion(true) ? 0 : 1;
   }
 
-  public int runIngestJob() throws Exception
+  private int runIngestJob() throws Exception
   {
     Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
     Configuration ingestConf = ingestJob.getConfiguration();
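With the run() change above, one invocation now chains partition, ingest, and (when wikipedia.bulk.ingest is true) a final bulk load, stopping at the first failing phase. A hypothetical driver showing that flow via ToolRunner (class name and paths are made up for illustration):

    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;
    import org.apache.accumulo.examples.wikisearch.ingest.WikipediaPartitionedIngester;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class PartitionedIngestDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The bulk-load phase only runs when the ingest job succeeds
        // and wikipedia.bulk.ingest is (or defaults to) true.
        conf.setBoolean(WikipediaConfiguration.BULK_INGEST, true);
        conf.set(WikipediaConfiguration.BULK_INGEST_DIR, "/wikisearch/bulk");         // hypothetical path
        conf.set(WikipediaConfiguration.BULK_INGEST_FAILURE_DIR, "/wikisearch/fail"); // hypothetical path
        System.exit(ToolRunner.run(conf, new WikipediaPartitionedIngester(), args));
      }
    }
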
@@ -195,11 +205,6 @@ public class WikipediaPartitionedIngeste
 
     String tablename = WikipediaConfiguration.getTableName(ingestConf);
 
-    String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
-    String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
-
-    String user = WikipediaConfiguration.getUser(ingestConf);
-    byte[] password = WikipediaConfiguration.getPassword(ingestConf);
     Connector connector = WikipediaConfiguration.getConnector(ingestConf);
 
     TableOperations tops = connector.tableOperations();
@@ -217,13 +222,55 @@ public class WikipediaPartitionedIngeste
     // setup output format
     ingestJob.setMapOutputKeyClass(Text.class);
     ingestJob.setMapOutputValueClass(Mutation.class);
-    ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
-    AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+
+    if(WikipediaConfiguration.bulkIngest(ingestConf))
+    {
+      ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
+      SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
+      String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
+      if(bulkIngestDir == null)
+      {
+        log.error("Bulk ingest dir not set");
+        return 1;
+      }
+      SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
+    } else {
+      ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
+      String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
+      String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
+      String user = WikipediaConfiguration.getUser(ingestConf);
+      byte[] password = WikipediaConfiguration.getPassword(ingestConf);
+      AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
+      AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
+    }
 
     return ingestJob.waitForCompletion(true) ? 0 : 1;
   }
 
+  private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
+  {
+    Configuration conf = getConf();
+
+    Connector connector = WikipediaConfiguration.getConnector(conf);
+
+    FileSystem fs = FileSystem.get(conf);
+    String directory = WikipediaConfiguration.bulkIngestDir(conf);
+
+    String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);
+
+    for(FileStatus status: fs.listStatus(new Path(directory)))
+    {
+      if(status.isDir() == false)
+        continue;
+      Path dir = status.getPath();
+      Path failPath = new Path(failureDirectory+"/"+dir.getName());
+      fs.mkdirs(failPath);
+      connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
+    }
+
+    return 0;
+  }
+
   public final static PathFilter partFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -241,7 +288,6 @@ public class WikipediaPartitionedIngeste
 
   protected void configureIngestJob(Job job) {
     job.setJarByClass(WikipediaPartitionedIngester.class);
-    job.setInputFormatClass(WikipediaInputFormat.class);
   }
 
   protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
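loadBulkFiles() above treats each subdirectory under the bulk-ingest dir as RFiles destined for the table named after that subdirectory, creates a matching failure directory, and hands both to TableOperations.importDirectory(table, dir, failureDir, setTime), the 1.4 bulk-load client call used in the diff. A hedged standalone sketch of that call (instance name, credentials, table, and paths are all made up for illustration):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;

    public class BulkLoadExample {
      public static void main(String[] args) throws Exception {
        Connector connector = new ZooKeeperInstance("instance", "zk1:2181")
            .getConnector("user", "password".getBytes());

        // importDirectory assigns the RFiles under the source directory to the
        // table's tablets; files that cannot be assigned are moved into the
        // failure directory, which the caller creates beforehand (as
        // loadBulkFiles() does with fs.mkdirs) and which should be empty.
        connector.tableOperations().importDirectory(
            "wikipedia",                   // destination table
            "/wikisearch/bulk/wikipedia",  // RFiles written by the ingest job
            "/wikisearch/fail/wikipedia",  // pre-created, empty failure directory
            true);                         // setTime: stamp entries at import time
      }
    }
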