hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1378331 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org...
Date Tue, 28 Aug 2012 20:54:20 GMT
Author: mbautin
Date: Tue Aug 28 20:54:19 2012
New Revision: 1378331

URL: http://svn.apache.org/viewvc?rev=1378331&view=rev
Log:
[HBASE-6590] Assign SequenceNumber to BulkLoaded files.

Author: aaiyer

Summary:
StoreFiles within a store are sorted based on the sequenceId. SequenceId is a monotonically
increasing number that accompanies every edit written to the WAL. For entries that update
the same cell, we would like the latter edit to win. This comparision is accomplished using
memstoreTS, at the KV level; and sequenceId at the StoreFile level (to order scanners in the
KeyValueHeap).

BulkLoaded files are generated outside of HBase/RegionServer, so they do not have a sequenceId
written in the file.  This causes HBase to lose track of the point in time, when the BulkLoaded
file was imported to HBase. Resulting in a behavior, that **only** supports viewing bulkLoaded
files as files back-filling data from the begining of time.

By assigning a sequence number to the file, we can allow the bulk loaded file to fit in where
we want. Either at the "current time" or the "begining of time". The latter is the default,
to maintain backward compatibility.

Design approach:
Store files keep track of the sequence Id in the trailer. Since we do not wish to edit/rewrite
the bulk loaded file upon import, we will encode the assigned sequenceId into the fileName.
The filename RegEx is updated for this regard. If the sequenceId is encoded in the filename,
the sequenceId will be used as the sequenceId for the file. If none is found, the sequenceId
will be considered 0 (as per the default, backward-compatible behavior).

To enable clients to request pre-existing behavior, the command line utility allows for 2
ways to import BulkLoaded Files: to assign or not assign a sequence Number.
- If a sequence Number is assigned, the imporeted file will be imported with the "current
sequence Id".
- if the sequence Number is not assigned, it will be as if it was backfilling old data, from
the begining of time.

Compaction behavior:
- With the current compaction algorithm, bulk loaded files -- that backfill data, to the begining
of time -- can cause a compaction storm, converting every minor compaction to a major compaction.
To address this, these files are excluded from minor compaction, based on a config param.
(enabled for the messages use case).
- Since, bulk loaded files that are not back-filling data do not cause this issue, they will
not be ignored during minor compactions based on the config parameter. This is also required
to ensure that there are no holes in the set of files selected for compaction -- this is necessary
to preserve the order of KV's comparision before and after comparision.

Test Plan:
mr tests

extend test bulk import and test both behaviors.

Reviewers: kannan, kranganathan, gqchen, nspiegelberg, mbautin, liyintang

Reviewed By: kannan

CC: HBase Diffs Facebook Group

Differential Revision: https://reviews.facebook.net/D3789

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Tue Aug 28 20:54:19 2012
@@ -360,6 +360,8 @@ public interface HRegionInterface extend
    */
   public void bulkLoadHFile(String hfilePath,
       byte[] regionName, byte[] familyName) throws IOException;
+  public void bulkLoadHFile(String hfilePath,
+      byte[] regionName, byte[] familyName, boolean assignSeqNum) throws IOException;
 
   /**
    * Replicates the given entries. The guarantee is that the given entries
@@ -387,4 +389,6 @@ public interface HRegionInterface extend
    */
   public int updateFavoredNodes(AssignmentPlan plan)
   throws IOException;
+
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Tue Aug 28 20:54:19 2012
@@ -66,9 +66,13 @@ public class LoadIncrementalHFiles exten
   static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
 
   public static String NAME = "completebulkload";
+  public static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+
+  private boolean assignSeqIds;
 
   public LoadIncrementalHFiles(Configuration conf) {
     super(conf);
+    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
   }
 
   public LoadIncrementalHFiles() {
@@ -239,7 +243,7 @@ public class LoadIncrementalHFiles exten
           }
 
           byte[] regionName = location.getRegionInfo().getRegionName();
-          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
+          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family, assignSeqIds);
           return null;
         }
       });

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Tue Aug 28 20:54:19 2012
@@ -612,12 +612,15 @@ public class HRegion implements HeapSize
             Store store = future.get();
 
             this.stores.put(store.getColumnFamilyName().getBytes(), store);
-            long storeSeqId = store.getMaxSequenceId();
-            if (minSeqId == -1 || storeSeqId < minSeqId) {
-              minSeqId = storeSeqId;
+            // Do not include bulk loaded files when determining seqIdForReplay
+            long storeSeqIdForReplay = store.getMaxSequenceId(false);
+            if (minSeqId == -1 || storeSeqIdForReplay < minSeqId) {
+              minSeqId = storeSeqIdForReplay;
             }
-            if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-              maxSeqId = storeSeqId;
+            // Include bulk loaded files when determining seqIdForAssignment
+            long storeSeqIdForAssignment = store.getMaxSequenceId(true);
+            if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
+              maxSeqId = storeSeqIdForAssignment;
             }
             long maxStoreMemstoreTS = store.getMaxMemstoreTS();
             if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -2929,8 +2932,10 @@ public class HRegion implements HeapSize
     }
   }
 
-  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  public void bulkLoadHFile(String hfilePath, byte[] familyName, boolean assignSeqId)
   throws IOException {
+    long seqId = this.log.obtainSeqNum();
+    
     splitsAndClosesLock.readLock().lock();
     try {
       Store store = getStore(familyName);
@@ -2938,11 +2943,15 @@ public class HRegion implements HeapSize
         throw new DoNotRetryIOException(
             "No such column family " + Bytes.toStringBinary(familyName));
       }
-      store.bulkLoadHFile(hfilePath);
+      store.bulkLoadHFile(hfilePath, assignSeqId ? seqId : -1);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
-
+  }
+  
+  public void bulkLoadHFile(String hfilePath, byte[] familyName)
+  throws IOException {
+    bulkLoadHFile(hfilePath, familyName, false);
   }
 
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue Aug 28 20:54:19 2012
@@ -2697,8 +2697,15 @@ public class HRegionServer implements HR
   public void bulkLoadHFile(
       String hfilePath, byte[] regionName, byte[] familyName)
   throws IOException {
+    bulkLoadHFile(hfilePath, regionName, familyName, false);
+  }
+  
+  @Override
+  public void bulkLoadHFile(
+      String hfilePath, byte[] regionName, byte[] familyName, 
+      boolean assignSeqNum) throws IOException {
     HRegion region = getRegion(regionName);
-    region.bulkLoadHFile(hfilePath, familyName);
+    region.bulkLoadHFile(hfilePath, familyName, assignSeqNum);
   }
 
   Map<String, Integer> rowlocks =

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue
Aug 28 20:54:19 2012
@@ -266,8 +266,8 @@ public class Store extends SchemaConfigu
   /**
    * @return The maximum sequence id in all store files.
    */
-  long getMaxSequenceId() {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
+  long getMaxSequenceId(boolean includeBulkLoadedFiles) {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkLoadedFiles);
   }
 
   /**
@@ -417,7 +417,7 @@ public class Store extends SchemaConfigu
     return this.storefiles;
   }
 
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
+  public void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException {
     Path srcPath = new Path(srcPathStr);
 
     HFile.Reader reader  = null;
@@ -461,7 +461,8 @@ public class Store extends SchemaConfigu
       srcPath = tmpPath;
     }
 
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir, 
+        (sequenceId >= 0) ? ("_SeqId_" + sequenceId + "_") : null);
     LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
     StoreFile.rename(fs, srcPath, dstPath);
 
@@ -862,7 +863,7 @@ public class Store extends SchemaConfigu
     }
 
     // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
 
     // Ready to go. Have list of files to compact.
     MonitoredTask status = TaskMonitor.get().createStatus(
@@ -930,10 +931,10 @@ public class Store extends SchemaConfigu
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
         isMajor = (filesToCompact.size() == storefiles.size());
         filesCompacting.addAll(filesToCompact);
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1125,7 +1126,7 @@ public class Store extends SchemaConfigu
               filesToCompact, filesCompacting);
         }
         filesCompacting.addAll(filesToCompact.getFilesToCompact());
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
         // major compaction iff all StoreFiles are included
         boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
@@ -1239,7 +1240,8 @@ public class Store extends SchemaConfigu
             new Predicate<StoreFile>() {
               @Override
               public boolean apply(StoreFile input) {
-                return input.isBulkLoadResult();
+                // If we have assigned a sequenceId to the hfile, we won't skip the file.
+                return input.isBulkLoadResult() && input.getMaxSequenceId() <=
0;
               }
             }));
         LOG.debug("Exclude " +
@@ -1546,7 +1548,7 @@ public class Store extends SchemaConfigu
   }
 
   public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
-    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
+    Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
     return newList;
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
Tue Aug 28 20:54:19 2012
@@ -187,7 +187,7 @@ public class StoreFile extends SchemaCon
    * this file's id.  Group 2 the referenced region name, etc.
    */
   private static final Pattern REF_NAME_PARSER =
-    Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$");
+    Pattern.compile("^([0-9a-f]+(?:_SeqId_[0-9]+_)?)(?:\\.(.+))?$");
 
   // StoreFile.Reader
   private volatile Reader reader;
@@ -374,13 +374,16 @@ public class StoreFile extends SchemaCon
    * the given list. Store files that were created by a mapreduce
    * bulk load are ignored, as they do not correspond to any edit
    * log items.
+   * @param sfs 
+   * @param includeBulkLoadedFiles
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
+      boolean includeBulkLoadedFiles) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (!sf.isBulkLoadResult()) {
+      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
         max = Math.max(max, sf.getMaxSequenceId());
       }
     }
@@ -441,7 +444,25 @@ public class StoreFile extends SchemaCon
           this.sequenceid += 1;
         }
       }
+    } 
+    
+    if (isBulkLoadResult()){
+      // generate the sequenceId from the fileName
+      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
+      String fileName = this.path.getName();
+      int startPos = fileName.indexOf("SeqId_");
+      if (startPos != -1) {
+        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
+            fileName.indexOf('_', startPos + 6)));
+        // Handle reference files as done above.
+        if (isReference()) {
+          if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+            this.sequenceid += 1;
+          }
+        }
+      }
     }
+    
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -1583,32 +1604,35 @@ public class StoreFile extends SchemaCon
    */
   abstract static class Comparators {
     /**
-     * Comparator that compares based on the flush time of
-     * the StoreFiles. All bulk loads are placed before all non-
-     * bulk loads, and then all files are sorted by sequence ID.
+     * Comparator that compares based on the Sequence Id of the
+     * the StoreFiles. Bulk loads that did not request a seq ID
+     * are given a seq id of -1; thus, they are placed before all non-
+     * bulk loads, and bulk loads with sequence Id. Among these files,
+     * the bulkLoadTime is used to determine the ordering.
      * If there are ties, the path name is used as a tie-breaker.
      */
-    static final Comparator<StoreFile> FLUSH_TIME =
+    static final Comparator<StoreFile> SEQ_ID =
       Ordering.compound(ImmutableList.of(
-          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetPathName())
       ));
 
-    private static class GetBulkTime implements Function<StoreFile, Long> {
+    private static class GetSeqId implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
-        return sf.getBulkLoadTimestamp();
+        return sf.getMaxSequenceId();
       }
     }
-    private static class GetSeqId implements Function<StoreFile, Long> {
+    
+    private static class GetBulkTime implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (sf.isBulkLoadResult()) return -1L;
-        return sf.getMaxSequenceId();
+        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
+        return sf.getBulkLoadTimestamp();
       }
     }
+    
     private static class GetPathName implements Function<StoreFile, String> {
       @Override
       public String apply(StoreFile sf) {
@@ -1628,4 +1652,4 @@ public class StoreFile extends SchemaCon
       });
   }
 
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Tue Aug 28 20:54:19 2012
@@ -1251,7 +1251,7 @@ public class HLog implements Syncable {
   /**
    * Obtain a log sequence number.
    */
-  private long obtainSeqNum() {
+  public long obtainSeqNum() {
     return this.logSeqNum.incrementAndGet();
   }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Tue Aug 28 20:54:19 2012
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,10 +34,14 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.TestStore;
+import org.apache.hadoop.hbase.regionserver.TestStoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
@@ -88,6 +93,30 @@ public class TestLoadIncrementalHFiles {
     });
   }
 
+  /**
+   * Test case that creates some regions and loads
+   * HFiles that fit snugly inside those regions
+   */
+  @Test
+  public void testBulkLoadSequenceNumber() throws Exception {
+    util.getConfiguration().setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, true);
+    verifyAssignedSequenceNumber("testBulkLoadSequenceNumber-WithSeqNum",
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    }, true);
+  }
+    
+  @Test
+  public void testBulkLoadSequenceNumberOld() throws Exception {
+    util.getConfiguration().setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, false);
+    verifyAssignedSequenceNumber("testBulkLoadSequenceNumber-WithoutSeqNum",
+        new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    }, false);
+  }
+  
   private void runTest(String testName, byte[][][] hfileRanges)
   throws Exception {
     Path dir = util.getTestDir(testName);
@@ -124,6 +153,58 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  private void verifyAssignedSequenceNumber(String testName,
+      byte[][][] hfileRanges, boolean nonZero) throws Exception {
+    Path dir = util.getTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    util.startMiniCluster();
+    try {
+      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+      HTableDescriptor htd = new HTableDescriptor(TABLE);
+      htd.addFamily(new HColumnDescriptor(FAMILY));
+      // Do not worry about splitting the keys
+      admin.createTable(htd);
+
+      HTable table = new HTable(util.getConfiguration(), TABLE);
+      util.waitTableAvailable(TABLE, 30000);
+      
+      // Do a dummy put to increase the hlog sequence number
+      Put put = new Put(Bytes.toBytes("row"));
+      put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
+      table.put(put);
+      
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+          util.getConfiguration());
+      loader.doBulkLoad(dir, table);
+
+      // Get the store files
+      List<StoreFile> files = TestStoreFile.getStoreFiles(
+          util.getHBaseCluster().getRegions(TABLE).get(0).getStore(FAMILY));
+      for (StoreFile file: files) {
+        // the sequenceId gets initialized during createReader
+        file.createReader();
+        
+        if (nonZero)
+          assertTrue(file.getMaxSequenceId() > 0);
+        else
+          assertTrue(file.getMaxSequenceId() == -1);
+      }
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
   @Test
   public void testSplitStoreFile() throws IOException {
     Path dir = util.getTestDir("testSplitHFile");
@@ -188,6 +269,8 @@ public class TestLoadIncrementalHFiles {
         writer.append(kv);
       }
     } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+          Bytes.toBytes(System.currentTimeMillis()));
       writer.close();
     }
   }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Tue Aug 28 20:54:19 2012
@@ -583,7 +583,7 @@ public class TestCompaction extends HBas
     Store store = r.getStore(COLUMN_FAMILY);
 
     List<StoreFile> storeFiles = store.getStorefiles();
-    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, false);
 
     StoreFile.Writer compactedFile = store.compactStores(storeFiles, false, maxId);
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=1378331&r1=1378330&r2=1378331&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
Tue Aug 28 20:54:19 2012
@@ -609,7 +609,7 @@ public class TestStoreFile extends HBase
   }
 
   public void testFlushTimeComparator() {
-    assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+    assertOrdering(StoreFile.Comparators.SEQ_ID,
         mockStoreFile(true, 1000, -1, "/foo/123"),
         mockStoreFile(true, 1000, -1, "/foo/126"),
         mockStoreFile(true, 2000, -1, "/foo/126"),
@@ -924,4 +924,8 @@ public class TestStoreFile extends HBase
 
     assertEquals(dataBlockEncoderAlgo.getNameInBytes(), value);
   }
+  
+  public static List<StoreFile> getStoreFiles(Store s) {
+    return s.getStorefiles();
+  }
 }
\ No newline at end of file



Mime
View raw message