hbase-commits mailing list archives

From jeffr...@apache.org
Subject svn commit: r1526696 - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/apache/ha...
Date Thu, 26 Sep 2013 21:16:34 GMT
Author: jeffreyz
Date: Thu Sep 26 21:16:33 2013
New Revision: 1526696

URL: http://svn.apache.org/r1526696
Log:
hbase-9390: coprocessor observers are not called during a recovery with the new log replay algorithm - part 2
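
In short: the region-server Replay RPC stops borrowing the client-side MultiRequest/MultiResponse types and switches to ReplicateWALEntryRequest/ReplicateWALEntryResponse, and the coprocessor WAL-restore hooks move out of the WAL append path (FSHLog) and into the replay path (HRegion/HRegionServer), so observers actually fire on the server receiving the replayed edits. A minimal, hedged sketch of the reworked call from the sender's side; the stub method and HConnection.getAdmin() both appear in the hunks below, while the surrounding variables (conn, location) are illustrative:

    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

    // Sketch only: an empty request is legal; the new server-side replay()
    // short-circuits it and returns an empty response. The call throws
    // com.google.protobuf.ServiceException; handling is elided here.
    AdminService.BlockingInterface admin = conn.getAdmin(location.getServerName());
    ReplicateWALEntryRequest request = ReplicateWALEntryRequest.newBuilder().build();
    ReplicateWALEntryResponse response = admin.replay(null, request);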

Modified:
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java Thu Sep 26 21:16:33 2013
@@ -19599,12 +19599,12 @@ public final class AdminProtos {
           com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done);
 
       /**
-       * <code>rpc Replay(.MultiRequest) returns (.MultiResponse);</code>
+       * <code>rpc Replay(.ReplicateWALEntryRequest) returns (.ReplicateWALEntryResponse);</code>
        */
       public abstract void replay(
           com.google.protobuf.RpcController controller,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request,
-          com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse> done);
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request,
+          com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done);
 
       /**
        * <code>rpc RollWALWriter(.RollWALWriterRequest) returns (.RollWALWriterResponse);</code>
@@ -19726,8 +19726,8 @@ public final class AdminProtos {
         @java.lang.Override
         public  void replay(
             com.google.protobuf.RpcController controller,
-            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request,
-            com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse> done) {
+            org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request,
+            com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done) {
           impl.replay(controller, request, done);
         }
 
@@ -19806,7 +19806,7 @@ public final class AdminProtos {
             case 9:
               return impl.replicateWALEntry(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request);
             case 10:
-              return impl.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request);
+              return impl.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request);
             case 11:
               return impl.rollWALWriter(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest)request);
             case 12:
@@ -19850,7 +19850,7 @@ public final class AdminProtos {
             case 9:
               return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance();
             case 10:
-              return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance();
+              return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance();
             case 11:
               return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance();
             case 12:
@@ -19894,7 +19894,7 @@ public final class AdminProtos {
             case 9:
               return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance();
             case 10:
-              return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance();
+              return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance();
             case 11:
               return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance();
             case 12:
@@ -19992,12 +19992,12 @@ public final class AdminProtos {
         com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done);
 
     /**
-     * <code>rpc Replay(.MultiRequest) returns (.MultiResponse);</code>
+     * <code>rpc Replay(.ReplicateWALEntryRequest) returns (.ReplicateWALEntryResponse);</code>
      */
     public abstract void replay(
         com.google.protobuf.RpcController controller,
-        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request,
-        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse> done);
+        org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request,
+        com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done);
 
     /**
      * <code>rpc RollWALWriter(.RollWALWriterRequest) returns (.RollWALWriterResponse);</code>
@@ -20104,8 +20104,8 @@ public final class AdminProtos {
               done));
           return;
         case 10:
-          this.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest)request,
-            com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse>specializeCallback(
+          this.replay(controller, (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest)request,
+            com.google.protobuf.RpcUtil.<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse>specializeCallback(
               done));
           return;
         case 11:
@@ -20163,7 +20163,7 @@ public final class AdminProtos {
         case 9:
           return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance();
         case 10:
-          return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest.getDefaultInstance();
+          return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest.getDefaultInstance();
         case 11:
           return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest.getDefaultInstance();
         case 12:
@@ -20207,7 +20207,7 @@ public final class AdminProtos {
         case 9:
           return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance();
         case 10:
-          return org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance();
+          return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance();
         case 11:
           return org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse.getDefaultInstance();
         case 12:
@@ -20389,17 +20389,17 @@ public final class AdminProtos {
 
       public  void replay(
           com.google.protobuf.RpcController controller,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request,
-          com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse> done) {
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request,
+          com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse> done) {
         channel.callMethod(
           getDescriptor().getMethods().get(10),
           controller,
           request,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance(),
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance(),
           com.google.protobuf.RpcUtil.generalizeCallback(
             done,
-            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.class,
-            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance()));
+            org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.class,
+            org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance()));
       }
 
       public  void rollWALWriter(
@@ -20519,9 +20519,9 @@ public final class AdminProtos {
           org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request)
           throws com.google.protobuf.ServiceException;
 
-      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay(
+      public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse replay(
           com.google.protobuf.RpcController controller,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request)
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request)
           throws com.google.protobuf.ServiceException;
 
       public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse rollWALWriter(
@@ -20672,15 +20672,15 @@ public final class AdminProtos {
       }
 
 
-      public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse replay(
+      public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse replay(
           com.google.protobuf.RpcController controller,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest request)
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest request)
           throws com.google.protobuf.ServiceException {
-        return (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse) channel.callBlockingMethod(
+        return (org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse) channel.callBlockingMethod(
           getDescriptor().getMethods().get(10),
           controller,
           request,
-          org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse.getDefaultInstance());
+          org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse.getDefaultInstance());
       }
 
 
@@ -20962,7 +20962,7 @@ public final class AdminProtos {
       "ServerInfoRequest\"B\n\nServerInfo\022 \n\013serve" +
       "r_name\030\001 \002(\0132\013.ServerName\022\022\n\nwebui_port\030" +
       "\002 \001(\r\"9\n\025GetServerInfoResponse\022 \n\013server" +
-      "_info\030\001 \002(\0132\013.ServerInfo2\256\007\n\014AdminServic" +
+      "_info\030\001 \002(\0132\013.ServerInfo2\306\007\n\014AdminServic" +
       "e\022>\n\rGetRegionInfo\022\025.GetRegionInfoReques" +
       "t\032\026.GetRegionInfoResponse\022;\n\014GetStoreFil",
       "e\022\024.GetStoreFileRequest\032\025.GetStoreFileRe" +
@@ -20978,16 +20978,17 @@ public final class AdminProtos {
       "nResponse\022;\n\014MergeRegions\022\024.MergeRegions" +
       "Request\032\025.MergeRegionsResponse\022J\n\021Replic" +
       "ateWALEntry\022\031.ReplicateWALEntryRequest\032\032" +
-      ".ReplicateWALEntryResponse\022\'\n\006Replay\022\r.M" +
-      "ultiRequest\032\016.MultiResponse\022>\n\rRollWALWr" +
-      "iter\022\025.RollWALWriterRequest\032\026.RollWALWri" +
-      "terResponse\022>\n\rGetServerInfo\022\025.GetServer" +
-      "InfoRequest\032\026.GetServerInfoResponse\0225\n\nS" +
-      "topServer\022\022.StopServerRequest\032\023.StopServ" +
-      "erResponse\022M\n\022UpdateFavoredNodes\022\032.Updat",
-      "eFavoredNodesRequest\032\033.UpdateFavoredNode" +
-      "sResponseBA\n*org.apache.hadoop.hbase.pro" +
-      "tobuf.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
+      ".ReplicateWALEntryResponse\022?\n\006Replay\022\031.R" +
+      "eplicateWALEntryRequest\032\032.ReplicateWALEn" +
+      "tryResponse\022>\n\rRollWALWriter\022\025.RollWALWr" +
+      "iterRequest\032\026.RollWALWriterResponse\022>\n\rG" +
+      "etServerInfo\022\025.GetServerInfoRequest\032\026.Ge" +
+      "tServerInfoResponse\0225\n\nStopServer\022\022.Stop" +
+      "ServerRequest\032\023.StopServerResponse\022M\n\022Up",
+      "dateFavoredNodes\022\032.UpdateFavoredNodesReq" +
+      "uest\032\033.UpdateFavoredNodesResponseBA\n*org" +
+      ".apache.hadoop.hbase.protobuf.generatedB" +
+      "\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto Thu Sep 26 21:16:33 2013
@@ -253,8 +253,8 @@ service AdminService {
   rpc ReplicateWALEntry(ReplicateWALEntryRequest)
     returns(ReplicateWALEntryResponse);
 
-  rpc Replay(MultiRequest)
-    returns(MultiResponse);
+  rpc Replay(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
 
   rpc RollWALWriter(RollWALWriterRequest)
     returns(RollWALWriterResponse);
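
Since Replay now shares ReplicateWALEntry's message types, a request can be assembled the same way the replication sink assembles one. A hedged sketch: ReplicationProtbufUtil is imported by WALEditsReplaySink.java later in this commit, but the exact buildReplicateWALEntryRequest signature used below is an assumption, not something this diff spells out.

    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.util.Pair;

    HLog.Entry[] entriesArray = entries.toArray(new HLog.Entry[entries.size()]);
    // Assumed helper: packs the WALKeys into the request and hands the cells
    // back as a CellScanner that rides along in the RPC payload.
    Pair<ReplicateWALEntryRequest, CellScanner> p =
        ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);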

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Sep 26 21:16:33 2013
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -112,10 +113,15 @@ import org.apache.hadoop.hbase.ipc.RpcSe
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -2209,8 +2215,7 @@ public class HRegion implements HeapSize
       Mutation mutation = batchOp.operations[firstIndex];
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-              walEdit, mutation.getClusterIds(), now, this.htableDescriptor,
-              this.getCoprocessorHost());
+              walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
       }
 
       // -------------------------------
@@ -4487,8 +4492,7 @@ public class HRegion implements HeapSize
           // 7. Append no sync
           if (!walEdit.isEmpty()) {
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-                  walEdit, processor.getClusterIds(), now, this.htableDescriptor,
-                  this.getCoprocessorHost());
+                  walEdit, processor.getClusterIds(), now, this.htableDescriptor);
           }
           // 8. Release region lock
           if (locked) {
@@ -4716,7 +4720,7 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
               walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor, this.getCoprocessorHost());
+                  this.htableDescriptor);
           } else {
             recordMutationWithoutWal(append.getFamilyCellMap());
           }
@@ -4865,7 +4869,7 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
                 walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
-                this.htableDescriptor, this.getCoprocessorHost());
+                  this.htableDescriptor);
           } else {
             recordMutationWithoutWal(increment.getFamilyCellMap());
           }
@@ -5606,4 +5610,83 @@ public class HRegion implements HeapSize
       }
     }
   }
+
+  /**
+   * This function is used to construct replay mutations from WALEdits
+   * @param entries
+   * @param cells
+   * @param clusterId
+   * @param logEntries List of Pair<HLogKey, WALEdit> constructed from their PB version - WALEntry
+   *          instances
+   * @return list of Pair<MutationType, Mutation> to be replayed
+   * @throws IOException
+   */
+  List<Pair<MutationType, Mutation>> getReplayMutations(List<WALEntry> entries,
+      CellScanner cells, UUID clusterId, List<Pair<HLogKey, WALEdit>> logEntries)
+      throws IOException {
+
+    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
+    List<Pair<MutationType, Mutation>> tmpEditMutations =
+        new ArrayList<Pair<MutationType, Mutation>>();
+
+    for (WALEntry entry : entries) {
+      HLogKey logKey = null;
+      WALEdit val = null;
+      Cell previousCell = null;
+      Mutation m = null;
+      tmpEditMutations.clear();
+
+      int count = entry.getAssociatedCellCount();
+      if (coprocessorHost != null) {
+        val = new WALEdit();
+      }
+
+      for (int i = 0; i < count; i++) {
+        // Throw index out of bounds if our cell count is off
+        if (!cells.advance()) {
+          throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+        }
+        Cell cell = cells.current();
+        if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
+
+        boolean isNewRowOrType =
+            previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+                || !CellUtil.matchingRow(previousCell, cell);
+        if (isNewRowOrType) {
+          // Create new mutation
+          if (CellUtil.isDelete(cell)) {
+            m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
+          } else {
+            m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
+          }
+        }
+        if (CellUtil.isDelete(cell)) {
+          ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+        } else {
+          ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
+        }
+        previousCell = cell;
+      }
+
+      // Start coprocessor replay here. The hooks are invoked per WALEdit
+      // instead of per KeyValue.
+      if (coprocessorHost != null) {
+        WALKey walKey = entry.getKey();
+        logKey =
+            new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
+                .getTableName().toByteArray()), walKey.getLogSequenceNumber(),
+                walKey.getWriteTime(), clusterId);
+        if (coprocessorHost.preWALRestore(this.getRegionInfo(), logKey, val)) {
+          // the coprocessor chose to bypass this log entry; skip it
+          continue;
+        }
+        logEntries.add(new Pair<HLogKey, WALEdit>(logKey, val));
+      }
+      mutations.addAll(tmpEditMutations);
+    }
+
+    return mutations;
+  }
 }
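
The getReplayMutations() method added above walks the CellScanner that accompanies the WALEntry protos, folds consecutive cells of the same row and type into Put or Delete mutations, and runs each reconstructed WALEdit through preWALRestore (entries the coprocessor bypasses are dropped). A condensed view of the call pattern, mirroring the call site added to HRegionServer.java below (variable names illustrative):

    List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
    List<Pair<MutationType, Mutation>> mutations = region.getReplayMutations(
        request.getEntryList(), cells, UUID.fromString(clusterId), walEntries);
    // walEntries now holds the (HLogKey, WALEdit) pairs that were not
    // bypassed; postWALRestore fires for these once the batch is applied.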

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Sep 26 21:16:33 2013
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.client.In
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -118,6 +120,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -135,6 +138,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
@@ -181,6 +185,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -191,8 +196,10 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -3861,45 +3868,45 @@ public class HRegionServer implements Cl
    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
    * that the given mutations will be durable on the receiving RS if this method returns without any
    * exception.
-   * @param rpcc the RPC controller
+   * @param controller the RPC controller
    * @param request the request
    * @throws ServiceException
    */
   @Override
   @QosPriority(priority = HConstants.REPLAY_QOS)
-  public MultiResponse replay(final RpcController rpcc, final MultiRequest request)
-      throws ServiceException {
+  public ReplicateWALEntryResponse replay(final RpcController controller,
+      final ReplicateWALEntryRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTimeMillis();
-    PayloadCarryingRpcController controller = (PayloadCarryingRpcController) rpcc;
-    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
-    // Clear scanner so we are not holding on to reference across call.
-    if (controller != null) controller.setCellScanner(null);
+    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
     try {
       checkOpen();
-      HRegion region = getRegion(request.getRegion());
-      MultiResponse.Builder builder = MultiResponse.newBuilder();
-      List<MutationProto> mutates = new ArrayList<MutationProto>();
-      for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
-        if (actionUnion.hasMutation()) {
-          MutationProto mutate = actionUnion.getMutation();
-          MutationType type = mutate.getMutateType();
-          switch (type) {
-          case PUT:
-          case DELETE:
-            mutates.add(mutate);
-            break;
-          default:
-            throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
+      List<WALEntry> entries = request.getEntryList();
+      if(entries == null || entries.isEmpty()) {
+        // empty input
+        return ReplicateWALEntryResponse.newBuilder().build();
+      }
+      
+      HRegion region = this.getRegionByEncodedName(
+        entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
+      List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
+      List<Pair<MutationType, Mutation>> mutations = region.getReplayMutations(
+        request.getEntryList(), cells, UUID.fromString(this.clusterId), walEntries);
+      if (!mutations.isEmpty()) {
+        OperationStatus[] result = doBatchOp(region, mutations, true);
+        // check if it's a partial success
+        for (int i = 0; result != null && i < result.length; i++) {
+          if (result[i] != OperationStatus.SUCCESS) {
+            throw new IOException(result[i].getExceptionMsg());
           }
-        } else {
-          LOG.warn("Error: invalid action: " + actionUnion + ". " + "it must be a Mutation.");
-          throw new DoNotRetryIOException("Invalid action, " + "it must be a Mutation.");
         }
       }
-      if (!mutates.isEmpty()) {
-        doBatchOp(builder, region, mutates, cellScanner, true);
+      if (region.getCoprocessorHost() != null) {
+        for (Pair<HLogKey, WALEdit> wal : walEntries) {
+          region.getCoprocessorHost().postWALRestore(region.getRegionInfo(), wal.getFirst(),
+            wal.getSecond());
+        }
       }
-      return builder.build();
+      return ReplicateWALEntryResponse.newBuilder().build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
     } finally {
@@ -4036,21 +4043,13 @@ public class HRegionServer implements Cl
 
   /**
    * Execute a list of Put/Delete mutations.
-   */
-  protected void doBatchOp(final MultiResponse.Builder builder,
-      final HRegion region, final List<MutationProto> mutates, final CellScanner cells) {
-    doBatchOp(builder, region, mutates, cells, false);
-  }
-
-  /**
-   * Execute a list of Put/Delete mutations.
    *
    * @param builder
    * @param region
    * @param mutations
    */
   protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
-      final List<MutationProto> mutations, final CellScanner cells, boolean isReplay) {
+      final List<MutationProto> mutations, final CellScanner cells) {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -4077,7 +4076,7 @@ public class HRegionServer implements Cl
         cacheFlusher.reclaimMemStoreMemory();
       }
 
-      OperationStatus codes[] = region.batchMutate(mArray, isReplay);
+      OperationStatus codes[] = region.batchMutate(mArray, false);
       for (i = 0; i < codes.length; i++) {
         switch (codes[i].getOperationStatusCode()) {
           case BAD_FAMILY:
@@ -4101,21 +4100,11 @@ public class HRegionServer implements Cl
           case SUCCESS:
             break;
         }
-        if (isReplay && codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
-          // in replay mode, we only need to catpure the first error because we will retry the whole
-          // batch when an error happens
-          break;
-        }
       }
     } catch (IOException ie) {
       ActionResult result = ResponseConverter.buildActionResult(ie);
       for (int i = 0; i < mutations.size(); i++) {
         builder.setResult(i, result);
-        if (isReplay) {
-          // in replay mode, we only need to catpure the first error because we will retry the whole
-          // batch when an error happens
-          break;
-        }
       }
     }
     long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -4128,6 +4117,46 @@ public class HRegionServer implements Cl
   }
 
   /**
+   * Execute a list of Put/Delete mutations.
+   * @param region
+   * @param mutations
+   * @param isReplay
+   * @return an array of OperationStatus which internally contains the
+   *         OperationStatusCode and the exceptionMessage if any
+   * @throws IOException
+   */
+  protected OperationStatus[] doBatchOp(final HRegion region,
+      final List<Pair<MutationType, Mutation>> mutations, boolean isReplay) throws IOException {
+    Mutation[] mArray = new Mutation[mutations.size()];
+    long before = EnvironmentEdgeManager.currentTimeMillis();
+    boolean batchContainsPuts = false, batchContainsDelete = false;
+    try {
+      int i = 0;
+      for (Pair<MutationType, Mutation> m : mutations) {
+        if (m.getFirst() == MutationType.PUT) {
+          batchContainsPuts = true;
+        } else {
+          batchContainsDelete = true;
+        }
+        mArray[i++] = m.getSecond();
+      }
+      requestCount.add(mutations.size());
+      if (!region.getRegionInfo().isMetaTable()) {
+        cacheFlusher.reclaimMemStoreMemory();
+      }
+      return region.batchMutate(mArray, isReplay);
+    } finally {
+      long after = EnvironmentEdgeManager.currentTimeMillis();
+      if (batchContainsPuts) {
+        metricsRegionServer.updatePut(after - before);
+      }
+      if (batchContainsDelete) {
+        metricsRegionServer.updateDelete(after - before);
+      }
+    }
+  }
+
+  /**
    * Mutate a list of rows atomically.
    *
    * @param region

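Note the error-handling contract of the new replay path: the doBatchOp() overload added above returns the raw OperationStatus array instead of packing results into a MultiResponse builder, and replay() turns the first non-SUCCESS status into an IOException so the sender retries the whole batch. That is why the old "capture only the first error" special-casing could be deleted. In condensed form:

    OperationStatus[] result = doBatchOp(region, mutations, true /* isReplay */);
    for (int i = 0; result != null && i < result.length; i++) {
      if (result[i] != OperationStatus.SUCCESS) {
        // Fail the whole replay RPC; the sending splitter will retry the batch.
        throw new IOException(result[i].getExceptionMsg());
      }
    }
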
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Sep 26 21:16:33 2013
@@ -841,7 +841,7 @@ class FSHLog implements HLog, Syncable {
   @Override
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
     final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
-    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore, null);
+    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
   }
 
   /**
@@ -872,8 +872,7 @@ class FSHLog implements HLog, Syncable {
    */
   @SuppressWarnings("deprecation")
   private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
-      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
-      RegionCoprocessorHost regionCoproHost)
+      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
     throws IOException {
       if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
@@ -894,7 +893,7 @@ class FSHLog implements HLog, Syncable {
           byte [] encodedRegionName = info.getEncodedNameAsBytes();
           if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
           HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
-          doWrite(info, logKey, edits, htd, regionCoproHost);
+          doWrite(info, logKey, edits, htd);
           this.numEntries.incrementAndGet();
           txid = this.unflushedEntries.incrementAndGet();
           if (htd.isDeferredLogFlush()) {
@@ -917,10 +916,9 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd,
-      RegionCoprocessorHost regionCoproHost)
+      List<UUID> clusterIds, final long now, HTableDescriptor htd)
     throws IOException {
-    return append(info, tableName, edits, clusterIds, now, htd, false, true, regionCoproHost);
+    return append(info, tableName, edits, clusterIds, now, htd, false, true);
   }
 
   /**
@@ -1205,7 +1203,7 @@ class FSHLog implements HLog, Syncable {
 
   // TODO: Remove info.  Unused.
   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
-                           HTableDescriptor htd, RegionCoprocessorHost regionCoproHost)
+ HTableDescriptor htd)
   throws IOException {
     if (!this.enabled) {
       return;
@@ -1222,18 +1220,12 @@ class FSHLog implements HLog, Syncable {
         if (logEdit.isReplay()) {
           // set replication scope null so that this won't be replicated
           logKey.setScopes(null);
-          if(regionCoproHost != null) {
-            regionCoproHost.preWALRestore(info, logKey, logEdit);
-          }
         }
         // write to our buffer for the Hlog file.
         logSyncer.append(new FSHLog.Entry(logKey, logEdit));
       }
       long took = EnvironmentEdgeManager.currentTimeMillis() - now;
       coprocessorHost.postWALWrite(info, logKey, logEdit);
-      if(logEdit.isReplay() && regionCoproHost != null ) {
-        regionCoproHost.postWALRestore(info, logKey, logEdit);
-      }
       long len = 0;
       for (KeyValue kv : logEdit.getKeyValues()) {
         len += kv.getLength();

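With pre/postWALRestore gone from doWrite(), the WAL append path no longer drives the restore hooks at all; during distributed log replay they are now invoked from HRegion.getReplayMutations() and HRegionServer.replay() respectively. A hedged illustration of the observer side (the signatures follow the RegionObserver API of this era and should be treated as an assumption):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class ReplayAuditObserver extends BaseRegionObserver {
      @Override
      public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx,
          HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
        // After this commit: invoked per replayed WALEdit on the receiving
        // region server, no longer from FSHLog.doWrite().
      }

      @Override
      public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx,
          HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
        // After this commit: invoked once the replayed batch has been applied.
      }
    }
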
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Sep 26 21:16:33 2013
@@ -301,8 +301,7 @@ public interface HLog {
    * @throws IOException
    */
   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-        List<UUID> clusterIds, final long now, HTableDescriptor htd,
-        RegionCoprocessorHost regionCoproHost) throws IOException;
+      List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
 
   // TODO: Do we need all these versions of sync?
   void hsync() throws IOException;

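For reference, a call against the trimmed interface now looks like the updated HRegion write path earlier in this commit, with no coprocessor host threaded through (variable names illustrative):

    long txid = log.appendNoSync(region.getRegionInfo(), htd.getTableName(),
        walEdit, mutation.getClusterIds(), now, htd);
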
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Sep 26 21:16:33 2013
@@ -166,7 +166,9 @@ public class HLogSplitter {
         conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
             128*1024*1024));
 
-    this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 512);
+    // a larger minBatchSize may slow down recovery because the replay writer has to wait for
+    // enough edits before replaying them
+    this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
 
@@ -1316,8 +1318,8 @@ public class HLogSplitter {
      * Map key -> value layout
      * <servername>:<table name> -> Queue<Row>
      */
-    private Map<String, List<Pair<HRegionLocation, Row>>> serverToBufferQueueMap =
-        new ConcurrentHashMap<String, List<Pair<HRegionLocation, Row>>>();
+    private Map<String, List<Pair<HRegionLocation, HLog.Entry>>> serverToBufferQueueMap =
+        new ConcurrentHashMap<String, List<Pair<HRegionLocation, HLog.Entry>>>();
     private List<Throwable> thrown = new ArrayList<Throwable>();
 
     // The following sink is used in distrubitedLogReplay mode for entries of regions in a disabling
@@ -1358,10 +1360,10 @@ public class HLogSplitter {
       // process workitems
       String maxLocKey = null;
       int maxSize = 0;
-      List<Pair<HRegionLocation, Row>> maxQueue = null;
+      List<Pair<HRegionLocation, HLog.Entry>> maxQueue = null;
       synchronized (this.serverToBufferQueueMap) {
         for (String key : this.serverToBufferQueueMap.keySet()) {
-          List<Pair<HRegionLocation, Row>> curQueue = this.serverToBufferQueueMap.get(key);
+          List<Pair<HRegionLocation, HLog.Entry>> curQueue = this.serverToBufferQueueMap.get(key);
           if (curQueue.size() > maxSize) {
             maxSize = curQueue.size();
             maxQueue = curQueue;
@@ -1398,6 +1400,8 @@ public class HLogSplitter {
       for (HLog.Entry entry : entries) {
         WALEdit edit = entry.getEdit();
         TableName table = entry.getKey().getTablename();
+        // clear scopes, which aren't needed for recovery
+        entry.getKey().setScopes(null);
         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
         // skip edits of non-existent tables
         if (nonExistentTables != null && nonExistentTables.contains(table)) {
@@ -1407,15 +1411,10 @@ public class HLogSplitter {
 
         Map<byte[], Long> maxStoreSequenceIds = null;
         boolean needSkip = false;
-        Put put = null;
-        Delete del = null;
-        KeyValue lastKV = null;
         HRegionLocation loc = null;
-        Row preRow = null;
-        HRegionLocation preLoc = null;
-        Row lastAddedRow = null; // it is not really needed here just be conservative
-        String preKey = null;
+        String locKey = null;
         List<KeyValue> kvs = edit.getKeyValues();
+        List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
         HConnection hconn = this.getConnectionByTableName(table);
 
         for (KeyValue kv : kvs) {
@@ -1423,98 +1422,71 @@ public class HLogSplitter {
           // We don't handle HBASE-2231 because we may or may not replay a compaction event.
           // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
           // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
-          if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
+          if (kv.matchingFamily(WALEdit.METAFAMILY)) {
+            skippedKVs.add(kv);
+            continue;
+          }
 
-          if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
-            if (preRow != null) {
-              synchronized (serverToBufferQueueMap) {
-                List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
-                if (queue == null) {
-                  queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
-                  serverToBufferQueueMap.put(preKey, queue);
-                }
-                queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
-                lastAddedRow = preRow;
-              }
-              // store regions we have recovered so far
-              addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+          try {
+            loc =
+                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+                  encodeRegionNameStr);
+          } catch (TableNotFoundException ex) {
+            // table has been deleted so skip edits of the table
+            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
+                + encodeRegionNameStr);
+            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
+            if (nonExistentTables == null) {
+              nonExistentTables = new TreeSet<TableName>();
             }
+            nonExistentTables.add(table);
+            this.skippedEdits.incrementAndGet();
+            needSkip = true;
+            break;
+          }
 
-            try {
-              loc = locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
-                encodeRegionNameStr);
-            } catch (TableNotFoundException ex) {
-              // table has been deleted so skip edits of the table
-              LOG.info("Table " + table
-                  + " doesn't exist. Skip log replay for region " + encodeRegionNameStr);
-              lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
-              if (nonExistentTables == null) {
-                nonExistentTables = new TreeSet<TableName>();
-              }
-              nonExistentTables.add(table);
-              this.skippedEdits.incrementAndGet();
-              needSkip = true;
-              break;
+          cachedLastFlushedSequenceId =
+              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
+          if (cachedLastFlushedSequenceId != null
+              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+            // skip the whole HLog entry
+            this.skippedEdits.incrementAndGet();
+            needSkip = true;
+            break;
+          } else {
+            if (maxStoreSequenceIds == null) {
+              maxStoreSequenceIds =
+                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
             }
-
-            cachedLastFlushedSequenceId =
-                lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
-            if (cachedLastFlushedSequenceId != null
-                && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
-              // skip the whole HLog entry
-              this.skippedEdits.incrementAndGet();
-              needSkip = true;
-              break;
-            } else {
-              if (maxStoreSequenceIds == null) {
-                maxStoreSequenceIds =
-                    regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
-              }
-              if (maxStoreSequenceIds != null) {
-                Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
-                if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
-                  // skip current kv if column family doesn't exist anymore or already flushed
-                  continue;
-                }
+            if (maxStoreSequenceIds != null) {
+              Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
+              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
+                // skip current kv if column family doesn't exist anymore or already flushed
+                skippedKVs.add(kv);
+                continue;
               }
             }
-
-            if (kv.isDelete()) {
-              del = new Delete(kv.getRow());
-              del.setClusterIds(entry.getKey().getClusterIds());
-              preRow = del;
-            } else {
-              put = new Put(kv.getRow());
-              put.setClusterIds(entry.getKey().getClusterIds());
-              preRow = put;
-            }
-            preKey = loc.getHostnamePort() + KEY_DELIMITER + table;
-            preLoc = loc;
           }
-          if (kv.isDelete()) {
-            del.addDeleteMarker(kv);
-          } else {
-            put.add(kv);
-          }
-          lastKV = kv;
         }
 
         // skip the edit
-        if(needSkip) continue;
+        if (needSkip) continue;
 
-        // add the last row
-        if (preRow != null && lastAddedRow != preRow) {
-          synchronized (serverToBufferQueueMap) {
-            List<Pair<HRegionLocation, Row>> queue = serverToBufferQueueMap.get(preKey);
-            if (queue == null) {
-              queue = Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Row>>());
-              serverToBufferQueueMap.put(preKey, queue);
-            }
-            queue.add(new Pair<HRegionLocation, Row>(preLoc, preRow));
+        if (!skippedKVs.isEmpty()) {
+          kvs.removeAll(skippedKVs);
+        }
+        synchronized (serverToBufferQueueMap) {
+          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
+          List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
+          if (queue == null) {
+            queue =
+                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, HLog.Entry>>());
+            serverToBufferQueueMap.put(locKey, queue);
           }
-          // store regions we have recovered so far
-          addToRecoveredRegions(preLoc.getRegionInfo().getEncodedName());
+          queue.add(new Pair<HRegionLocation, HLog.Entry>(loc, entry));
         }
+        // store regions we have recovered so far
+        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
       }
     }
 
@@ -1580,7 +1552,7 @@ public class HLogSplitter {
       return loc;
     }
 
-    private void processWorkItems(String key, List<Pair<HRegionLocation, Row>> actions)
+    private void processWorkItems(String key, List<Pair<HRegionLocation, HLog.Entry>> actions)
         throws IOException {
       RegionServerWriter rsw = null;
 
@@ -1663,7 +1635,7 @@ public class HLogSplitter {
     protected boolean flush() throws IOException {
       String curLoc = null;
       int curSize = 0;
-      List<Pair<HRegionLocation, Row>> curQueue = null;
+      List<Pair<HRegionLocation, HLog.Entry>> curQueue = null;
       synchronized (this.serverToBufferQueueMap) {
         for (String locationKey : this.serverToBufferQueueMap.keySet()) {
           curQueue = this.serverToBufferQueueMap.get(locationKey);
@@ -1792,8 +1764,8 @@ public class HLogSplitter {
       }
 
       TableName tableName = getTableFromLocationStr(loc);
-      if(tableName != null){
-        LOG.warn("Invalid location string:" + loc + " found.");
+      if(tableName == null){
+        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
       }
 
       HConnection hconn = getConnectionByTableName(tableName);

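Two behavioral tweaks in this file are easy to miss in the noise: the default log replay batch size drops from 512 to 64 edits (a large minimum batch makes the replay writer wait longer before it can ship anything), and an invalid location string now aborts the replay instead of merely logging a warning. A hedged example of overriding the batch size if the smaller default generates too many RPCs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Hypothetical tuning: larger batches mean fewer replay RPCs but a
    // longer wait before the replay writer can ship edits.
    conf.setInt("hbase.regionserver.wal.logreplay.batch.size", 256);
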
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java Thu Sep 26 21:16:33 2013
@@ -23,12 +23,16 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -40,8 +44,14 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
@@ -50,6 +60,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
 
 /**
@@ -62,7 +73,7 @@ import com.google.protobuf.ServiceExcept
 public class WALEditsReplaySink {
 
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
-  private static final int MAX_BATCH_SIZE = 3000;
+  private static final int MAX_BATCH_SIZE = 1024;
 
   private final Configuration conf;
   private final HConnection conn;
@@ -93,41 +104,38 @@ public class WALEditsReplaySink {
 
   /**
    * Replay an array of actions of the same region directly into the newly assigned Region Server
-   * @param actions
+   * @param entries
    * @throws IOException
    */
-  public void replayEntries(List<Pair<HRegionLocation, Row>> actions) throws IOException {
-    if (actions.size() == 0) {
+  public void replayEntries(List<Pair<HRegionLocation, HLog.Entry>> entries) throws IOException {
+    if (entries.size() == 0) {
       return;
     }
 
-    int batchSize = actions.size();
-    int dataSize = 0;
-    Map<HRegionInfo, List<Action<Row>>> actionsByRegion = 
-        new HashMap<HRegionInfo, List<Action<Row>>>();
+    int batchSize = entries.size();
+    Map<HRegionInfo, List<HLog.Entry>> entriesByRegion =
+        new HashMap<HRegionInfo, List<HLog.Entry>>();
     HRegionLocation loc = null;
-    Row row = null;
-    List<Action<Row>> regionActions = null;
+    HLog.Entry entry = null;
+    List<HLog.Entry> regionEntries = null;
     // Build the action list. 
     for (int i = 0; i < batchSize; i++) {
-      loc = actions.get(i).getFirst();
-      row = actions.get(i).getSecond();
-      if (actionsByRegion.containsKey(loc.getRegionInfo())) {
-        regionActions = actionsByRegion.get(loc.getRegionInfo());
+      loc = entries.get(i).getFirst();
+      entry = entries.get(i).getSecond();
+      if (entriesByRegion.containsKey(loc.getRegionInfo())) {
+        regionEntries = entriesByRegion.get(loc.getRegionInfo());
       } else {
-        regionActions = new ArrayList<Action<Row>>();
-        actionsByRegion.put(loc.getRegionInfo(), regionActions);
+        regionEntries = new ArrayList<HLog.Entry>();
+        entriesByRegion.put(loc.getRegionInfo(), regionEntries);
       }
-      Action<Row> action = new Action<Row>(row, i);
-      regionActions.add(action);
-      dataSize += row.getRow().length;
+      regionEntries.add(entry);
     }
     
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // replaying edits by region
-    for (HRegionInfo curRegion : actionsByRegion.keySet()) {
-      List<Action<Row>> allActions = actionsByRegion.get(curRegion);
+    for (HRegionInfo curRegion : entriesByRegion.keySet()) {
+      List<HLog.Entry> allActions = entriesByRegion.get(curRegion);
       // send edits in chunks
       int totalActions = allActions.size();
       int replayedActions = 0;
@@ -142,12 +150,11 @@ public class WALEditsReplaySink {
     }
 
     long endTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-    LOG.debug("number of rows:" + actions.size() + " are sent by batch! spent " + endTime
+    LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime
         + "(ms)!");
 
     metrics.updateReplayTime(endTime);
     metrics.updateReplayBatchSize(batchSize);
-    metrics.updateReplayDataSize(dataSize);
 
     this.totalReplayedEdits.addAndGet(batchSize);
   }
@@ -162,12 +169,13 @@ public class WALEditsReplaySink {
   }
 
   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-      final List<Action<Row>> actions) throws IOException {
+      final List<HLog.Entry> entries) throws IOException {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
-      ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
-          this.conn, this.tableName, regionLoc, regionInfo, actions);
-      factory.<MultiResponse> newCaller().callWithRetries(callable, this.replayTimeout);
+      ReplayServerCallable<ReplicateWALEntryResponse> callable =
+          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
+              regionInfo, entries);
+      factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
         LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
@@ -182,52 +190,44 @@ public class WALEditsReplaySink {
    * Callable that handles the <code>replay</code> method call going against a single regionserver
    * @param <R>
    */
-  class ReplayServerCallable<R> extends RegionServerCallable<MultiResponse> {
+  class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
     private HRegionInfo regionInfo;
-    private List<Action<Row>> actions;
+    private List<HLog.Entry> entries;
 
     ReplayServerCallable(final HConnection connection, final TableName tableName,
         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-        final List<Action<Row>> actions) {
+        final List<HLog.Entry> entries) {
       super(connection, tableName, null);
-      this.actions = actions;
+      this.entries = entries;
       this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
     
     @Override
-    public MultiResponse call() throws IOException {
+    public ReplicateWALEntryResponse call() throws IOException {
       try {
-        replayToServer(this.regionInfo, this.actions);
+        replayToServer(this.regionInfo, this.entries);
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
       }
       return null;
     }
 
-    private void replayToServer(HRegionInfo regionInfo, List<Action<Row>> actions)
+    private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
         throws IOException, ServiceException {
+      if (entries.isEmpty()) return;
+
+      HLog.Entry[] entriesArray =
+          entries.toArray(new HLog.Entry[entries.size()]);
       AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
-      MultiRequest request = RequestConverter.buildMultiRequest(regionInfo.getRegionName(),
-        actions);
-      MultiResponse protoResults = remoteSvr.replay(null, request);
-      // check if it's a partial success
-      List<ActionResult> resultList = protoResults.getResultList();
-      for (int i = 0, n = resultList.size(); i < n; i++) {
-        ActionResult result = resultList.get(i);
-        if (result.hasException()) {
-          Throwable t = ProtobufUtil.toException(result.getException());
-          if (!skipErrors) {
-            IOException ie = new IOException();
-            ie.initCause(t);
-            // retry
-            throw ie;
-          } else {
-            LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
-                + "=true so continuing replayToServer with error:" + t.getMessage());
-            return;
-          }
-        }
+
+      Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
+          ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
+      try {
+        PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+        remoteSvr.replay(controller, p.getFirst());
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
       }
     }
 
@@ -237,10 +237,20 @@ public class WALEditsReplaySink {
       // relocate regions in case we have a new dead server or network hiccup
       // if not due to connection issue, the following code should run fast because it uses
       // cached location
-      for (Action<Row> action : actions) {
-        // use first row to relocate region because all actions are for one region
-        setLocation(conn.locateRegion(tableName, action.getAction().getRow()));
-        break;
+      boolean relocated = false;
+      for (HLog.Entry entry : this.entries) {
+        WALEdit edit = entry.getEdit();
+        List<KeyValue> kvs = edit.getKeyValues();
+        for (KeyValue kv : kvs) {
+          // skip HLog meta entries; use only user cells to locate the region
+          if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
+
+          setLocation(conn.locateRegion(tableName, kv.getRow()));
+          relocated = true;
+          break;
+        }
+        // the first user cell is enough to relocate, since all entries are for one region
+        if (relocated) break;
+      }
     }
   }

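For readers tracking the RPC change: the sink no longer converts edits into Multi actions; it ships HLog.Entry batches over the reworked Admin replay call, so the WAL keys reach the region server and the WAL-restore observer hooks can fire there. A minimal sketch of that send path, assuming the 0.96-era internal classes used in the diff above (the sketch's class and method names are illustrative, not part of the patch):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Pair;

import com.google.protobuf.ServiceException;

public final class ReplayRpcSketch {
  private ReplayRpcSketch() {}

  /** Replays one batch of WAL entries against a single region server. */
  public static void replayBatch(AdminProtos.AdminService.BlockingInterface admin,
      List<HLog.Entry> entries) throws IOException {
    if (entries.isEmpty()) return;
    // The request carries the WAL keys; the cells travel in the RPC payload.
    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
        ReplicationProtbufUtil.buildReplicateWALEntryRequest(
            entries.toArray(new HLog.Entry[entries.size()]));
    try {
      admin.replay(new PayloadCarryingRpcController(p.getSecond()), p.getFirst());
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
}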
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Thu Sep 26 21:16:33 2013
@@ -119,8 +119,8 @@ public class SimpleRegionObserver extend
   final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
   final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
   final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
-
   final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
+  static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
 
   public void setThrowOnPostFlush(Boolean val){
     throwOnPostFlush.set(val);
@@ -543,6 +543,12 @@ public class SimpleRegionObserver extend
   @Override
   public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
                             HLogKey logKey, WALEdit logEdit) throws IOException {
+    String tableName = logKey.getTablename().getNameAsString();
+    if (tableName.equals(TABLE_SKIPPED)) {
+      // skip recovery of TABLE_SKIPPED so the test can verify that replay was bypassed
+      env.bypass();
+      return;
+    }
     ctPreWALRestore.incrementAndGet();
   }
 

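The new test hook exercises the coprocessor bypass contract: when preWALRestore() calls bypass(), the region skips replaying that edit, which is why the counters stay at zero in the test that follows. Outside a test, the same pattern gives a table-scoped replay filter; a minimal sketch, with a hypothetical class and table name:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

/** Skips WAL replay for one table; illustrative only. */
public class SkipReplayObserver extends BaseRegionObserver {
  private static final String NO_REPLAY_TABLE = "scratch_table"; // hypothetical name

  @Override
  public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
      HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
    if (NO_REPLAY_TABLE.equals(logKey.getTablename().getNameAsString())) {
      env.bypass(); // the edit is dropped instead of being replayed
    }
  }
}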
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Thu Sep 26 21:16:33 2013
@@ -543,6 +543,39 @@ public class TestRegionObserverInterface
     table.close();
   }
 
+  @Test
+  public void testPreWALRestoreSkip() throws Exception {
+    LOG.info(TestRegionObserverInterface.class.getName() + ".testPreWALRestoreSkip");
+    TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
+    HTable table = util.createTable(tableName, new byte[][] { A, B, C });
+
+    JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+    ServerName sn2 = rs1.getRegionServer().getServerName();
+    String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
+
+    util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
+    while (!sn2.equals(table.getRegionLocations().firstEntry().getValue())) {
+      Thread.sleep(100);
+    }
+
+    Put put = new Put(ROW);
+    put.add(A, A, A);
+    put.add(B, B, B);
+    put.add(C, C, C);
+    table.put(put);
+    table.flushCommits();
+
+    cluster.killRegionServer(rs1.getRegionServer().getServerName());
+    Threads.sleep(20000); // give the region server kill time to take full effect
+    util.waitUntilAllRegionsAssigned(tableName);
+
+    verifyMethodResult(SimpleRegionObserver.class, new String[] { "getCtPreWALRestore",
+        "getCtPostWALRestore" }, tableName, new Integer[] { 0, 0 });
+
+    util.deleteTable(tableName);
+    table.close();
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
   private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
                                   Object value[]) throws IOException {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Thu Sep 26 21:16:33 2013
@@ -547,7 +547,8 @@ ClientProtos.ClientService.BlockingInter
   }
 
   @Override
-  public MultiResponse replay(RpcController controller, MultiRequest request)
+  public ReplicateWALEntryResponse
+      replay(RpcController controller, ReplicateWALEntryRequest request)
       throws ServiceException {
     // TODO Auto-generated method stub
     return null;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Sep 26 21:16:33 2013
@@ -261,7 +261,6 @@ public class TestDistributedLogSplitting
     // they will consume recovered.edits
     master.balanceSwitch(false);
 
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
 
@@ -286,7 +285,6 @@ public class TestDistributedLogSplitting
     // they will consume recovered.edits
     master.balanceSwitch(false);
 
-    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
     HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Thu Sep 26 21:16:33 2013
@@ -3835,7 +3835,7 @@ public class TestHRegion extends HBaseTe
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
       .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(), 
-        anyLong(), (HTableDescriptor)any(), (RegionCoprocessorHost)any());
+        anyLong(), (HTableDescriptor)any());
 
     //verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1526696&r1=1526695&r2=1526696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java Thu Sep 26 21:16:33 2013
@@ -105,8 +105,7 @@ public final class HLogPerformanceEvalua
           addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
           HRegionInfo hri = region.getRegionInfo();
           if (this.noSync) {
-            hlog.appendNoSync(hri, hri.getTable(), walEdit,
-                              new ArrayList<UUID>(), now, htd, null);
+            hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList<UUID>(), now, htd);
           } else {
             hlog.append(hri, hri.getTable(), walEdit, now, htd);
           }
@@ -200,7 +199,7 @@ public final class HLogPerformanceEvalua
             LOG.info("Rolling after " + appends + " edits");
             rollWriter();
           }
-          super.doWrite(info, logKey, logEdit, htd, null);
+          super.doWrite(info, logKey, logEdit, htd);
         };
       };
       hlog.rollWriter();

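Both test updates above follow from the same interface change: HLog#appendNoSync no longer threads a RegionCoprocessorHost through the append path, since the WAL-restore observer hooks are now driven from the replay side. A hedged sketch of a post-patch caller (the helper class and method names are illustrative; in real usage the log, region info, edit, and descriptor come from the surrounding code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

final class AppendSketch {
  private AppendSketch() {}

  // Post-patch signature: (HRegionInfo, TableName, WALEdit, List<UUID>, long, HTableDescriptor)
  static void logEdit(HLog hlog, HRegionInfo hri, WALEdit edit, HTableDescriptor htd)
      throws IOException {
    hlog.appendNoSync(hri, hri.getTable(), edit, new ArrayList<UUID>(),
        System.currentTimeMillis(), htd);
    hlog.sync(); // appendNoSync leaves the sync decision to the caller
  }
}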

