incubator-accumulo-commits mailing list archives

From e..@apache.org
Subject svn commit: r1245631 - in /incubator/accumulo/branches/1.4: ./ src/core/src/main/java/org/apache/accumulo/core/conf/ src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/ src/examples/wikisearch/ingest/src/main/ja...
Date Fri, 17 Feb 2012 16:03:48 GMT
Author: ecn
Date: Fri Feb 17 16:03:47 2012
New Revision: 1245631

URL: http://svn.apache.org/viewvc?rev=1245631&view=rev
Log:
ACCUMULO-412 fix index search

Modified:
    incubator/accumulo/branches/1.4/pom.xml
    incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
    incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java

Modified: incubator/accumulo/branches/1.4/pom.xml
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/pom.xml?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/pom.xml (original)
+++ incubator/accumulo/branches/1.4/pom.xml Fri Feb 17 16:03:47 2012
@@ -315,7 +315,7 @@
         <version>0.8</version>
         <inherited>false</inherited>
         <configuration>
-          <deb>${project.build.directory}/${artifactId}_${project.version}-${os.arch}.deb</deb>
+          <deb>${project.build.directory}/${project.artifactId}_${project.version}-${os.arch}.deb</deb>
           <controlDir>src/packages/deb/accumulo.control</controlDir>
           <installDir>/opt/accumulo/accumulo-${project-version}</installDir>
           <dataSet>

Modified: incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ incubator/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java Fri Feb 17 16:03:47 2012
@@ -65,7 +65,6 @@ public enum Property {
   MASTER_RECOVERY_POOL("master.recovery.pool", "recovery", PropertyType.STRING, "Priority queue to use for log recovery map/reduce jobs."),
   MASTER_RECOVERY_SORT_MAPREDUCE("master.recovery.sort.mapreduce", "false", PropertyType.BOOLEAN,
       "If true, use map/reduce to sort write-ahead logs during recovery"),
-  MASTER_BULK_SERVERS("master.bulk.server.max", "4", PropertyType.COUNT, "The number of servers to use during a bulk load"),
   MASTER_BULK_RETRIES("master.bulk.retries", "3", PropertyType.COUNT, "The number of attempts to bulk-load a file before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
   MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
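
For orientation: the deleted master.bulk.server.max knob loses its meaning with this commit, because the rewritten LoadFiles step (further down in this diff) spreads bulk-load work across every online tablet server instead of a configurable subset. The surviving retry property is now read from the site configuration, roughly as in this sketch; the wrapper class and main method are illustrative only, though the two calls are the ones the new code makes, and the server-side ServerConfiguration import path is an assumption:

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.server.conf.ServerConfiguration;

public class RetryConfigSketch {
  public static void main(String[] args) {
    // Mirrors the new LoadFiles.call(): the retry count comes from the
    // site configuration and is clamped to at least one attempt.
    SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
    int retries = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
    System.out.println("bulk-load retries: " + retries);
  }
}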

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedMapper.java Fri Feb 17 16:03:47 2012
@@ -42,14 +42,13 @@ import org.apache.commons.lang.StringUti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
 public class WikipediaPartitionedMapper extends Mapper<Text,Article,Text,Mutation> {
   
-  private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitionedMapper.class);
   
   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitioner.java Fri Feb 17 16:03:47 2012
@@ -23,40 +23,21 @@ package org.apache.accumulo.examples.wik
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.StringReader;
 import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.IllegalFormatException;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit;
-import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid;
-import org.apache.accumulo.examples.wikisearch.protobuf.Uid.List.Builder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
-import org.apache.lucene.analysis.tokenattributes.TermAttribute;
-import org.apache.lucene.wikipedia.analysis.WikipediaTokenizer;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 
 public class WikipediaPartitioner extends Mapper<LongWritable,Text,Text,Article> {
   
-  private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
+  // private static final Logger log = Logger.getLogger(WikipediaPartitioner.class);
   
   public final static Charset UTF8 = Charset.forName("UTF-8");
   public static final String DOCUMENT_COLUMN_FAMILY = "d";

Modified: incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java (original)
+++ incubator/accumulo/branches/1.4/src/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java Fri Feb 17 16:03:47 2012
@@ -4,20 +4,18 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 
 public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {
 
-  private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
+  // private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);
 
   public static final String PATH_NAME = "sortingrfileoutputformat.path";
   public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Fri Feb 17 16:03:47 2012
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.TabletLocator;
-import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -150,7 +150,7 @@ public class BulkImporter {
             } catch (Exception ex) {
               log.warn("Unable to find tablets that overlap file " + mapFile.toString());
             }
-            
+            log.debug("Map file " + mapFile + " found to overlap " + tabletsToAssignMapFileTo.size()
+ " tablets");
             if (tabletsToAssignMapFileTo.size() == 0) {
               List<KeyExtent> empty = Collections.emptyList();
               completeFailures.put(mapFile, empty);
@@ -652,33 +652,41 @@ public class BulkImporter {
     return findOverlappingTablets(acuConf, fs, locator, file, start, failed.getEndRow());
   }
   
+  final static byte[] byte0 = {0};
+
   public static List<TabletLocation> findOverlappingTablets(AccumuloConfiguration acuConf, FileSystem fs, TabletLocator locator, Path file, Text startRow,
       Text endRow) throws Exception {
     List<TabletLocation> result = new ArrayList<TabletLocation>();
-    
     Collection<ByteSequence> columnFamilies = Collections.emptyList();
-    
-    FileSKVIterator reader = FileOperations.getInstance().openReader(file.toString(), true, fs, fs.getConf(), acuConf);
+    String filename = file.toString();
+    // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
+    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), acuConf);
     try {
       Text row = startRow;
       if (row == null)
         row = new Text();
       while (true) {
+        // log.debug(filename + " Seeking to row " + row);
         reader.seek(new Range(row, null), columnFamilies, false);
-        if (!reader.hasTop())
+        if (!reader.hasTop()) {
+          // log.debug(filename + " not found");
           break;
+        }
         row = reader.getTopKey().getRow();
         TabletLocation tabletLocation = locator.locateTablet(row, false, true);
+        // log.debug(filename + " found row " + row + " at location " + tabletLocation);
         result.add(tabletLocation);
         row = tabletLocation.tablet_extent.getEndRow();
-        if (row != null && (endRow == null || row.compareTo(endRow) < 0))
-          row = Range.followingPrefix(row);
-        else
+        if (row != null && (endRow == null || row.compareTo(endRow) < 0)) {
+          row = new Text(row);
+          row.append(byte0, 0, byte0.length);
+        } else
           break;
       }
     } finally {
       reader.close();
     }
+    // log.debug(filename + " to be sent to " + result);
     return result;
   }
   

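For readers skimming the diff: the hunk above is the heart of the ACCUMULO-412 fix. The old code advanced past a tablet with Range.followingPrefix, which increments the last byte of the end row (so "d" becomes "e") and therefore skips every row between "d\0" and "e", such as "dd"; files overlapping tablets in that span were never assigned. The new code appends a single zero byte, producing the immediate successor row. A minimal standalone sketch of the two successor computations; the class and method names here are illustrative, not part of the commit:

import org.apache.hadoop.io.Text;

public class SuccessorSketch {
  static final byte[] BYTE0 = {0};

  // New behavior: appending one 0x00 byte yields the smallest row
  // strictly greater than the input, so the next seek starts at the
  // first possible key past the current tablet's end row.
  static Text immediateSuccessor(Text row) {
    Text next = new Text(row);
    next.append(BYTE0, 0, BYTE0.length);
    return next;
  }

  public static void main(String[] args) {
    // Old: Range.followingPrefix(new Text("d")) -> "e" (skips "dd").
    // New: immediateSuccessor(new Text("d"))    -> "d\0".
    System.out.println(immediateSuccessor(new Text("d")).getLength()); // 2
  }
}
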
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Feb 17 16:03:47 2012
@@ -19,11 +19,15 @@ package org.apache.accumulo.server.maste
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,12 +45,13 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -370,7 +375,7 @@ class LoadFiles extends MasterRepo {
   
   @Override
   public Repo<Master> call(final long tid, Master master) throws Exception {
-    
+    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
     FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
         ServerConfiguration.getSiteConfiguration()));
     List<FileStatus> files = new ArrayList<FileStatus>();
@@ -389,42 +394,68 @@ class LoadFiles extends MasterRepo {
     }
     fs.delete(writable, false);
     
-    // group files into N-sized chunks, send the chunks to random servers
-    final int SERVERS_TO_USE = Math.min(ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_SERVERS), master.onlineTabletServers()
-        .size());
-    
-    log.debug("tid " + tid + " using " + SERVERS_TO_USE + " servers");
-    // wait for success, repeat failures R times
     final List<String> filesToLoad = Collections.synchronizedList(new ArrayList<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
     
-    final int RETRIES = Math.max(1, ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_RETRIES));
-    for (int i = 0; i < RETRIES && filesToLoad.size() > 0; i++) {
-      List<Future<?>> results = new ArrayList<Future<?>>();
-      for (List<String> chunk : groupFiles(filesToLoad, SERVERS_TO_USE)) {
-        final List<String> attempt = chunk;
-        results.add(threadPool.submit(new LoggingRunnable(log, new Runnable() {
+
+    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
+    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
+      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
+      
+      // Figure out which files will be sent to which server
+      Set<TServerInstance> currentServers = Collections.synchronizedSet(new HashSet<TServerInstance>(master.onlineTabletServers()));
+      Map<String,List<String>> loadAssignments = new HashMap<String,List<String>>();
+      for (TServerInstance server : currentServers) {
+        loadAssignments.put(server.hostPort(), new ArrayList<String>());
+      }
+      int i = 0;
+      List<Entry<String,List<String>>> entries = new ArrayList<Entry<String,List<String>>>(loadAssignments.entrySet());
+      for (String file : filesToLoad) {
+        entries.get(i % entries.size()).getValue().add(file);
+        i++;
+      }
+      
+      // Use the threadpool to assign files one-at-a-time to the server
+      for (Entry<String,List<String>> entry : entries) {
+        if (entry.getValue().isEmpty()) {
+          continue;
+        }
+        final Entry<String,List<String>> finalEntry = entry;
+        results.add(threadPool.submit(new Callable<List<String>>() {
           @Override
-          public void run() {
+          public List<String> call() {
+            if (log.isDebugEnabled()) {
+              log.debug("Asking " + finalEntry.getKey() + " to load " + sampleList(finalEntry.getValue(),
10));
+            }
+            List<String> failures = new ArrayList<String>();
             ClientService.Iface client = null;
             try {
-              client = ServerClient.getConnection(HdfsZooInstance.getInstance());
-              List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
-              attempt.removeAll(fail);
-              filesToLoad.removeAll(attempt);
+              client = ThriftUtil.getTServerClient(finalEntry.getKey(), conf);
+              for (String file : finalEntry.getValue()) {
+                List<String> attempt = Collections.singletonList(file);
+                log.debug("Asking " + finalEntry.getKey() + " to bulk import " + file);
+                List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime);
+                if (fail.isEmpty()) {
+                  filesToLoad.remove(file);
+                } else {
+                  failures.addAll(fail);
+                }
+              }
             } catch (Exception ex) {
               log.error(ex, ex);
             } finally {
               ServerClient.close(client);
             }
+            return failures;
           }
-        })));
+        }));
       }
-      for (Future<?> f : results)
-        f.get();
+      Set<String> failures = new HashSet<String>();
+      for (Future<List<String>> f : results)
+        failures.addAll(f.get());
       if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (i + 1) + " " + filesToLoad + " failed");
+        log.debug("tid " + tid + " attempt " + (i + 1) + " " + sampleList(filesToLoad, 10)
+ " failed");
         UtilWaitThread.sleep(100);
       }
     }
@@ -449,16 +480,24 @@ class LoadFiles extends MasterRepo {
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
   
-  private List<List<String>> groupFiles(List<String> files, int groups) {
-    List<List<String>> result = new ArrayList<List<String>>();
-    Iterator<String> iter = files.iterator();
-    for (int i = 0; i < groups && iter.hasNext(); i++) {
-      List<String> group = new ArrayList<String>();
-      for (int j = 0; j < Math.ceil(files.size() / (double) groups) && iter.hasNext(); j++) {
-        group.add(iter.next());
+  static String sampleList(Collection<?> potentiallyLongList, int max) {
+    StringBuffer result = new StringBuffer();
+    result.append("[");
+    int i = 0;
+    for (Object obj : potentiallyLongList) {
+      result.append(obj);
+      if (i >= max) {
+        result.append("...");
+        break;
+      } else {
+        result.append(", ");
       }
-      result.add(group);
+      i++;
     }
-    return result;
+    if (i < max)
+      result.delete(result.length() - 2, result.length());
+    result.append("]");
+    return result.toString();
   }
+
 }
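
To summarize the LoadFiles rewrite above: instead of grouping files into chunks for at most MASTER_BULK_SERVERS randomly chosen servers, the master now round-robins all files across every online tablet server, asks each server to import its files one at a time over a direct ThriftUtil.getTServerClient connection, and retries whatever failed, bounded by MASTER_BULK_RETRIES. A condensed sketch of just the assignment step, with simplified types standing in for the diff's TServerInstance plumbing and hypothetical file and server names:

import java.util.*;

public class AssignmentSketch {
  // File i goes to server (i % numServers), as in the new call() loop.
  static Map<String,List<String>> assign(List<String> servers, List<String> files) {
    Map<String,List<String>> assignments = new HashMap<String,List<String>>();
    for (String server : servers)
      assignments.put(server, new ArrayList<String>());
    List<Map.Entry<String,List<String>>> entries = new ArrayList<Map.Entry<String,List<String>>>(assignments.entrySet());
    int i = 0;
    for (String file : files)
      entries.get(i++ % entries.size()).getValue().add(file);
    return assignments;
  }

  public static void main(String[] args) {
    System.out.println(assign(Arrays.asList("tserver1:9997", "tserver2:9997"),
        Arrays.asList("f1.rf", "f2.rf", "f3.rf")));
  }
}

The new sampleList helper keeps the resulting debug lines bounded: sampleList(Arrays.asList("f1", "f2", "f3"), 10) renders "[f1, f2, f3]", while collections longer than the max are cut off with "...". (It presumes a non-empty collection; with an empty one the trailing two-character delete would underflow.)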

Modified: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java?rev=1245631&r1=1245630&r2=1245631&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java Fri Feb 17 16:03:47 2012
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.client.BulkImporter;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,7 +51,7 @@ public class BulkImporterTest {
   static final Text tableId = new Text("1");
   static {
     fakeMetaData.add(new KeyExtent(tableId, new Text("a"), null));
-    for (String part : new String[] {"b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"}) {
+    for (String part : new String[] {"b", "bm", "c", "cm", "d", "dm", "e", "em", "f", "g", "h", "i", "j", "k", "l"}) {
       fakeMetaData.add(new KeyExtent(tableId, new Text(part), fakeMetaData.last().getEndRow()));
     }
     fakeMetaData.add(new KeyExtent(tableId, null, fakeMetaData.last().getEndRow()));
@@ -121,6 +120,7 @@ public class BulkImporterTest {
     writer.append(new Key("d", "cf", "cq3"), empty);
     writer.append(new Key("d", "cf", "cq4"), empty);
     writer.append(new Key("d", "cf", "cq5"), empty);
+    writer.append(new Key("dd", "cf", "cq1"), empty);
     writer.append(new Key("ichabod", "cf", "cq"), empty);
     writer.append(new Key("icky", "cf", "cq1"), empty);
     writer.append(new Key("iffy", "cf", "cq2"), empty);
@@ -130,18 +130,20 @@ public class BulkImporterTest {
     writer.append(new Key("xyzzy", "cf", "cq"), empty);
     writer.close();
     List<TabletLocation> overlaps = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file));
-    Assert.assertEquals(4, overlaps.size());
+    Assert.assertEquals(5, overlaps.size());
     Collections.sort(overlaps);
-    Assert.assertEquals(overlaps.get(0).tablet_extent, new KeyExtent(tableId, new Text("a"), null));
-    Assert.assertEquals(overlaps.get(1).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c")));
-    Assert.assertEquals(overlaps.get(2).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i")));
-    Assert.assertEquals(overlaps.get(3).tablet_extent, new KeyExtent(tableId, null, new Text("l")));
+    Assert.assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps.get(2).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps.get(3).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent);
     
     List<TabletLocation> overlaps2 = BulkImporter.findOverlappingTablets(acuConf, fs, locator, new Path(file), new KeyExtent(tableId, new Text("h"), new Text(
         "b")));
-    Assert.assertEquals(2, overlaps2.size());
-    Assert.assertEquals(overlaps2.get(0).tablet_extent, new KeyExtent(tableId, new Text("d"), new Text("c")));
-    Assert.assertEquals(overlaps2.get(1).tablet_extent, new KeyExtent(tableId, new Text("j"), new Text("i")));
+    Assert.assertEquals(3, overlaps2.size());
+    Assert.assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("dm"), new Text("d")), overlaps2.get(1).tablet_extent);
+    Assert.assertEquals(new KeyExtent(tableId, new Text("j"), new Text("i")), overlaps2.get(2).tablet_extent);
     Assert.assertEquals(locator.invalidated, 1);
   }
   

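Why the extra splits and the new "dd" key matter: with the added split points, the tablet extents around "d" are ("cm","d"], ("d","dm"], ("dm","e"], and the appended row "dd" falls in ("d","dm"]. The old successor logic jumped from end row "d" straight to "e", so that tablet never appeared in the overlap list; finding it is the extra entry behind the 4 -> 5 and 2 -> 3 count changes above. A tiny illustration, with a hypothetical class and byte-wise string comparison standing in for Accumulo's row ordering:

public class SplitSketch {
  public static void main(String[] args) {
    String row = "dd";
    // "d" < "dd" <= "dm", so row "dd" belongs to tablet ("d","dm"].
    System.out.println(row.compareTo("d") > 0 && row.compareTo("dm") <= 0); // true
  }
}
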

