hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject svn commit: r1527445 - in /hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver: HRegion.java HRegionServer.java wal/HLogSplitter.java
Date Mon, 30 Sep 2013 05:28:24 GMT
Author: jeffreyz
Date: Mon Sep 30 05:28:24 2013
New Revision: 1527445

URL: http://svn.apache.org/r1527445
Log:
hbase-9390: coprocessors observers are not called during a recovery with the new log replay
algorithm - review addendum

Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1527445&r1=1527444&r2=1527445&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Sep 30 05:28:24 2013
@@ -5635,83 +5635,4 @@ public class HRegion implements HeapSize
       }
     }
   }
-
-  /**
-   * This function is used to construct replay mutations from WALEdits
-   * @param entries
-   * @param cells
-   * @param clusterId
-   * @param logEntries List of Pair<HLogKey, WALEdit> contructed from its PB version - WALEntry
-   *          instances
-   * @return list of Pair<MutationType, Mutation> to be replayed
-   * @throws IOException
-   */
-  List<Pair<MutationType, Mutation>> getReplayMutations(List<WALEntry> entries,
-      CellScanner cells, UUID clusterId, List<Pair<HLogKey, WALEdit>> logEntries)
-      throws IOException {
-
-    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
-    List<Pair<MutationType, Mutation>> tmpEditMutations =
-        new ArrayList<Pair<MutationType, Mutation>>();
-
-    for (WALEntry entry : entries) {
-      HLogKey logKey = null;
-      WALEdit val = null;
-      Cell previousCell = null;
-      Mutation m = null;
-      tmpEditMutations.clear();
-
-      int count = entry.getAssociatedCellCount();
-      if (coprocessorHost != null) {
-        val = new WALEdit();
-      }
-
-      for (int i = 0; i < count; i++) {
-        // Throw index out of bounds if our cell count is off
-        if (!cells.advance()) {
-          throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
-        }
-        Cell cell = cells.current();
-        if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
-
-        boolean isNewRowOrType =
-            previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
-                || !CellUtil.matchingRow(previousCell, cell);
-        if (isNewRowOrType) {
-          // Create new mutation
-          if (CellUtil.isDelete(cell)) {
-            m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
-          } else {
-            m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            tmpEditMutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
-          }
-        }
-        if (CellUtil.isDelete(cell)) {
-          ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
-        } else {
-          ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
-        }
-        previousCell = cell;
-      }
-
-      // Start coprocessor replay here. The coprocessor is for each WALEdit
-      // instead of a KeyValue.
-      if (coprocessorHost != null) {
-        WALKey walKey = entry.getKey();
-        logKey =
-            new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-                .getTableName().toByteArray()), walKey.getLogSequenceNumber(),
-                walKey.getWriteTime(), clusterId);
-        if (coprocessorHost.preWALRestore(this.getRegionInfo(), logKey, val)) {
-          // if bypass this log entry, ignore it ...
-          continue;
-        }
-        logEntries.add(new Pair<HLogKey, WALEdit>(logKey, val));
-      }
-      mutations.addAll(tmpEditMutations);
-    }
-
-    return mutations;
-  }
 }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1527445&r1=1527444&r2=1527445&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Sep 30 05:28:24 2013
@@ -195,6 +195,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -3867,9 +3868,27 @@ public class HRegionServer implements Cl
       
       HRegion region = this.getRegionByEncodedName(
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
+      RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
-      List<Pair<MutationType, Mutation>> mutations = region.getReplayMutations(
-        request.getEntryList(), cells, UUID.fromString(this.clusterId), walEntries);
+      List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
+      for (WALEntry entry : entries) {
+        Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null : 
+          new Pair<HLogKey, WALEdit>();
+        List<Pair<MutationType, Mutation>> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+          cells, walEntry);
+        if (coprocessorHost != null) {
+          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
+          // KeyValue.
+          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
+            walEntry.getSecond())) {
+            // if bypass this log entry, ignore it ...
+            continue;
+          }
+          walEntries.add(walEntry);
+        }
+        mutations.addAll(edits);
+      }
+
       if (!mutations.isEmpty()) {
         OperationStatus[] result = doBatchOp(region, mutations, true);
         // check if it's a partial success
@@ -3879,9 +3898,9 @@ public class HRegionServer implements Cl
           }
         }
       }
-      if (region.getCoprocessorHost() != null) {
+      if (coprocessorHost != null) {
         for (Pair<HLogKey, WALEdit> wal : walEntries) {
-          region.getCoprocessorHost().postWALRestore(region.getRegionInfo(), wal.getFirst(),
+          coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
             wal.getSecond());
         }
       }
@@ -4096,12 +4115,13 @@ public class HRegionServer implements Cl
   }
 
   /**
-   * Execute a list of Put/Delete mutations.
+   * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
+   * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
    * @param region
    * @param mutations
    * @param isReplay
-   * @return an array of OperationStatus which internally contains the
-   *         OperationStatusCode and the exceptionMessage if any
+   * @return an array of OperationStatus which internally contains the OperationStatusCode and the
+   *         exceptionMessage if any
    * @throws IOException
    */
   protected OperationStatus[] doBatchOp(final HRegion region,

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1527445&r1=1527444&r2=1527445&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Mon Sep 30 05:28:24 2013
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +53,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -64,6 +69,7 @@ import org.apache.hadoop.hbase.client.Co
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -73,9 +79,13 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -1827,4 +1837,76 @@ public class HLogSplitter {
       super(s);
     }
   }
+
+  /**
+   * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
+   * WALEdit from the passed in WALEntry
+   * @param entry
+   * @param cells
+   * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
+   *          extracted from the passed in WALEntry.
+   * @return list of Pair<MutationType, Mutation> to be replayed
+   * @throws IOException
+   */
+  public static List<Pair<MutationType, Mutation>> getMutationsFromWALEntry(WALEntry entry,
+      CellScanner cells, Pair<HLogKey, WALEdit> logEntry) throws IOException {
+
+    if (entry == null) {
+      // return an empty array
+      return new ArrayList<Pair<MutationType, Mutation>>();
+    }
+
+    int count = entry.getAssociatedCellCount();
+    List<Pair<MutationType, Mutation>> mutations = new ArrayList<Pair<MutationType, Mutation>>();
+    Cell previousCell = null;
+    Mutation m = null;
+    HLogKey key = null;
+    WALEdit val = null;
+    if (logEntry != null) val = new WALEdit();
+
+    for (int i = 0; i < count; i++) {
+      // Throw index out of bounds if our cell count is off
+      if (!cells.advance()) {
+        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+      }
+      Cell cell = cells.current();
+      if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
+
+      boolean isNewRowOrType =
+          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+              || !CellUtil.matchingRow(previousCell, cell);
+      if (isNewRowOrType) {
+        // Create new mutation
+        if (CellUtil.isDelete(cell)) {
+          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          mutations.add(new Pair<MutationType, Mutation>(MutationType.DELETE, m));
+        } else {
+          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+          mutations.add(new Pair<MutationType, Mutation>(MutationType.PUT, m));
+        }
+      }
+      if (CellUtil.isDelete(cell)) {
+        ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+      } else {
+        ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
+      }
+      previousCell = cell;
+    }
+
+    // reconstruct HLogKey
+    if (logEntry != null) {
+      WALKey walKey = entry.getKey();
+      List<UUID> clusterIds = new ArrayList<UUID>(walKey.getClusterIdsCount());
+      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
+        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
+      }
+      key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
+              .getTableName().toByteArray()), walKey.getLogSequenceNumber(), walKey.getWriteTime(),
+              clusterIds);
+      logEntry.setFirst(key);
+      logEntry.setSecond(val);
+    }
+
+    return mutations;
+  }
 }



Mime
View raw message