hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1382351 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apache/hadoop/hbase/protobuf/generated/ main/java/org/apache/hadoop/hbase/regionserver/ main...
Date Sat, 08 Sep 2012 20:13:22 GMT
Author: tedyu
Date: Sat Sep  8 20:13:21 2012
New Revision: 1382351

URL: http://svn.apache.org/viewvc?rev=1382351&view=rev
Log:
HBASE-6630 Port HBASE-6590 to trunk 0.94 : Assign sequence number to bulk loaded files (Amitanand)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/protobuf/Client.proto
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Sat Sep  8 20:13:21 2012
@@ -97,11 +97,14 @@ public class LoadIncrementalHFiles exten
   private Configuration cfg;
 
   public static String NAME = "completebulkload";
+  private static String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  private boolean assignSeqIds;
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
     this.cfg = conf;
     this.hbAdmin = new HBaseAdmin(conf);
+    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
   }
 
   private void usage() {
@@ -482,7 +485,8 @@ public class LoadIncrementalHFiles exten
         LOG.debug("Going to connect to server " + location + " for row "
             + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
-        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName);
+        return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName,
+            assignSeqIds);
       }
     };
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Sat Sep  8 20:13:21 2012
@@ -1265,14 +1265,15 @@ public final class ProtobufUtil {
    * @param client
    * @param familyPaths
    * @param regionName
+   * @param assignSeqNum
    * @return true if all are loaded
    * @throws IOException
    */
   public static boolean bulkLoadHFile(final ClientProtocol client,
       final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName) throws IOException {
+      final byte[] regionName, boolean assignSeqNum) throws IOException {
     BulkLoadHFileRequest request =
-      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName);
+      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
     try {
       BulkLoadHFileResponse response =
         client.bulkLoadHFile(null, request);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Sat Sep  8 20:13:21 2012
@@ -454,10 +454,12 @@ public final class RequestConverter {
    *
    * @param familyPaths
    * @param regionName
+   * @param assignSeqNum
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths, final byte[] regionName) {
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum) {
     BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -468,6 +470,7 @@ public final class RequestConverter {
       familyPathBuilder.setPath(familyPath.getSecond());
       builder.addFamilyPath(familyPathBuilder.build());
     }
+    builder.setAssignSeqNum(assignSeqNum);
     return builder.build();
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java Sat Sep  8 20:13:21 2012
@@ -13967,6 +13967,10 @@ public final class ClientProtos {
         getFamilyPathOrBuilderList();
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPathOrBuilder getFamilyPathOrBuilder(
         int index);
+
+    // optional bool assignSeqNum = 3;
+    boolean hasAssignSeqNum();
+    boolean getAssignSeqNum();
   }
   public static final class BulkLoadHFileRequest extends
       com.google.protobuf.GeneratedMessage
@@ -14524,9 +14528,20 @@ public final class ClientProtos {
       return familyPath_.get(index);
     }
     
+    // optional bool assignSeqNum = 3;
+    public static final int ASSIGNSEQNUM_FIELD_NUMBER = 3;
+    private boolean assignSeqNum_;
+    public boolean hasAssignSeqNum() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public boolean getAssignSeqNum() {
+      return assignSeqNum_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       familyPath_ = java.util.Collections.emptyList();
+      assignSeqNum_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -14560,6 +14575,9 @@ public final class ClientProtos {
       for (int i = 0; i < familyPath_.size(); i++) {
         output.writeMessage(2, familyPath_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(3, assignSeqNum_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -14577,6 +14595,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(2, familyPath_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, assignSeqNum_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -14607,6 +14629,11 @@ public final class ClientProtos {
       }
       result = result && getFamilyPathList()
           .equals(other.getFamilyPathList());
+      result = result && (hasAssignSeqNum() == other.hasAssignSeqNum());
+      if (hasAssignSeqNum()) {
+        result = result && (getAssignSeqNum()
+            == other.getAssignSeqNum());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -14624,6 +14651,10 @@ public final class ClientProtos {
         hash = (37 * hash) + FAMILYPATH_FIELD_NUMBER;
         hash = (53 * hash) + getFamilyPathList().hashCode();
       }
+      if (hasAssignSeqNum()) {
+        hash = (37 * hash) + ASSIGNSEQNUM_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getAssignSeqNum());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -14754,6 +14785,8 @@ public final class ClientProtos {
         } else {
           familyPathBuilder_.clear();
         }
+        assignSeqNum_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -14809,6 +14842,10 @@ public final class ClientProtos {
         } else {
           result.familyPath_ = familyPathBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.assignSeqNum_ = assignSeqNum_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -14854,6 +14891,9 @@ public final class ClientProtos {
             }
           }
         }
+        if (other.hasAssignSeqNum()) {
+          setAssignSeqNum(other.getAssignSeqNum());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -14914,6 +14954,11 @@ public final class ClientProtos {
               addFamilyPath(subBuilder.buildPartial());
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              assignSeqNum_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -15196,6 +15241,27 @@ public final class ClientProtos {
         return familyPathBuilder_;
       }
       
+      // optional bool assignSeqNum = 3;
+      private boolean assignSeqNum_ ;
+      public boolean hasAssignSeqNum() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public boolean getAssignSeqNum() {
+        return assignSeqNum_;
+      }
+      public Builder setAssignSeqNum(boolean value) {
+        bitField0_ |= 0x00000004;
+        assignSeqNum_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAssignSeqNum() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        assignSeqNum_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:BulkLoadHFileRequest)
     }
     
@@ -21617,38 +21683,39 @@ public final class ClientProtos {
       "\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
       "\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
       "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
-      "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\232\001\n\024BulkLo" +
+      "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" +
       "adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
       "pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
-      "FileRequest.FamilyPath\032*\n\nFamilyPath\022\016\n\006" +
-      "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF" +
-      "ileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003" +
-      "row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nmetho",
-      "dName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameStri" +
-      "ngPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytesPai" +
-      "r\"O\n\026ExecCoprocessorRequest\022 \n\006region\030\001 " +
-      "\002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005.Ex" +
-      "ec\"8\n\027ExecCoprocessorResponse\022\035\n\005value\030\001" +
-      " \002(\0132\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006m" +
-      "utate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022" +
-      "\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005" +
-      "value\030\001 \001(\0132\016.NameBytesPair\022!\n\texception" +
-      "\030\002 \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 ",
-      "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006acti" +
-      "on\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\"." +
-      "\n\rMultiResponse\022\035\n\006result\030\001 \003(\0132\r.Action" +
-      "Result2\221\003\n\rClientService\022 \n\003get\022\013.GetReq" +
-      "uest\032\014.GetResponse\022)\n\006mutate\022\016.MutateReq" +
-      "uest\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequ" +
-      "est\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowR" +
-      "equest\032\020.LockRowResponse\0222\n\tunlockRow\022\021." +
-      "UnlockRowRequest\032\022.UnlockRowResponse\022>\n\r" +
-      "bulkLoadHFile\022\025.BulkLoadHFileRequest\032\026.B",
-      "ulkLoadHFileResponse\022D\n\017execCoprocessor\022" +
-      "\027.ExecCoprocessorRequest\032\030.ExecCoprocess" +
-      "orResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
-      "tiResponseBB\n*org.apache.hadoop.hbase.pr" +
-      "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
+      "FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" +
+      " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" +
+      "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" +
+      "ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto",
+      "colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" +
+      "perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" +
+      "r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" +
+      "orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
+      "ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" +
+      "sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" +
+      "ir\"N\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Muta" +
+      "te\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Ex" +
+      "ec\"P\n\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.Name" +
+      "BytesPair\022!\n\texception\030\002 \001(\0132\016.NameBytes",
+      "Pair\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.R" +
+      "egionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAc" +
+      "tion\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n" +
+      "\006result\030\001 \003(\0132\r.ActionResult2\221\003\n\rClientS" +
+      "ervice\022 \n\003get\022\013.GetRequest\032\014.GetResponse" +
+      "\022)\n\006mutate\022\016.MutateRequest\032\017.MutateRespo" +
+      "nse\022#\n\004scan\022\014.ScanRequest\032\r.ScanResponse" +
+      "\022,\n\007lockRow\022\017.LockRowRequest\032\020.LockRowRe" +
+      "sponse\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022" +
+      ".UnlockRowResponse\022>\n\rbulkLoadHFile\022\025.Bu",
+      "lkLoadHFileRequest\032\026.BulkLoadHFileRespon" +
+      "se\022D\n\017execCoprocessor\022\027.ExecCoprocessorR" +
+      "equest\032\030.ExecCoprocessorResponse\022&\n\005mult" +
+      "i\022\r.MultiRequest\032\016.MultiResponseBB\n*org." +
+      "apache.hadoop.hbase.protobuf.generatedB\014" +
+      "ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21804,7 +21871,7 @@ public final class ClientProtos {
           internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_BulkLoadHFileRequest_descriptor,
-              new java.lang.String[] { "Region", "FamilyPath", },
+              new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", },
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.class,
               org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.Builder.class);
           internal_static_BulkLoadHFileRequest_FamilyPath_descriptor =

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Sep  8 20:13:21 2012
@@ -564,11 +564,14 @@ public class HRegion implements HeapSize
           HStore store = future.get();
 
           this.stores.put(store.getColumnFamilyName().getBytes(), store);
-          long storeSeqId = store.getMaxSequenceId();
+          // Do not include bulk loaded files when determining seqIdForReplay
+          long storeSeqIdForReplay = store.getMaxSequenceId(false);
           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
-              storeSeqId);
-          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-            maxSeqId = storeSeqId;
+              storeSeqIdForReplay);
+          // Include bulk loaded files when determining seqIdForAssignment
+          long storeSeqIdForAssignment = store.getMaxSequenceId(true);
+          if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
+            maxSeqId = storeSeqIdForAssignment;
           }
           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
           if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -3314,11 +3317,12 @@ public class HRegion implements HeapSize
    * rows with multiple column families atomically.
    *
    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+   * @param assignSeqId
    * @return true if successful, false if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
-  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths)
-  throws IOException {
+  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+      boolean assignSeqId) throws IOException {
     Preconditions.checkNotNull(familyPaths);
     // we need writeLock for multi-family bulk load
     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -3378,7 +3382,7 @@ public class HRegion implements HeapSize
         String path = p.getSecond();
         Store store = getStore(familyName);
         try {
-          store.bulkLoadHFile(path);
+          store.bulkLoadHFile(path, assignSeqId ? this.log.obtainSeqNum() : -1);
         } catch (IOException ioe) {
           // A failure here can cause an atomicity violation that we currently
           // cannot recover from since it is likely a failed HDFS operation.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Sat Sep  8 20:13:21 2012
@@ -3284,7 +3284,7 @@ public class  HRegionServer implements C
       }
       boolean loaded = false;
       if (!bypass) {
-        loaded = region.bulkLoadHFiles(familyPaths);
+        loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
       }
       if (region.getCoprocessorHost() != null) {
         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Sep  8 20:13:21 2012
@@ -311,8 +311,8 @@ public class HStore extends SchemaConfig
   /**
    * @return The maximum sequence id in all store files.
    */
-  long getMaxSequenceId() {
-    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
+  long getMaxSequenceId(boolean includeBulkFiles) {
+    return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
   }
 
   @Override
@@ -532,7 +532,7 @@ public class HStore extends SchemaConfig
   }
 
   @Override
-  public void bulkLoadHFile(String srcPathStr) throws IOException {
+  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
 
     // Copy the file if it's on another filesystem
@@ -547,7 +547,8 @@ public class HStore extends SchemaConfig
       srcPath = tmpPath;
     }
 
-    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+    Path dstPath = StoreFile.getRandomFilename(fs, homedir,
+        (seqNum == -1) ? null : "_SeqId_" + seqNum + "_");
     LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
     StoreFile.rename(fs, srcPath, dstPath);
 
@@ -990,7 +991,7 @@ public class HStore extends SchemaConfig
     }
 
     // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
 
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
@@ -1057,10 +1058,10 @@ public class HStore extends SchemaConfig
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
         isMajor = (filesToCompact.size() == storefiles.size());
         filesCompacting.addAll(filesToCompact);
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
       }
     } finally {
       this.lock.readLock().unlock();
@@ -1275,7 +1276,7 @@ public class HStore extends SchemaConfig
               filesToCompact, filesCompacting);
         }
         filesCompacting.addAll(filesToCompact.getFilesToCompact());
-        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+        Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
         // major compaction iff all StoreFiles are included
         boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
@@ -1616,7 +1617,7 @@ public class HStore extends SchemaConfig
   }
 
   public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
-    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
+    Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
     ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
     return newList;
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Sep  8 20:13:21 2012
@@ -191,8 +191,11 @@ public interface Store extends SchemaAwa
   /**
    * This method should only be called from HRegion. It is assumed that the ranges of values in the
    * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
+   * 
+   * @param srcPathStr
+   * @param sequenceId sequence Id associated with the HFile
    */
-  public void bulkLoadHFile(String srcPathStr) throws IOException;
+  public void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
 
   // General accessors into the state of the store
   // TODO abstract some of this out into a metrics class

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Sep  8 20:13:21 2012
@@ -201,7 +201,7 @@ public class StoreFile extends SchemaCon
    * this files id.  Group 2 the referenced region name, etc.
    */
   private static final Pattern REF_NAME_PARSER =
-    Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$");
+    Pattern.compile("^([0-9a-f]+(?:_SeqId_[0-9]+_)?)(?:\\.(.+))?$");
 
   // StoreFile.Reader
   private volatile Reader reader;
@@ -390,13 +390,16 @@ public class StoreFile extends SchemaCon
    * the given list. Store files that were created by a mapreduce
    * bulk load are ignored, as they do not correspond to any edit
    * log items.
+   * @param sfs
+   * @param includeBulkLoadedFiles
    * @return 0 if no non-bulk-load files are provided or, this is Store that
    * does not yet have any store files.
    */
-  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
+  public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
+      boolean includeBulkLoadedFiles) {
     long max = 0;
     for (StoreFile sf : sfs) {
-      if (!sf.isBulkLoadResult()) {
+      if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
         max = Math.max(max, sf.getMaxSequenceId());
       }
     }
@@ -539,6 +542,24 @@ public class StoreFile extends SchemaCon
         }
       }
     }
+
+    if (isBulkLoadResult()){
+      // generate the sequenceId from the fileName
+      // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
+      String fileName = this.path.getName();
+      int startPos = fileName.indexOf("SeqId_");
+      if (startPos != -1) {
+        this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
+            fileName.indexOf('_', startPos + 6)));
+        // Handle reference files as done above.
+        if (isReference()) {
+          if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+            this.sequenceid += 1;
+          }
+        }
+      }
+    }
+
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -1718,32 +1739,35 @@ public class StoreFile extends SchemaCon
    */
   abstract static class Comparators {
     /**
-     * Comparator that compares based on the flush time of
-     * the StoreFiles. All bulk loads are placed before all non-
-     * bulk loads, and then all files are sorted by sequence ID.
+     * Comparator that compares based on the Sequence Ids of the
+     * the StoreFiles. Bulk loads that did not request a seq ID
+     * are given a seq id of -1; thus, they are placed before all non-
+     * bulk loads, and bulk loads with sequence Id. Among these files,
+     * the bulkLoadTime is used to determine the ordering.
      * If there are ties, the path name is used as a tie-breaker.
      */
-    static final Comparator<StoreFile> FLUSH_TIME =
+    static final Comparator<StoreFile> SEQ_ID =
       Ordering.compound(ImmutableList.of(
-          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetSeqId()),
+          Ordering.natural().onResultOf(new GetBulkTime()),
           Ordering.natural().onResultOf(new GetPathName())
       ));
 
-    private static class GetBulkTime implements Function<StoreFile, Long> {
+    private static class GetSeqId implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
-        return sf.getBulkLoadTimestamp();
+        return sf.getMaxSequenceId();
       }
     }
-    private static class GetSeqId implements Function<StoreFile, Long> {
+
+    private static class GetBulkTime implements Function<StoreFile, Long> {
       @Override
       public Long apply(StoreFile sf) {
-        if (sf.isBulkLoadResult()) return -1L;
-        return sf.getMaxSequenceId();
+        if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
+        return sf.getBulkLoadTimestamp();
       }
     }
+
     private static class GetPathName implements Function<StoreFile, String> {
       @Override
       public String apply(StoreFile sf) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sat Sep  8 20:13:21 2012
@@ -1499,7 +1499,7 @@ public class HLog implements Syncable {
   /**
    * Obtain a log sequence number.
    */
-  private long obtainSeqNum() {
+  public long obtainSeqNum() {
     return this.logSeqNum.incrementAndGet();
   }
 

Modified: hbase/trunk/hbase-server/src/main/protobuf/Client.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/protobuf/Client.proto?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/protobuf/Client.proto (original)
+++ hbase/trunk/hbase-server/src/main/protobuf/Client.proto Sat Sep  8 20:13:21 2012
@@ -247,6 +247,7 @@ message UnlockRowResponse {
 message BulkLoadHFileRequest {
   required RegionSpecifier region = 1;
   repeated FamilyPath familyPath = 2;
+  optional bool assignSeqNum = 3;
 
   message FamilyPath {
     required bytes family = 1;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Sat Sep  8 20:13:21 2012
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.TreeMap;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,10 +32,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.*;
@@ -155,6 +158,55 @@ public class TestLoadIncrementalHFiles {
     assertEquals(expectedRows, util.countRows(table));
   }
 
+  private void verifyAssignedSequenceNumber(String testName,
+      byte[][][] hfileRanges, boolean nonZero) throws Exception {
+    Path dir = util.getDataTestDir(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
+
+    HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+    HTableDescriptor htd = new HTableDescriptor(TABLE);
+    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+    htd.addFamily(familyDesc);
+    admin.createTable(htd, SPLIT_KEYS);
+
+    HTable table = new HTable(util.getConfiguration(), TABLE);
+    util.waitTableAvailable(TABLE, 30000);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
+      util.getConfiguration());
+
+    // Do a dummy put to increase the hlog sequence number
+    Put put = new Put(Bytes.toBytes("row"));
+    put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
+    table.put(put);
+
+    loader.doBulkLoad(dir, table);
+
+    // Get the store files
+    List<StoreFile> files = util.getHBaseCluster().
+        getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
+    for (StoreFile file: files) {
+      // the sequenceId gets initialized during createReader
+      file.createReader();
+
+      if (nonZero)
+        assertTrue(file.getMaxSequenceId() > 0);
+      else
+        assertTrue(file.getMaxSequenceId() == -1);
+    }
+  }
+
   @Test
   public void testSplitStoreFile() throws IOException {
     Path dir = util.getDataTestDir("testSplitHFile");
@@ -220,6 +272,8 @@ public class TestLoadIncrementalHFiles {
         writer.append(kv);
       }
     } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+          Bytes.toBytes(System.currentTimeMillis()));
       writer.close();
     }
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Sat Sep  8 20:13:21 2012
@@ -587,7 +587,7 @@ public class TestCompaction extends HBas
     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
 
     List<StoreFile> storeFiles = store.getStorefiles();
-    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
     Compactor tool = new Compactor(this.conf);
 
     StoreFile.Writer compactedFile =

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java Sat Sep  8 20:13:21 2012
@@ -150,7 +150,7 @@ public class TestHRegionServerBulkLoad {
               + Bytes.toStringBinary(row));
           byte[] regionName = location.getRegionInfo().getRegionName();
           BulkLoadHFileRequest request =
-            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
+            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
           server.bulkLoadHFile(null, request);
           return null;
         }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java Sat Sep  8 20:13:21 2012
@@ -608,8 +608,8 @@ public class TestStoreFile extends HBase
     fs.delete(f, true);
   }
 
-  public void testFlushTimeComparator() {
-    assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+  public void testSeqIdComparator() {
+    assertOrdering(StoreFile.Comparators.SEQ_ID,
         mockStoreFile(true, 1000, -1, "/foo/123"),
         mockStoreFile(true, 1000, -1, "/foo/126"),
         mockStoreFile(true, 2000, -1, "/foo/126"),
@@ -640,13 +640,7 @@ public class TestStoreFile extends HBase
     StoreFile mock = Mockito.mock(StoreFile.class);
     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
-    if (bulkLoad) {
-      // Bulk load files will throw if you ask for their sequence ID
-      Mockito.doThrow(new IllegalAccessError("bulk load"))
-        .when(mock).getMaxSequenceId();
-    } else {
-      Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
-    }
+    Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
     Mockito.doReturn(new Path(path)).when(mock).getPath();
     String name = "mock storefile, bulkLoad=" + bulkLoad +
       " bulkTimestamp=" + bulkTimestamp +

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1382351&r1=1382350&r2=1382351&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Sat Sep  8 20:13:21 2012
@@ -310,7 +310,7 @@ public class TestWALReplay {
     writer.close();
     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
-    region.bulkLoadHFiles(hfs);
+    region.bulkLoadHFiles(hfs, true);
     // Add an edit so something in the WAL
     region.put((new Put(row)).add(family, family, family));
     wal.sync();



Mime
View raw message