Subject: svn commit: r1245634 - in /incubator/accumulo/trunk: ./ src/core/ src/core/src/main/java/org/apache/accumulo/core/conf/ src/core/src/main/java/org/apache/accumulo/core/iterators/ src/core/src/test/java/org/apache/accumulo/core/iterators/user/ src/examp...
Date: Fri, 17 Feb 2012 16:12:25 -0000
To: accumulo-commits@incubator.apache.org
From: ecn@apache.org

Author: ecn
Date: Fri Feb 17 16:12:25 2012
New Revision: 1245634

URL: http://svn.apache.org/viewvc?rev=1245634&view=rev
Log:
ACCUMULO-412 merge to trunk

Modified:
    incubator/accumulo/trunk/   (props changed)
    incubator/accumulo/trunk/src/core/   (props changed)
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java
    incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
    incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/iterators/user/FilterTest.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
    incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
    incubator/accumulo/trunk/src/server/   (props changed)
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
Propchange: incubator/accumulo/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 17 16:12:25 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611,1228195,1230180,1230736,1231043,1236873
 /incubator/accumulo/branches/1.3.5rc:1209938
-/incubator/accumulo/branches/1.4:1201902-1245229
+/incubator/accumulo/branches/1.4:1201902-1245631

Propchange: incubator/accumulo/trunk/src/core/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 17 16:12:25 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3/src/core:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215
 /incubator/accumulo/branches/1.3.5rc/src/core:1209938
-/incubator/accumulo/branches/1.4/src/core:1201902-1245229
+/incubator/accumulo/branches/1.4/src/core:1201902-1245631

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java Fri Feb 17 16:12:25 2012
@@ -65,7 +65,6 @@ public enum Property {
   MASTER_RECOVERY_POOL("master.recovery.pool", "recovery", PropertyType.STRING, "Priority queue to use for log recovery map/reduce jobs."),
   MASTER_RECOVERY_SORT_MAPREDUCE("master.recovery.sort.mapreduce", "false", PropertyType.BOOLEAN, "If true, use map/reduce to sort write-ahead logs during recovery"),
-  MASTER_BULK_SERVERS("master.bulk.server.max", "4", PropertyType.COUNT, "The number of servers to use during a bulk load"),
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
   MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/Filter.java Fri Feb 17 16:12:25 2012
@@ -69,7 +69,7 @@ public abstract class Filter extends Wra
    * Iterates over the source until an acceptable key/value pair is found.
    */
   protected void findTop() {
-    while (getSource().hasTop() && (negate == accept(getSource().getTopKey(), getSource().getTopValue()))) {
+    while (getSource().hasTop() && !getSource().getTopKey().isDeleted() && (negate == accept(getSource().getTopKey(), getSource().getTopValue()))) {
      try {
         getSource().next();
       } catch (IOException e) {
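The findTop() change above makes Filter stop advancing when the source's top key is a delete marker, so markers now pass through to the compaction output instead of being consumed by accept(). A minimal stand-alone sketch of the effect (hypothetical filter class, assuming the Accumulo 1.4-era core API):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Filter;

    // Hypothetical example filter: keeps only rows starting with "a".
    // With the fixed findTop(), delete markers are emitted unchanged during
    // minor and partial major compactions even though accept() never sees
    // them, so a delete can no longer be dropped (which would have
    // resurrected older versions of the cell).
    public class RowPrefixFilter extends Filter {
      @Override
      public boolean accept(Key k, Value v) {
        return k.getRow().toString().startsWith("a");
      }
    }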
Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/iterators/SortedKeyValueIterator.java Fri Feb 17 16:12:25 2012
@@ -59,7 +59,9 @@ public interface SortedKeyValueIterator<
   boolean hasTop();

   /**
-   * Advances to the next K,V pair.
+   * Advances to the next K,V pair. Note that in minor compaction scope and in non-full major compaction scopes the iterator may see deletion entries. These
+   * entries should be preserved by all iterators except ones that are strictly scan-time iterators that will never be configured for the minc or majc scopes.
+   * Deletion entries are only removed during full major compactions.
    *
    * @throws IOException
    *           if an I/O error occurs.
@@ -88,7 +90,9 @@ public interface SortedKeyValueIterator<
   void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException;

   /**
-   * Returns top key. Can be called 0 or more times without affecting behavior of next() or hasTop().
+   * Returns top key. Can be called 0 or more times without affecting behavior of next() or hasTop(). Note that in minor compaction scope and in non-full major
+   * compaction scopes the iterator may see deletion entries. These entries should be preserved by all iterators except ones that are strictly scan-time
+   * iterators that will never be configured for the minc or majc scopes. Deletion entries are only removed during full major compactions.
    *
    * @return K
    * @exception IllegalStateException

Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/iterators/user/FilterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/iterators/user/FilterTest.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/iterators/user/FilterTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/iterators/user/FilterTest.java Fri Feb 17 16:12:25 2012
@@ -444,4 +444,31 @@ public class FilterTest {
     a.seek(new Range(), EMPTY_COL_FAMS, false);
     assertEquals(size(a), 31);
   }
+
+  @Test
+  public void testDeletes() throws IOException {
+    Text colf = new Text("a");
+    Text colq = new Text("b");
+    Value dv = new Value();
+    TreeMap<Key,Value> tm = new TreeMap<Key,Value>();
+
+    Key k = new Key(new Text("0"), colf, colq);
+    tm.put(k, dv);
+    k = new Key(new Text("1"), colf, colq, 10);
+    k.setDeleted(true);
+    tm.put(k, dv);
+    k = new Key(new Text("1"), colf, colq, 5);
+    tm.put(k, dv);
+    k = new Key(new Text("10"), colf, colq);
+    tm.put(k, dv);
+
+    assertTrue(tm.size() == 4);
+
+    Filter filter = new SimpleFilter();
+    filter.init(new SortedMapIterator(tm), EMPTY_OPTS, null);
+    filter.seek(new Range(), EMPTY_COL_FAMS, false);
+    int size = size(filter);
+    assertTrue("size = " + size, size == 3);
+
+  }
 }
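For reference, the delete entries exercised by testDeletes() are ordinary keys with the delete flag set. A stand-alone sketch (hypothetical class name; Accumulo 1.4-era Key API):

    import org.apache.accumulo.core.data.Key;
    import org.apache.hadoop.io.Text;

    public class DeleteKeyDemo {
      public static void main(String[] args) {
        // Delete marker for row "1", family "a", qualifier "b" at timestamp 10,
        // matching the entry the new test inserts.
        Key k = new Key(new Text("1"), new Text("a"), new Text("b"), 10);
        k.setDeleted(true);
        System.out.println(k + " isDeleted=" + k.isDeleted());
      }
    }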
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Fri Feb 17 16:12:25 2012
@@ -42,14 +42,13 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;

 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;

 public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {

-  private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);

   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";

Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java Fri Feb 17 16:12:25 2012
@@ -23,40 +23,21 @@ package org.apache.accumulo.examples.wik
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.StringReader;
 import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.IllegalFormatException;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
-import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;

 public class WikipediaPartitioner extends Mapper<LongWritable,Text,Text,Article> {

-  private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);

   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";
Modified: incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java (original)
+++ incubator/accumulo/trunk/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java Fri Feb 17 16:12:25 2012
@@ -4,20 +4,18 @@ import java.io.IOException;

 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;

 public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {

-  private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+  // private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);

   public static final String PATH_NAME = "sortingrfileoutputformat.path";
   public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

Propchange: incubator/accumulo/trunk/src/server/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 17 16:12:25 2012
@@ -1,3 +1,3 @@
 /incubator/accumulo/branches/1.3/src/server:1190280,1190413,1190420,1190427,1190500,1195622,1195625,1195629,1195635,1196044,1196054,1196057,1196071-1196072,1196106,1197066,1198935,1199383,1203683,1204625,1205547,1205880,1206169,1208031,1209124,1209526,1209532,1209539,1209541,1209587,1209657,1210518,1210571,1210596,1210598,1213424,1214320,1225006,1227215,1227231,1227611
 /incubator/accumulo/branches/1.3.5rc/src/server:1209938
-/incubator/accumulo/branches/1.4/src/server:1201902-1245050
+/incubator/accumulo/branches/1.4/src/server:1201902-1245631

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Fri Feb 17 16:12:25 2012
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -150,7 +150,7 @@ public class BulkImporter {
       } catch (Exception ex) {
         log.warn("Unable to find tablets that overlap file " + mapFile.toString());
       }
-      
+      log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
       if (tabletsToAssignMapFileTo.size() == 0) {
         List empty = Collections.emptyList();
         completeFailures.put(mapFile, empty);
@@ -652,33 +652,41 @@ public class BulkImporter {
     return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow());
   }

+  final static byte[] byte0 = {0};
+
   public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, Text startRow, Text endRow) throws Exception {
     List<TabletLocation> result = new ArrayList<TabletLocation>();
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
-    
-    FileSKVIterator reader = FileOperations.getInstance().openReader(file.toString(), true, fs, fs.getConf(), acuConf);
+    String filename = file.toString();
+    // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
     try {
       Text row = startRow;
       if (row == null)
         row = new Text();
       while (true) {
+        // log.debug(filename + " Seeking to row " + row);
         reader.seek(new Range(row, null), columnFamilies, false);
-        if (!reader.hasTop())
+        if (!reader.hasTop()) {
+          // log.debug(filename + " not found");
           break;
+        }
         row = reader.getTopKey().getRow();
         TabletLocation tabletLocation = locator.locateTablet(row, false, true);
+        // log.debug(filename + " found row " + row + " at location " + tabletLocation);
         result.add(tabletLocation);
         row = tabletLocation.tablet_extent.getEndRow();
-        if (row != null && (endRow == null || row.compareTo(endRow) < 0))
-          row = Range.followingPrefix(row);
-        else
+        if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+          row = new Text(row);
+          row.append(byte0, 0, byte0.length);
+        } else
           break;
       }
     } finally {
       reader.close();
    }
+    // log.debug(filename + " to be sent to " + result);
     return result;
   }
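In findOverlappingTablets() above, Range.followingPrefix(row) was replaced with appending a single zero byte: that yields the smallest row that sorts strictly after the tablet's end row, so the next seek starts at the immediate successor and cannot skip a tablet. (followingPrefix increments the last byte, so the successor of end row "d" became "e", jumping past tablets holding rows like "dd" or "dm" -- exactly what the updated test below exercises.) A minimal sketch of the successor construction (hypothetical class name; Hadoop Text API):

    import org.apache.hadoop.io.Text;

    public class RowSuccessorDemo {
      static final byte[] byte0 = {0};

      // Smallest Text that sorts strictly after `row`: row + 0x00.
      static Text successor(Text row) {
        Text next = new Text(row);
        next.append(byte0, 0, byte0.length);
        return next;
      }

      public static void main(String[] args) {
        // "d\x00" sorts after "d" but before "dd" and "dm", so a tablet
        // with key extent ("d", "dm"] is still found by the next seek.
        System.out.println(successor(new Text("d")).getLength()); // prints 2
      }
    }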
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Feb 17 16:12:25 2012
@@ -19,11 +19,15 @@ package org.apache.accumulo.server.maste
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,12 +45,13 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -370,7 +375,7 @@ class LoadFiles extends MasterRepo {

   @Override
   public Repo<Master> call(final long tid, Master master) throws Exception {
-    
+    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
         ServerConfiguration.getSiteConfiguration()));
     List<FileStatus> files = new ArrayList<FileStatus>();
@@ -389,42 +394,68 @@ class LoadFiles extends MasterRepo {
       }
       fs.delete(writable, false);

-      // group files into N-sized chunks, send the chunks to random servers
-      final int SERVERS_TO_USE = Math.min(ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_SERVERS), master.onlineTabletServers()
-          .size());
-      
-      log.debug("tid " + tid + " using " + SERVERS_TO_USE + " servers");
-      // wait for success, repeat failures R times
       final List<String> filesToLoad = Collections.synchronizedList(new ArrayList<String>());
       for (FileStatus f : files)
         filesToLoad.add(f.getPath().toString());
-      final int RETRIES = Math.max(1, ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_RETRIES));
-      for (int i = 0; i < RETRIES && filesToLoad.size() > 0; i++) {
-        List<Future<?>> results = new ArrayList<Future<?>>();
-        for (List<String> chunk : groupFiles(filesToLoad, SERVERS_TO_USE)) {
-          final List<String> attempt = chunk;
-          results.add(threadPool.submit(new LoggingRunnable(log, new Runnable() {
+
+      final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+      for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+        List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+
+        // Figure out which files will be sent to which server
+        Set<TServerInstance> currentServers = Collections.synchronizedSet(new HashSet<TServerInstance>(master.onlineTabletServers()));
+        Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
+        for (TServerInstance server : currentServers) {
+          loadAssignments.put(server.hostPort(), new ArrayList<String>());
+        }
+        int i = 0;
+        List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
+        for (String file : filesToLoad) {
+          entries.get(i % entries.size()).getValue().add(file);
+          i++;
+        }
+
+        // Use the threadpool to assign files one-at-a-time to the server
+        for (Entry<String,List<String>> entry : entries) {
+          if (entry.getValue().isEmpty()) {
+            continue;
+          }
+          final Entry<String,List<String>> finalEntry = entry;
+          results.add(threadPool.submit(new Callable<List<String>>() {
             @Override
-            public void run() {
+            public List<String> call() {
+              if (log.isDebugEnabled()) {
+                log.debug("Asking " + finalEntry.getKey() + " to load " + sampleList(finalEntry.getValue(), 10));
+              }
+              List<String> failures = new ArrayList<String>();
               ClientService.Iface client = null;
               try {
-                client = ServerClient.getConnection(HdfsZooInstance.getInstance());
-                List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
-                attempt.removeAll(fail);
-                filesToLoad.removeAll(attempt);
+                client = ThriftUtil.getTServerClient(finalEntry.getKey(), conf);
+                for (String file : finalEntry.getValue()) {
+                  List<String> attempt = Collections.singletonList(file);
+                  log.debug("Asking " + finalEntry.getKey() + " to bulk import " + file);
+                  List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
+                  if (fail.isEmpty()) {
+                    filesToLoad.remove(file);
+                  } else {
+                    failures.addAll(fail);
+                  }
+                }
               } catch (Exception ex) {
                 log.error(ex, ex);
               } finally {
                 ServerClient.close(client);
               }
+              return failures;
             }
-          })));
+          }));
         }
-        for (Future<?> f : results)
-          f.get();
+        Set<String> failures = new HashSet<String>();
+        for (Future<List<String>> f : results)
+          failures.addAll(f.get());
         if (filesToLoad.size() > 0) {
-          log.debug("tid " + tid + " attempt " + (i + 1) + " " + filesToLoad + " failed");
+          log.debug("tid " + tid + " attempt " + (i + 1) + " " + sampleList(filesToLoad, 10) + " failed");
           UtilWaitThread.sleep(100);
         }
       }
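The hunk above drops the old master.bulk.server.max grouping in favor of deterministic round-robin assignment: file i goes to online server i mod N, so every tablet server participates in a bulk load. A minimal sketch of that partitioning step in isolation (hypothetical class and method names):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinAssignDemo {
      // Assign each file to a server in round-robin order, mirroring the
      // loadAssignments loop that LoadFiles.call() now performs.
      static Map<String,List<String>> assign(List<String> files, List<String> servers) {
        Map<String,List<String>> assignments = new HashMap<String,List<String>>();
        for (String server : servers)
          assignments.put(server, new ArrayList<String>());
        int i = 0;
        for (String file : files)
          assignments.get(servers.get(i++ % servers.size())).add(file);
        return assignments;
      }

      public static void main(String[] args) {
        List<String> files = new ArrayList<String>();
        for (int n = 0; n < 5; n++)
          files.add("f" + n + ".rf");
        List<String> servers = new ArrayList<String>();
        servers.add("ts1:9997");
        servers.add("ts2:9997");
        // ts1 gets f0, f2, f4; ts2 gets f1, f3.
        System.out.println(assign(files, servers));
      }
    }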
@@ -449,16 +480,24 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }

-  private List<List<String>> groupFiles(List<String> files, int groups) {
-    List<List<String>> result = new ArrayList<List<String>>();
-    Iterator<String> iter = files.iterator();
-    for (int i = 0; i < groups && iter.hasNext(); i++) {
-      List<String> group = new ArrayList<String>();
-      for (int j = 0; j < Math.ceil(files.size() / (double) groups) && iter.hasNext(); j++) {
-        group.add(iter.next());
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuffer result = new StringBuffer();
+    result.append("[");
+    int i = 0;
+    for (Object obj : potentiallyLongList) {
+      result.append(obj);
+      if (i >= max) {
+        result.append("...");
+        break;
+      } else {
+        result.append(", ");
       }
-      result.add(group);
+      i++;
     }
-    return result;
+    if (i < max)
+      result.delete(result.length() - 2, result.length());
+    result.append("]");
+    return result.toString();
   }
+
 }
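With sampleList() in place, the master's bulk-load log lines stay bounded: for max = 10 the helper renders at most the first eleven elements followed by "...", while short lists, e.g. [f1.rf, f2.rf, f3.rf], are printed whole. Retrying a load of thousands of RFiles therefore no longer dumps every path into the master log.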
Modified: incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java?rev=1245634&r1=1245633&r2=1245634&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java (original)
+++ incubator/accumulo/trunk/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java Fri Feb 17 16:12:25 2012
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.client.BulkImporter;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,7 +51,7 @@ public class BulkImporterTest {
   static final Text tableId = new Text("1");
   static {
     fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
-    for (String part : new String[] {"b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"}) {
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
       fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
     }
     fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
@@ -121,6 +120,7 @@ public class BulkImporterTest {
     writer.append(new Key("d", "cf", "cq3"), empty);
     writer.append(new Key("d", "cf", "cq4"), empty);
     writer.append(new Key("d", "cf", "cq5"), empty);
+    writer.append(new Key("dd", "cf", "cq1"), empty);
     writer.append(new Key("ichabod", "cf", "cq"), empty);
     writer.append(new Key("icky", "cf", "cq1"), empty);
     writer.append(new Key("iffy", "cf", "cq2"), empty);
@@ -130,18 +130,20 @@ public class BulkImporterTest {
     writer.append(new Key("xyzzy", "cf", "cq"), empty);
     writer.close();
     List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file));
-    Assert.assertEquals(4, overlaps.size());
+    Assert.assertEquals(5, overlaps.size());
     Collections.sort(overlaps);
-    Assert.assertEquals(overlaps.get(0).tablet_extent, new KeyExtent(tableId, new Text("a"), null));
-    Assert.assertEquals(overlaps.get(1).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c")));
-    Assert.assertEquals(overlaps.get(2).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i")));
-    Assert.assertEquals(overlaps.get(3).tablet_extent, new KeyExtent(tableId, null, new Text("l")));
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent);

     List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text(
         "b")));
-    Assert.assertEquals(2, overlaps2.size());
-    Assert.assertEquals(overlaps2.get(0).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c")));
-    Assert.assertEquals(overlaps2.get(1).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i")));
+    Assert.assertEquals(3, overlaps2.size());
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent);
     Assert.assertEquals(locator.invalidated, 1);
   }