From: ecn@apache.org
To: accumulo-commits@incubator.apache.org
Reply-To: accumulo-dev@incubator.apache.org
Subject: svn commit: r1245631 - in /incubator/accumulo/branches/1.4: ./ src/core/src/main/java/org/apache/accumulo/core/conf/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/ja...
Date: Fri, 17 Feb 2012 16:03:48 -0000
Message-Id: <20120217160348.84EF62388900@eris.apache.org>

Author: ecn
Date: Fri Feb 17 16:03:47 2012
New Revision: 1245631

URL: http://svn.apache.org/viewvc?rev=1245631&view=rev
Log:
ACCUMULO-412 fix index search

Modified:
    incubator/accumulo/branches/1.4/pom.xml
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java

Modified: incubator/accumulo/branches/1.4/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/pom.xml?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/pom.xml (original)
+++ incubator/accumulo/branches/1.4/pom.xml Fri Feb 17 16:03:47 2012
@@ -315,7 +315,7 @@
     0.8
     false
-    ${project.build.directory}/${artifactId}_${project.version}-${os.arch}.deb
+    ${project.build.directory}/${project.artifactId}_${project.version}-${os.arch}.deb
     src/packages/deb/accumulo.control
     /opt/accumulo/accumulo-${project-version}

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java Fri Feb 17 16:03:47 2012
@@ -65,7 +65,6 @@ public enum Property {
   MASTER_RECOVERY_POOL("master.recovery.pool", "recovery", PropertyType.STRING, "Priority queue to use for log recovery map/reduce jobs."),
   MASTER_RECOVERY_SORT_MAPREDUCE("master.recovery.sort.mapreduce", "false", PropertyType.BOOLEAN, "If true, use map/reduce to sort write-ahead logs during recovery"),
-  MASTER_BULK_SERVERS("master.bulk.server.max", "4", PropertyType.COUNT, "The number of servers to use during a bulk load"),
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
   MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Fri Feb 17 16:03:47 2012
@@ -42,14 +42,13 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 public class WikipediaPartitionedMapper extends Mapper {
   
-  private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
   
   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";
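
Note: with MASTER_BULK_SERVERS gone there is no longer a cap on how many tablet
servers participate in a bulk load; the rewritten LoadFiles below spreads files
across every online server. The surviving knobs are ordinary COUNT properties
read through AccumuloConfiguration. A minimal sketch of how the patched code
consumes them -- illustration only, where `conf` stands for any
AccumuloConfiguration such as the site configuration obtained in
LoadFiles.call() below:

    // Mirrors the calls made in LoadFiles.call(); getCount(...) parses the
    // configured value (or the default listed in Property.java) as an int.
    int retries = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));   // default "3"
    int poolSize = conf.getCount(Property.MASTER_BULK_THREADPOOL_SIZE);       // default "5"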

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java Fri Feb 17 16:03:47 2012
@@ -23,40 +23,21 @@ package org.apache.accumulo.examples.wik
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.StringReader;
 import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.IllegalFormatException;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
-import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 
 public class WikipediaPartitioner extends Mapper {
   
-  private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
   
   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java Fri Feb 17 16:03:47 2012
@@ -4,20 +4,18 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 
 public class SortingRFileOutputFormat extends OutputFormat {
 
-  private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+  // private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
 
   public static final String PATH_NAME = "sortingrfileoutputformat.path";
   public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Fri Feb 17 16:03:47 2012
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -150,7 +150,7 @@ public class BulkImporter {
       } catch (Exception ex) {
         log.warn("Unable to find tablets that overlap file " + mapFile.toString());
       }
-      
+      log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size() + " tablets");
       if (tabletsToAssignMapFileTo.size() == 0) {
         List empty = Collections.emptyList();
         completeFailures.put(mapFile, empty);
@@ -652,33 +652,41 @@ public class BulkImporter {
     return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow());
   }
   
+  final static byte[] byte0 = {0};
+  
   public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, Text startRow, Text endRow) throws Exception {
     List<TabletLocation> result = new ArrayList<TabletLocation>();
-    
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
-    
-    FileSKVIterator reader = FileOperations.getInstance().openReader(file.toString(), true, fs, fs.getConf(), acuConf);
+    String filename = file.toString();
+    // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
    try {
      Text row = startRow;
      if (row == null)
        row = new Text();
      while (true) {
+        // log.debug(filename + " Seeking to row " + row);
        reader.seek(new Range(row, null), columnFamilies, false);
-        if (!reader.hasTop())
+        if (!reader.hasTop()) {
+          // log.debug(filename + " not found");
          break;
+        }
        row = reader.getTopKey().getRow();
        TabletLocation tabletLocation = locator.locateTablet(row, false, true);
+        // log.debug(filename + " found row " + row + " at location " + tabletLocation);
        result.add(tabletLocation);
        row = tabletLocation.tablet_extent.getEndRow();
-        if (row != null && (endRow == null || row.compareTo(endRow) < 0))
-          row = Range.followingPrefix(row);
-        else
+        if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+          row = new Text(row);
+          row.append(byte0, 0, byte0.length);
+        } else
          break;
      }
    } finally {
      reader.close();
    }
+    // log.debug(filename + " to be sent to " + result);
    return result;
  }
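
Note: the loop above is the heart of the ACCUMULO-412 fix. The old code advanced
the seek position with Range.followingPrefix(endRow), which returns the first row
sorting after every row that *begins with* endRow -- from end row "d" it resumed
at "e", silently skipping rows such as "dd" and any tablets they overlap.
Appending a single zero byte instead yields the immediate successor of the end
row. A self-contained sketch of the technique (class name hypothetical; only
org.apache.hadoop.io.Text is assumed):

    import org.apache.hadoop.io.Text;

    public class FollowingRow {
      static final byte[] byte0 = {0}; // same constant the patch adds

      // Smallest Text that sorts strictly after `row`: copy it, append 0x00.
      static Text following(Text row) {
        Text next = new Text(row);
        next.append(byte0, 0, byte0.length);
        return next;
      }

      public static void main(String[] args) {
        Text next = following(new Text("d"));                    // "d\0"
        System.out.println(new Text("d").compareTo(next) < 0);   // true: strictly after "d"
        System.out.println(next.compareTo(new Text("dd")) < 0);  // true: "dd" is no longer
        // skipped, whereas the old seek position followingPrefix("d") == "e" sorted past it.
      }
    }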

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Feb 17 16:03:47 2012
@@ -19,11 +19,15 @@ package org.apache.accumulo.server.maste
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,12 +45,13 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -370,7 +375,7 @@ class LoadFiles extends MasterRepo {
   
   @Override
   public Repo<Master> call(final long tid, Master master) throws Exception {
-    
+    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
         ServerConfiguration.getSiteConfiguration()));
     List<FileStatus> files = new ArrayList<FileStatus>();
@@ -389,42 +394,68 @@ class LoadFiles extends MasterRepo {
     }
     fs.delete(writable, false);
     
-    // group files into N-sized chunks, send the chunks to random servers
-    final int SERVERS_TO_USE = Math.min(ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_SERVERS), master.onlineTabletServers()
-        .size());
-    
-    log.debug("tid " + tid + " using " + SERVERS_TO_USE + " servers");
     // wait for success, repeat failures R times
     final List<String> filesToLoad = Collections.synchronizedList(new ArrayList<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
-    final int RETRIES = Math.max(1, ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_RETRIES));
-    for (int i = 0; i < RETRIES && filesToLoad.size() > 0; i++) {
-      List<Future<?>> results = new ArrayList<Future<?>>();
-      for (List<String> chunk : groupFiles(filesToLoad, SERVERS_TO_USE)) {
-        final List<String> attempt = chunk;
-        results.add(threadPool.submit(new LoggingRunnable(log, new Runnable() {
+    
+    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+      
+      // Figure out which files will be sent to which server
+      Set<TServerInstance> currentServers = Collections.synchronizedSet(new HashSet<TServerInstance>(master.onlineTabletServers()));
+      Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
+      for (TServerInstance server : currentServers) {
+        loadAssignments.put(server.hostPort(), new ArrayList<String>());
+      }
+      int i = 0;
+      List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
+      for (String file : filesToLoad) {
+        entries.get(i % entries.size()).getValue().add(file);
+        i++;
+      }
+      
+      // Use the threadpool to assign files one-at-a-time to the server
+      for (Entry<String,List<String>> entry : entries) {
+        if (entry.getValue().isEmpty()) {
+          continue;
+        }
+        final Entry<String,List<String>> finalEntry = entry;
+        results.add(threadPool.submit(new Callable<List<String>>() {
           @Override
-          public void run() {
+          public List<String> call() {
+            if (log.isDebugEnabled()) {
+              log.debug("Asking " + finalEntry.getKey() + " to load " + sampleList(finalEntry.getValue(), 10));
+            }
+            List<String> failures = new ArrayList<String>();
            ClientService.Iface client = null;
            try {
-              client = ServerClient.getConnection(HdfsZooInstance.getInstance());
-              List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
-              attempt.removeAll(fail);
-              filesToLoad.removeAll(attempt);
+              client = ThriftUtil.getTServerClient(finalEntry.getKey(), conf);
+              for (String file : finalEntry.getValue()) {
+                List<String> attempt = Collections.singletonList(file);
+                log.debug("Asking " + finalEntry.getKey() + " to bulk import " + file);
+                List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
+                if (fail.isEmpty()) {
+                  filesToLoad.remove(file);
+                } else {
+                  failures.addAll(fail);
+                }
+              }
            } catch (Exception ex) {
              log.error(ex, ex);
            } finally {
              ServerClient.close(client);
            }
+            return failures;
          }
-        })));
+        }));
      }
-      for (Future<?> f : results)
-        f.get();
+      Set<String> failures = new HashSet<String>();
+      for (Future<List<String>> f : results)
+        failures.addAll(f.get());
      if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (i + 1) + " " + filesToLoad + " failed");
+        log.debug("tid " + tid + " attempt " + (i + 1) + " " + sampleList(filesToLoad, 10) + " failed");
        UtilWaitThread.sleep(100);
      }
    }
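
Note: instead of grouping files into SERVERS_TO_USE chunks routed through a
single connection, each retry round now deals every pending file across all
online tablet servers and asks each server to import its files one at a time,
collecting per-file failures for the next round. A standalone sketch of just
the dealing step (class and method names hypothetical; plain strings stand in
for TServerInstance host:port keys and file paths):

    import java.util.*;

    public class RoundRobinAssign {
      // Deal files across servers one at a time, like cards.
      static Map<String,List<String>> assign(List<String> servers, List<String> files) {
        Map<String,List<String>> out = new HashMap<String,List<String>>();
        for (String s : servers)
          out.put(s, new ArrayList<String>());
        List<Map.Entry<String,List<String>>> entries =
            new ArrayList<Map.Entry<String,List<String>>>(out.entrySet());
        int i = 0;
        for (String f : files)
          entries.get(i++ % entries.size()).getValue().add(f);
        return out;
      }

      public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("ts1:9997", "ts2:9997"),
            Arrays.asList("f1.rf", "f2.rf", "f3.rf")));
        // e.g. {ts2:9997=[f2.rf], ts1:9997=[f1.rf, f3.rf]} -- HashMap iteration
        // order is unspecified, so the pairing varies between rounds, but the
        // load stays balanced to within one file per server, as in the patch.
      }
    }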
@@ -449,16 +480,24 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
   
-  private List<List<String>> groupFiles(List<String> files, int groups) {
-    List<List<String>> result = new ArrayList<List<String>>();
-    Iterator<String> iter = files.iterator();
-    for (int i = 0; i < groups && iter.hasNext(); i++) {
-      List<String> group = new ArrayList<String>();
-      for (int j = 0; j < Math.ceil(files.size() / (double) groups) && iter.hasNext(); j++) {
-        group.add(iter.next());
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuffer result = new StringBuffer();
+    result.append("[");
+    int i = 0;
+    for (Object obj : potentiallyLongList) {
+      result.append(obj);
+      if (i >= max) {
+        result.append("...");
+        break;
+      } else {
+        result.append(", ");
       }
-      result.add(group);
+      i++;
     }
-    return result;
+    if (i < max)
+      result.delete(result.length() - 2, result.length());
+    result.append("]");
+    return result.toString();
   }
+  
 }
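
Note: sampleList exists to keep the debug lines bounded when thousands of files
are in flight. Two quirks worth knowing: it emits up to max + 1 elements before
the "...", and it assumes a non-empty collection (on an empty one the
trailing-delimiter delete would throw StringIndexOutOfBoundsException -- not
reachable in this patch, since every call site skips empty lists). A
hypothetical call from inside LoadFiles, where the package-private helper is
visible:

    List<String> files = Arrays.asList("f1.rf", "f2.rf", "f3.rf", "f4.rf", "f5.rf");
    System.out.println(sampleList(files, 3));               // [f1.rf, f2.rf, f3.rf, f4.rf...]
    System.out.println(sampleList(files.subList(0, 2), 3)); // [f1.rf, f2.rf]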

Modified: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java Fri Feb 17 16:03:47 2012
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.client.BulkImporter;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,7 +51,7 @@ public class BulkImporterTest {
   static final Text tableId = new Text("1");
   static {
     fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
-    for (String part : new String[] {"b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"}) {
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
       fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
     }
     fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
@@ -121,6 +120,7 @@ public class BulkImporterTest {
     writer.append(new Key("d", "cf", "cq3"), empty);
     writer.append(new Key("d", "cf", "cq4"), empty);
     writer.append(new Key("d", "cf", "cq5"), empty);
+    writer.append(new Key("dd", "cf", "cq1"), empty);
     writer.append(new Key("ichabod", "cf", "cq"), empty);
     writer.append(new Key("icky", "cf", "cq1"), empty);
     writer.append(new Key("iffy", "cf", "cq2"), empty);
@@ -130,18 +130,20 @@ public class BulkImporterTest {
     writer.append(new Key("xyzzy", "cf", "cq"), empty);
     writer.close();
     List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file));
-    Assert.assertEquals(4, overlaps.size());
+    Assert.assertEquals(5, overlaps.size());
     Collections.sort(overlaps);
-    Assert.assertEquals(overlaps.get(0).tablet_extent, new KeyExtent(tableId, new Text("a"), null));
-    Assert.assertEquals(overlaps.get(1).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c")));
-    Assert.assertEquals(overlaps.get(2).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i")));
-    Assert.assertEquals(overlaps.get(3).tablet_extent, new KeyExtent(tableId, null, new Text("l")));
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent); List overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text( "b"))); - Assert.assertEquals(2, overlaps2.size()); - Assert.assertEquals(overlaps2.get(0).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c"))); - Assert.assertEquals(overlaps2.get(1).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i"))); + Assert.assertEquals(3, overlaps2.size()); + Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent); + Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent); Assert.assertEquals(locator.invalidated, 1); }