hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1528592 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/...
Date Wed, 02 Oct 2013 19:06:15 GMT
Author: larsh
Date: Wed Oct  2 19:06:15 2013
New Revision: 1528592

URL: http://svn.apache.org/r1528592
Log:
HBASE-8521 Cells cannot be overwritten with bulk loaded HFiles (Jean-Marc Spaggiari)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
Wed Oct  2 19:06:15 2013
@@ -70,11 +70,17 @@ public class SecureBulkLoadClient {
     }
   }
 
-  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
-                         Token<?> userToken, String bulkToken) throws IOException {
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, Token<?>
userToken,
+      String bulkToken) throws IOException {
+    return bulkLoadHFiles(familyPaths, userToken, bulkToken, false);
+  }
+
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, Token<?>
userToken,
+      String bulkToken, boolean assignSeqNum) throws IOException {
     try {
-      return (Boolean)Methods.call(protocolClazz, proxy, "bulkLoadHFiles",
-          new Class[]{List.class, Token.class, String.class},new Object[]{familyPaths, userToken,
bulkToken});
+      return (Boolean) Methods.call(protocolClazz, proxy, "bulkLoadHFiles", new Class[] {
+          List.class, Token.class, String.class, Boolean.class },
+        new Object[] { familyPaths, userToken, bulkToken, assignSeqNum });
     } catch (Exception e) {
       throw new IOException("Failed to bulkLoadHFiles", e);
     }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed
Oct  2 19:06:15 2013
@@ -392,6 +392,21 @@ public interface HRegionInterface extend
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[]
regionName)
   throws IOException;
 
+  /**
+   * Atomically bulk load multiple HFiles (say from different column families)
+   * into an open region.
+   * 
+   * @param familyPaths List of (family, hfile path) pairs
+   * @param regionName name of region to load hfiles into
+   * @param assignSeqNum should we assign sequence numbers
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if fails unrecoverably
+   */
+   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[]
regionName,
+       boolean assignSeqNum)
+  throws IOException;
+
+
   // Master methods
 
   /**

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Wed Oct  2 19:06:15 2013
@@ -93,17 +93,17 @@ import com.google.common.util.concurrent
 public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
-  private static final int  TABLE_CREATE_MAX_RETRIES = 20;
-  private static final long TABLE_CREATE_SLEEP = 60000;
   static AtomicLong regionCount = new AtomicLong(0);
   private HBaseAdmin hbAdmin;
   private Configuration cfg;
 
   public static String NAME = "completebulkload";
+  public static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
 
   private boolean useSecure;
   private Token<?> userToken;
   private String bulkToken;
+  private final boolean assignSeqIds;
 
   //package private for testing
   LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws Exception {
@@ -112,6 +112,7 @@ public class LoadIncrementalHFiles exten
     this.hbAdmin = new HBaseAdmin(conf);
     //added simple for testing
     this.useSecure = useSecure != null ? useSecure : User.isHBaseSecurityEnabled(conf);
+    this.assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, false);
   }
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
@@ -290,7 +291,7 @@ public class LoadIncrementalHFiles exten
         LOG.error(err);
       }
     }
-    
+
     if (queue != null && !queue.isEmpty()) {
         throw new RuntimeException("Bulk load aborted with some files not yet loaded."
           + "Please check log for more details.");
@@ -360,7 +361,7 @@ public class LoadIncrementalHFiles exten
     Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
-      
+
       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>()
{
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
@@ -524,11 +525,11 @@ public class LoadIncrementalHFiles exten
               + Bytes.toStringBinary(row));
           byte[] regionName = location.getRegionInfo().getRegionName();
           if(!useSecure) {
-            success = server.bulkLoadHFiles(famPaths, regionName);
+             success = server.bulkLoadHFiles(famPaths, regionName, assignSeqIds);
           } else {
             HTable table = new HTable(conn.getConfiguration(), tableName);
             secureClient = new SecureBulkLoadClient(table, location.getRegionInfo().getStartKey());
-            success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken);
+            success = secureClient.bulkLoadHFiles(famPaths, userToken, bulkToken, assignSeqIds);
           }
           return success;
         } finally {
@@ -653,7 +654,7 @@ public class LoadIncrementalHFiles exten
   private boolean doesTableExist(String tableName) throws Exception {
     return hbAdmin.tableExists(tableName);
   }
-  
+
   /*
    * Infers region boundaries for a new table.
    * Parameter:
@@ -671,16 +672,15 @@ public class LoadIncrementalHFiles exten
     int runningValue = 0;
     byte[] currStartKey = null;
     boolean firstBoundary = true;
-    
+
     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
       if (runningValue == 0) currStartKey = item.getKey();
       runningValue += item.getValue();
       if (runningValue == 0) {
         if (!firstBoundary) keysArray.add(currStartKey);
         firstBoundary = false;
-      } 
+      }
     }
-    
     return keysArray.toArray(new byte[0][0]);
   }
  
@@ -709,7 +709,7 @@ public class LoadIncrementalHFiles exten
     // Build a set of keys
     byte[][] keys = null;
     TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    
+
     for (FileStatus stat : familyDirStatuses) {
       if (!stat.isDir()) {
         LOG.warn("Skipping non-directory " + stat.getPath());
@@ -719,10 +719,10 @@ public class LoadIncrementalHFiles exten
       // Skip _logs, etc
       if (familyDir.getName().startsWith("_")) continue;
       byte[] family = familyDir.getName().getBytes();
-     
+
       hcd = new HColumnDescriptor(family);
       htd.addFamily(hcd);
-      
+
       Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
       for (Path hfile : hfiles) {
         if (hfile.getName().startsWith("_")) continue;
@@ -742,7 +742,7 @@ public class LoadIncrementalHFiles exten
           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
             " first=" + Bytes.toStringBinary(first) +
             " last="  + Bytes.toStringBinary(last));
-          
+
           // To eventually infer start key-end key boundaries
           Integer value = map.containsKey(first)?(Integer)map.get(first):0;
           map.put(first, value+1);
@@ -754,7 +754,7 @@ public class LoadIncrementalHFiles exten
         }
       }
     }
-    
+
     keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed
Oct  2 19:06:15 2013
@@ -494,7 +494,7 @@ public class HRegion implements HeapSize
     // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
     this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
         1 * 1000) <= 0;
-    
+
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -608,11 +608,13 @@ public class HRegion implements HeapSize
           Store store = future.get();
 
           this.stores.put(store.getColumnFamilyName().getBytes(), store);
-          long storeSeqId = store.getMaxSequenceId();
-          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
-              storeSeqId);
-          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-            maxSeqId = storeSeqId;
+          // Do not include bulk loaded files when determining seqIdForReplay
+          long storeSeqIdForReplay = store.getMaxSequenceId(false);
+          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), storeSeqIdForReplay);
+          // Include bulk loaded files when determining seqIdForAssignment
+          long storeSeqIdForAssignment = store.getMaxSequenceId(true);
+          if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
+            maxSeqId = storeSeqIdForAssignment;
           }
           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
           if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -2244,7 +2246,7 @@ public class HRegion implements HeapSize
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
     Set<HashedBytes> rowsAlreadyLocked = Sets.newHashSet();
-      
+
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[],List<KeyValue>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -3619,7 +3621,20 @@ public class HRegion implements HeapSize
    * @throws IOException if failed unrecoverably.
    */
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths) throws
IOException {
-    return bulkLoadHFiles(familyPaths, null);
+    return bulkLoadHFiles(familyPaths, false);
+  }
+
+  /**
+   * Attempts to atomically load a group of hfiles. This is critical for loading rows with
multiple
+   * column families atomically.
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @param assignSeqId should we assign sequence numbers
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if failed unrecoverably.
+   */
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean
assignSeqId)
+      throws IOException {
+    return bulkLoadHFiles(familyPaths, null, assignSeqId);
   }
 
   /**
@@ -3634,6 +3649,20 @@ public class HRegion implements HeapSize
    */
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
       BulkLoadListener bulkLoadListener) throws IOException {
+    return bulkLoadHFiles(familyPaths, bulkLoadListener, false);
+  }
+
+  /**
+   * Attempts to atomically load a group of hfiles. This is critical for loading rows with
multiple
+   * column families atomically.
+   * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @param bulkLoadListener Internal hooks enabling massaging/preparation of a file about
+   *          to be bulk loaded
+   * @param assignSeqId should we assign sequence numbers
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if failed unrecoverably.
+   */
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      BulkLoadListener bulkLoadListener, boolean assignSeqId) throws IOException {
     Preconditions.checkNotNull(familyPaths);
     // we need writeLock for multi-family bulk load
     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -3642,7 +3671,7 @@ public class HRegion implements HeapSize
       this.opMetrics.setWriteRequestCountMetrics( this.writeRequestsCount.get());
 
       // There possibly was a split that happend between when the split keys
-      // were gathered and before the HReiogn's write lock was taken.  We need
+      // were gathered and before the HRegion's write lock was taken.  We need
       // to validate the HFile region before attempting to bulk load all of them
       List<IOException> ioes = new ArrayList<IOException>();
       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[],
String>>();
@@ -3697,7 +3726,7 @@ public class HRegion implements HeapSize
           if(bulkLoadListener != null) {
             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
           }
-          store.bulkLoadHFile(finalPath);
+          store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
           if(bulkLoadListener != null) {
             bulkLoadListener.doneBulkLoad(familyName, path);
           }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Wed Oct  2 19:06:15 2013
@@ -2919,6 +2919,17 @@ public class HRegionServer implements HR
   @Override
   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
       byte[] regionName) throws IOException {
+    return bulkLoadHFiles(familyPaths, regionName, false);
+  }
+
+  /**
+   * Atomically bulk load several HFiles into an open region
+   * @return true if successful, false if failed recoverably (no action)
+   * @throws IOException if failed unrecoverably
+   */
+  @Override
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      byte[] regionName, boolean assignSeqNum) throws IOException {
     checkOpen();
     HRegion region = getRegion(regionName);
     boolean bypass = false;
@@ -2927,7 +2938,7 @@ public class HRegionServer implements HR
     }
     boolean loaded = false;
     if (!bypass) {
-      loaded = region.bulkLoadHFiles(familyPaths);
+      loaded = region.bulkLoadHFiles(familyPaths, assignSeqNum);
     }
     if (region.getCoprocessorHost() != null) {
       loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed
Oct  2 19:06:15 2013
@@ -347,8 +347,8 @@ public class Store extends SchemaConfigu
   /**
    * @return The maximum sequence id in all store files.
    */
-  long getMaxSequenceId() {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
+  long getMaxSequenceId(boolean includeBulkFiles) {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
   }
 
   /**
@@ -628,7 +628,7 @@ public class Store extends SchemaConfigu
    * ranges of values in the HFile fit within the stores assigned region.
    * (assertBulkLoadHFileOk checks this)
    */
-  void bulkLoadHFile(String srcPathStr) throws IOException {
+  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
 
     // Move the file if it's on another filesystem
@@ -647,7 +647,8 @@ public class Store extends SchemaConfigu
       srcPath = tmpPath;
     }
 
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    Path dstPath =
+        StoreFile.getRandomFilename(fs, homedir, (seqNum == -1) ? null : "_SeqId_" + seqNum
+ "_");
     LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
     StoreFile.rename(fs, srcPath, dstPath);
 
@@ -1138,7 +1139,7 @@ public class Store extends SchemaConfigu
     }
 
     // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
@@ -1206,10 +1207,10 @@ public class Store extends SchemaConfigu
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
         isMajor = (filesToCompact.size() == storefiles.size());
         filesCompacting.addAll(filesToCompact);
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1426,7 +1427,7 @@ public class Store extends SchemaConfigu
               filesToCompact, filesCompacting);
         }
         filesCompacting.addAll(filesToCompact.getFilesToCompact());
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
         // major compaction iff all StoreFiles are included
         boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
@@ -1896,7 +1897,7 @@ public class Store extends SchemaConfigu
   }
 
   public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
-    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
+    Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
     return newList;
   }

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
(original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
Wed Oct  2 19:06:15 2013
@@ -439,10 +439,11 @@ public class StoreFile extends SchemaCon
   * @return 0 if no non-bulk-load files are provided or this is a Store that
   * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs, 
+      boolean includeBulkLoadedFiles) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (!sf.isBulkLoadResult()) {
+      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
         max = Math.max(max, sf.getMaxSequenceId());
       }
     }
@@ -582,6 +583,24 @@ public class StoreFile extends SchemaCon
         }
       }
     }
+
+    if (isBulkLoadResult()) {
+      // generate the sequenceId from the fileName
+      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
+      String fileName = this.path.getName();
+      int startPos = fileName.indexOf("SeqId_");
+      if (startPos != -1) {
+        this.sequenceid =
+            Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos
+ 6)));
+        // Handle reference files as done above.
+        if (isReference()) {
+          if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+            this.sequenceid += 1;
+          }
+        }
+      }
+    }
+
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -1859,29 +1878,35 @@ public class StoreFile extends SchemaCon
      * Comparator that compares based on the flush time of
      * the StoreFiles. All bulk loads are placed before all non-
      * bulk loads, and then all files are sorted by sequence ID.
-     * If there are ties, the path name is used as a tie-breaker.
     * Comparator that compares based on the Sequence Ids of the
     * StoreFiles. Bulk loads that did not request a seq ID
+     * are given a seq id of -1; thus, they are placed before all non-
+     * bulk loads, and bulk loads with sequence Id. Among these files,
+     * the bulkLoadTime is used to determine the ordering.
+     * If there are ties, the path name is used as a tie-breaker. 
      */
-    static final Comparator<StoreFile> FLUSH_TIME =
+    static final Comparator<StoreFile> SEQ_ID =
       Ordering.compound(ImmutableList.of(
-          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetPathName())
       ));
 
-    private static class GetBulkTime implements Function<StoreFile, Long> {
+    private static class GetSeqId implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
-        return sf.getBulkLoadTimestamp();
+        return sf.getMaxSequenceId();
       }
     }
-    private static class GetSeqId implements Function<StoreFile, Long> {
+
+    private static class GetBulkTime implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (sf.isBulkLoadResult()) return -1L;
-        return sf.getMaxSequenceId();
+        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
+        return sf.getBulkLoadTimestamp();
       }
     }
+
     private static class GetPathName implements Function<StoreFile, String> {
       @Override
       public String apply(StoreFile sf) {

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed
Oct  2 19:06:15 2013
@@ -1514,7 +1514,7 @@ public class HLog implements Syncable {
   /**
    * Obtain a log sequence number.
    */
-  private long obtainSeqNum() {
+  public long obtainSeqNum() {
     return this.logSeqNum.incrementAndGet();
   }
 

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Wed Oct  2 19:06:15 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,10 +33,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.*;
@@ -157,6 +160,53 @@ public class TestLoadIncrementalHFiles {
     assertEquals(expectedRows, util.countRows(table));
   }
 
+  private void
+      verifyAssignedSequenceNumber(String testName, byte[][][] hfileRanges, boolean nonZero)
+          throws Exception {
+    Path dir = util.getDataTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + hfileIdx++),
FAMILY,
+        QUALIFIER, from, to, 1000);
+    }
+
+    final byte[] TABLE = Bytes.toBytes("mytable_" + testName);
+
+    HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    htd.addFamily(familyDesc);
+    admin.createTable(htd, SPLIT_KEYS);
+
+    HTable table = new HTable(util.getConfiguration(), TABLE);
+    util.waitTableAvailable(TABLE, 30000);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+
+    // Do a dummy put to increase the hlog sequence number
+    Put put = new Put(Bytes.toBytes("row"));
+    put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
+    table.put(put);
+
+    loader.doBulkLoad(dir, table);
+
+    // Get the store files
+    List<StoreFile> files =
+        util.getHBaseCluster().getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
+    for (StoreFile file : files) {
+      // the sequenceId gets initialized during createReader
+      file.createReader();
+
+      if (nonZero) assertTrue(file.getMaxSequenceId() > 0);
+      else assertTrue(file.getMaxSequenceId() == -1);
+    }
+  }
+
   /**
    * Test loading into a column family that does not exist.
    */

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Wed Oct  2 19:06:15 2013
@@ -594,7 +594,7 @@ public class TestCompaction extends HBas
     Store store = r.getStore(COLUMN_FAMILY);
 
     List<StoreFile> storeFiles = store.getStorefiles();
-    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
     Compactor tool = new Compactor(this.conf);
 
     StoreFile.Writer compactedFile = tool.compactForTesting(store, this.conf, storeFiles,
false,

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Wed Oct  2 19:06:15 2013
@@ -145,7 +145,7 @@ public class TestHRegionServerBulkLoad {
           LOG.debug("Going to connect to server " + location + " for row "
               + Bytes.toStringBinary(row));
           byte[] regionName = location.getRegionInfo().getRegionName();
-          server.bulkLoadHFiles(famPaths, regionName);
+          server.bulkLoadHFiles(famPaths, regionName, true);
           return null;
         }
       }.withRetries();

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
Wed Oct  2 19:06:15 2013
@@ -778,8 +778,8 @@ public class TestStoreFile extends HBase
     fs.delete(f, true);
   }
 
-  public void testFlushTimeComparator() {
-    assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+  public void testSeqIdComparator() {
+    assertOrdering(StoreFile.Comparators.SEQ_ID,
         mockStoreFile(true, 1000, -1, "/foo/123"),
         mockStoreFile(true, 1000, -1, "/foo/126"),
         mockStoreFile(true, 2000, -1, "/foo/126"),
@@ -810,13 +810,7 @@ public class TestStoreFile extends HBase
     StoreFile mock = Mockito.mock(StoreFile.class);
     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
-    if (bulkLoad) {
-      // Bulk load files will throw if you ask for their sequence ID
-      Mockito.doThrow(new IllegalAccessError("bulk load"))
-        .when(mock).getMaxSequenceId();
-    } else {
-      Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
-    }
+    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
     Mockito.doReturn(new Path(path)).when(mock).getPath();
     String name = "mock storefile, bulkLoad=" + bulkLoad +
       " bulkTimestamp=" + bulkTimestamp +

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
(original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Wed Oct  2 19:06:15 2013
@@ -326,7 +326,7 @@ public class TestWALReplay {
     writer.close();
     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
-    region.bulkLoadHFiles(hfs);
+    region.bulkLoadHFiles(hfs, true);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();



Mime
View raw message