hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject svn commit: r1523173 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/hbase/coprocessor/ test/java/org/apache/hadoop/hbase/region...
Date Sat, 14 Sep 2013 01:55:18 GMT
Author: jeffreyz
Date: Sat Sep 14 01:55:18 2013
New Revision: 1523173

URL: http://svn.apache.org/r1523173
Log:
HBASE-9390: coprocessor observers are not called during a recovery with the new log replay algorithm - 1

Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Sep 14 01:55:18 2013
@@ -2223,7 +2223,8 @@ public class HRegion implements HeapSize
       Mutation mutation = batchOp.operations[firstIndex];
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-               walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
+              walEdit, mutation.getClusterIds(), now, this.htableDescriptor,
+              this.getCoprocessorHost());
       }
 
       // -------------------------------
@@ -4556,9 +4557,9 @@ public class HRegion implements HeapSize
           long txid = 0;
           // 7. Append no sync
           if (!walEdit.isEmpty()) {
-            txid = this.log.appendNoSync(this.getRegionInfo(),
-                this.htableDescriptor.getTableName(), walEdit,
-                processor.getClusterIds(), now, this.htableDescriptor);
+            txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
+                  walEdit, processor.getClusterIds(), now, this.htableDescriptor,
+                  this.getCoprocessorHost());
           }
           // 8. Release region lock
           if (locked) {
@@ -4785,7 +4786,7 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
               walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
+              this.htableDescriptor, this.getCoprocessorHost());
           } else {
             recordMutationWithoutWal(append.getFamilyCellMap());
           }
@@ -4933,7 +4934,7 @@ public class HRegion implements HeapSize
             // as a Put.
             txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
                 walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
-                this.htableDescriptor);
+                this.htableDescriptor, this.getCoprocessorHost());
           } else {
             recordMutationWithoutWal(increment.getFamilyCellMap());
           }

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Sat Sep 14 01:55:18 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.DrainBarrier;
@@ -840,7 +841,7 @@ class FSHLog implements HLog, Syncable {
   @Override
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
     final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
-    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
+    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore, null);
   }
 
   /**
@@ -870,9 +871,9 @@ class FSHLog implements HLog, Syncable {
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private long append(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd, boolean doSync,
-      boolean isInMemstore)
+  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
+      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
+      RegionCoprocessorHost regionCoproHost)
     throws IOException {
       if (edits.isEmpty()) return this.unflushedEntries.get();
       if (this.closed) {
@@ -893,7 +894,7 @@ class FSHLog implements HLog, Syncable {
           byte [] encodedRegionName = info.getEncodedNameAsBytes();
           if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
           HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
-          doWrite(info, logKey, edits, htd);
+          doWrite(info, logKey, edits, htd, regionCoproHost);
           this.numEntries.incrementAndGet();
           txid = this.unflushedEntries.incrementAndGet();
           if (htd.isDeferredLogFlush()) {
@@ -916,9 +917,10 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd)
+      List<UUID> clusterIds, final long now, HTableDescriptor htd,
+      RegionCoprocessorHost regionCoproHost)
     throws IOException {
-    return append(info, tableName, edits, clusterIds, now, htd, false, true);
+    return append(info, tableName, edits, clusterIds, now, htd, false, true, regionCoproHost);
   }
 
   /**
@@ -1203,7 +1205,7 @@ class FSHLog implements HLog, Syncable {
 
   // TODO: Remove info.  Unused.
   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
-                           HTableDescriptor htd)
+                           HTableDescriptor htd, RegionCoprocessorHost regionCoproHost)
   throws IOException {
     if (!this.enabled) {
       return;
@@ -1220,12 +1222,18 @@ class FSHLog implements HLog, Syncable {
         if (logEdit.isReplay()) {
           // set replication scope null so that this won't be replicated
           logKey.setScopes(null);
+          if(regionCoproHost != null) {
+            regionCoproHost.preWALRestore(info, logKey, logEdit);
+          }
         }
         // write to our buffer for the Hlog file.
         logSyncer.append(new FSHLog.Entry(logKey, logEdit));
       }
       long took = EnvironmentEdgeManager.currentTimeMillis() - now;
       coprocessorHost.postWALWrite(info, logKey, logEdit);
+      if(logEdit.isReplay() && regionCoproHost != null ) {
+        regionCoproHost.postWALRestore(info, logKey, logEdit);
+      }
       long len = 0;
       for (KeyValue kv : logEdit.getKeyValues()) {
         len += kv.getLength();

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sat Sep 14 01:55:18 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.io.Writable;
 
 
@@ -300,7 +301,8 @@ public interface HLog {
    * @throws IOException
    */
   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
-      List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
+        List<UUID> clusterIds, final long now, HTableDescriptor htd,
+        RegionCoprocessorHost regionCoproHost) throws IOException;
 
   // TODO: Do we need all these versions of sync?
   void hsync() throws IOException;

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java Sat Sep 14 01:55:18 2013
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableList;
@@ -36,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -103,6 +106,15 @@ public class SimpleRegionObserver extend
   final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0);
   final AtomicInteger ctPreBatchMutate = new AtomicInteger(0);
   final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
+  final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
+  final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
+
+
+  final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
+
+  public void setThrowOnPostFlush(Boolean val){
+    throwOnPostFlush.set(val);
+  }
 
   @Override
   public void start(CoprocessorEnvironment e) throws IOException {
@@ -144,7 +156,7 @@ public class SimpleRegionObserver extend
 
   @Override
   public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, InternalScanner scanner) {
+      Store store, InternalScanner scanner) throws IOException {
     ctPreFlush.incrementAndGet();
     return scanner;
   }
@@ -158,8 +170,11 @@ public class SimpleRegionObserver extend
 
   @Override
   public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, StoreFile resultFile) {
+      Store store, StoreFile resultFile) throws IOException {
     ctPostFlush.incrementAndGet();
+    if (throwOnPostFlush.get()){
+      throw new IOException("throwOnPostFlush is true in postFlush");
+    }
   }
 
   public boolean wasFlushed() {
@@ -502,6 +517,19 @@ public class SimpleRegionObserver extend
     return hasLoaded;
   }
 
+  @Override
+  public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
+                            HLogKey logKey, WALEdit logEdit) throws IOException {
+    ctPreWALRestore.incrementAndGet();
+  }
+
+  @Override
+  public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
+                             HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+    ctPostWALRestore.incrementAndGet();
+  }
+
+
   public boolean hadPreGet() {
     return ctPreGet.get() > 0;
   }
@@ -666,4 +694,12 @@ public class SimpleRegionObserver extend
   public int getCtPostIncrement() {
     return ctPostIncrement.get();
   }
+
+  public int getCtPreWALRestore() {
+    return ctPreWALRestore.get();
+  }
+
+  public int getCtPostWALRestore() {
+    return ctPostWALRestore.get();
+  }
 }

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Sat Sep 14 01:55:18 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -92,6 +94,7 @@ public class TestRegionObserverInterface
   public static void setupBeforeClass() throws Exception {
     // set configure to indicate which cp should be loaded
     Configuration conf = util.getConfiguration();
+    conf.setBoolean("hbase.master.distributed.log.replay", true);
     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
 
@@ -487,11 +490,66 @@ public class TestRegionObserverInterface
     table.close();
   }
 
+  @Test
+  public void testRecovery() throws Exception {
+    LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery");
+    TableName tableName = TEST_TABLE;
+
+    HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+
+    JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+    ServerName sn2 = rs1.getRegionServer().getServerName();
+    String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
+
+    util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
+    while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
+      Thread.sleep(100);
+    }
+
+    Put put = new Put(ROW);
+    put.add(A, A, A);
+    put.add(B, B, B);
+    put.add(C, C, C);
+    table.put(put);
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+            "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+        TEST_TABLE,
+        new Boolean[] {false, false, true, true, true, true, false}
+    );
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
+        TEST_TABLE,
+        new Integer[] {0, 0, 1, 1});
+
+    cluster.killRegionServer(rs1.getRegionServer().getServerName());
+    Threads.sleep(20000); // just to be sure that the kill has fully started.
+    util.waitUntilAllRegionsAssigned(tableName);
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[]{"getCtPreWALRestore", "getCtPostWALRestore"},
+        TEST_TABLE,
+        new Integer[]{1, 1});
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[]{"getCtPrePut", "getCtPostPut"},
+        TEST_TABLE,
+        new Integer[]{0, 0});
+
+    util.deleteTable(tableName);
+    table.close();
+  }
+
   // check each region whether the coprocessor upcalls are called or not.
-  private void verifyMethodResult(Class c, String methodName[], TableName tableName,
+  private void verifyMethodResult(Class<?> c, String methodName[], TableName tableName,
                                   Object value[]) throws IOException {
     try {
       for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
+        if (!t.isAlive() || t.getRegionServer().isAborted() || t.getRegionServer().isStopping()){
+          continue;
+        }
         for (HRegionInfo r : ProtobufUtil.getOnlineRegions(t.getRegionServer())) {
           if (!r.getTable().equals(tableName)) {
             continue;

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Sep 14 01:55:18 2013
@@ -3849,8 +3849,8 @@ public class TestHRegion extends HBaseTe
 
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
-      .appendNoSync((HRegionInfo)any(), eq(tableName),
-        (WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any());
+      .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
+        anyLong(), (HTableDescriptor)any(), (RegionCoprocessorHost)any());
 
     //verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1523173&r1=1523172&r2=1523173&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java Sat Sep 14 01:55:18 2013
@@ -104,7 +104,7 @@ public final class HLogPerformanceEvalua
           HRegionInfo hri = region.getRegionInfo();
           if (this.noSync) {
             hlog.appendNoSync(hri, hri.getTable(), walEdit,
-                              new ArrayList<UUID>(), now, htd);
+                              new ArrayList<UUID>(), now, htd, null);
           } else {
             hlog.append(hri, hri.getTable(), walEdit, now, htd);
           }
@@ -191,7 +191,7 @@ public final class HLogPerformanceEvalua
             LOG.info("Rolling after " + appends + " edits");
             rollWriter();
           }
-          super.doWrite(info, logKey, logEdit, htd);
+          super.doWrite(info, logKey, logEdit, htd, null);
         };
       };
       hlog.rollWriter();



Mime
View raw message