hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1459464 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/util/
Date Thu, 21 Mar 2013 18:53:27 GMT
Author: liyin
Date: Thu Mar 21 18:53:26 2013
New Revision: 1459464

URL: http://svn.apache.org/r1459464
Log:
[0.89-fb] [HBASE-7517] Improve the PFFE (PreemptiveFastFailException) mechanism so that
(i) a thread that has already acquired the AtomicBoolean for one server is not fast-failed
when it needs to talk to a different server, and (ii) a PFFE is only thrown for threads
that are not allowed to go through (at most one thread per server is allowed through).

Author: aaiyer

Summary:
Improvements to the PFFE mechanism that let the
retrying threads handle errors as if they were not in PFFE mode.

Threads that are allowed to go through (one per server) follow the
same logic as normal threads do when not in fast-fail mode; the only
exception is that all of their errors are logged with LOG.error.
These threads never throw a PFFE.

Threads that are not allowed to go through will throw a PFFE.

Test Plan:
Add a unit test (TestFastFail) to exercise and debug the fast-fail behavior.
Test it out on tsh025.

Reviewers: liyintang, kannan, kranganathan, adela

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D690778

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1459464&r1=1459463&r2=1459464&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Mar 21 18:53:26 2013
@@ -52,6 +52,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HConnectionManager.TableServers.FailureInfo;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
@@ -389,26 +391,30 @@ public class HConnectionManager {
       ConcurrentSkipListMap<byte [], HRegionLocation>>();
 
     // amount of time to wait before we consider a server to be in fast fail mode
-    private long fastFailThresholdMilliSec;
+    protected long fastFailThresholdMilliSec;
     // Keeps track of failures when we cannot talk to a server. Helps in
     // fast failing clients if the server is down for a long time.
-    private final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap =
+    protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap
=
       new ConcurrentHashMap<HServerAddress, FailureInfo>();
     // We populate repeatedFailuresMap every time there is a failure. So, to keep it
     // from growing unbounded, we garbage collect the failure information
     // every cleanupInterval.
-    private final long failureMapCleanupIntervalMilliSec;
-    private volatile long lastFailureMapCleanupTimeMilliSec;
+    protected final long failureMapCleanupIntervalMilliSec;
+    protected volatile long lastFailureMapCleanupTimeMilliSec;
     // Amount of time that has to pass, before we clear region -> regionserver cache
-    // again, when in fast fail mode.
-    private long cacheClearingTimeoutMilliSec;
+    // again, when in fast fail mode. This is used to clean unused entries.
+    protected long cacheClearingTimeoutMilliSec;
+    // clear failure Info. Used to clean out all entries.
+    // A safety valve, in case the client does not exit the
+    // fast fail mode for any reason.
+    private long fastFailClearingTimeMilliSec;
 
     /**
      * Keeps track of repeated failures to any region server.
      * @author amitanand.s
      *
      */
-    private class FailureInfo {
+    protected class FailureInfo {
       // The number of consecutive failures.
       private final AtomicLong numConsecutiveFailures = new AtomicLong();
       // The time when the server started to become unresponsive
@@ -425,7 +431,27 @@ public class HConnectionManager {
       // the rest of the client threads will fail fast.
       private final AtomicBoolean
           exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(false);
+
+      public String toString() {
+        return "FailureInfo: numConsecutiveFailures = " + numConsecutiveFailures
+            + " timeOfFirstFailureMilliSec = " + timeOfFirstFailureMilliSec
+            + " timeOfLatestAttemptMilliSec = " + timeOfLatestAttemptMilliSec
+            + " timeOfLatestCacheClearMilliSec = " + timeOfLatestCacheClearMilliSec
+            + " exclusivelyRetringInspiteOfFastFail  = " + exclusivelyRetringInspiteOfFastFail.get();
+      }
+
+      FailureInfo(long firstFailureTime) {
+        this.timeOfFirstFailureMilliSec = firstFailureTime;
+      }
+    }
+    private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
+        new ThreadLocal<MutableBoolean>();
+
+    // For TESTING purposes only;
+    public Map<HServerAddress, FailureInfo> getFailureMap() {
+      return repeatedFailuresMap;
     }
+
     // The presence of a server in the map implies it's likely that there is an
     // entry in cachedRegionLocations that map to this server; but the absence
     // of a server in this map guarentees that there is no entry in cache that
@@ -487,6 +513,8 @@ public class HConnectionManager {
           60000); // 1 min
       this.failureMapCleanupIntervalMilliSec = conf.getLong(
           "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
+      this.fastFailClearingTimeMilliSec = conf.getLong(
+          "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins
 
       this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
           10);
@@ -950,7 +978,8 @@ public class HConnectionManager {
 
           server = metaLocation.getServerAddress();
           // Handle the case where .META. is on an unresponsive server.
-          if (inFastFailMode(server)) {
+          if (inFastFailMode(server) &&
+              !this.currentThreadInFastFailMode()) {
             // In Fast-fail mode, all but one thread will fast fail. Check
             // if we are that one chosen thread.
             fInfo = repeatedFailuresMap.get(server);
@@ -1201,7 +1230,7 @@ public class HConnectionManager {
      * @param tablename
      * @param server
      */
-    private void clearCachedLocationForServer(
+    protected void clearCachedLocationForServer(
         final String server) {
       boolean deletedSomething = false;
 
@@ -1564,7 +1593,8 @@ public class HConnectionManager {
 
         // Logic to fast fail requests to unreachable servers.
         server = callable.getServerAddress();
-        if (inFastFailMode(server)) {
+        if (inFastFailMode(server) &&
+            !currentThreadInFastFailMode()) {
           // In Fast-fail mode, all but one thread will fast fail. Check
           // if we are that one chosen thread.
           fInfo = repeatedFailuresMap.get(server);
@@ -1614,15 +1644,13 @@ public class HConnectionManager {
      * @param t - the throwable to be handled.
      * @throws PreemptiveFastFailException
      */
-    private void handleFailureToServer(HServerAddress server, Throwable t)
-        throws PreemptiveFastFailException {
+    private void handleFailureToServer(HServerAddress server, Throwable t) {
       if (server == null || t == null) return;
 
       long currentTime = System.currentTimeMillis();
       FailureInfo fInfo = repeatedFailuresMap.get(server);
       if (fInfo == null) {
-        fInfo = new FailureInfo();
-        fInfo.timeOfFirstFailureMilliSec = currentTime;
+        fInfo = new FailureInfo(currentTime);
         FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo);
         if (oldfInfo != null) {
           fInfo = oldfInfo;
@@ -1637,11 +1665,8 @@ public class HConnectionManager {
             fInfo.timeOfLatestCacheClearMilliSec = currentTime;
             clearCachedLocationForServer(server.toString());
           }
-
-          LOG.error("Preemptive fast fail exception caused by : " +  t.toString());
-
-          throw  new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
-            fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+          LOG.error("Exception in FastFail mode : " +  t.toString());
+          return;
       }
 
       // if thrown these exceptions, we clear all the cache entries that
@@ -1667,13 +1692,26 @@ public class HConnectionManager {
       // remove entries that haven't been attempted in a while
       // No synchronization needed. It is okay if multiple threads try to
       // remove the entry again and again from a concurrent hash map.
-      lastFailureMapCleanupTimeMilliSec = now;
+      StringBuilder sb = new StringBuilder();
       for(Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap.entrySet())
{
         if (now > entry.getValue().timeOfLatestAttemptMilliSec
-            + failureMapCleanupIntervalMilliSec) {
+              + failureMapCleanupIntervalMilliSec) { // no recent failures
+          repeatedFailuresMap.remove(entry.getKey());
+        } else if (now > entry.getValue().timeOfFirstFailureMilliSec
+            + this.fastFailClearingTimeMilliSec) { // been failing for a long time
+          LOG.error(entry.getKey() + " been failing for a long time. clearing out."
+            + entry.getValue().toString());
           repeatedFailuresMap.remove(entry.getKey());
+        } else {
+          sb.append(entry.getKey().toString() + " failing " + entry.getValue().toString()
+ "\n");
         }
       }
+      if (sb.length() > 0
+        // If there are multiple threads cleaning up, try to see that only one will log the
msg.
+          && now > this.lastFailureMapCleanupTimeMilliSec + this.failureMapCleanupIntervalMilliSec)
{
+        LOG.warn("Preemptive failure enabled for : " + sb.toString());
+      }
+      lastFailureMapCleanupTimeMilliSec = now;
     }
 
     /**
@@ -1694,6 +1732,15 @@ public class HConnectionManager {
     }
 
     /**
+     * Checks to see if the current thread is already in FastFail mode for *some* server.
+     * @return true, if the thread is already in FF mode.
+     */
+    private boolean currentThreadInFastFailMode() {
+      return (this.threadRetryingInFastFailMode.get() != null &&
+            this.threadRetryingInFastFailMode.get().booleanValue() == true);
+    }
+
+    /**
      * Check to see if the client should try to connnect to the server, inspite of
      * knowing that it is in the fast fail mode.
      *
@@ -1707,8 +1754,18 @@ public class HConnectionManager {
       // We believe that the server is down, But, we want to have just one client
       // actively trying to connect. If we are the chosen one, we will retry
       // and not throw an exception.
-      return  (fInfo != null &&
-          fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true));
+      if (fInfo != null && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false,
true)) {
+        MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode.get();
+        if (threadAlreadyInFF == null) {
+          threadAlreadyInFF = new MutableBoolean();
+          this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
+        }
+        threadAlreadyInFF.setValue(true);
+
+        return true;
+      } else {
+        return false;
+      }
     }
 
     /**
@@ -1734,6 +1791,7 @@ public class HConnectionManager {
         // Release the lock if we were retrying inspite of FastFail
         if (retryDespiteFastFailMode) {
           fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+          threadRetryingInFastFailMode.get().setValue(false);
         }
       }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java?rev=1459464&r1=1459463&r2=1459464&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java Thu Mar 21 18:53:26 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.master.RegionManager.RegionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 
 /**
  * Instantiated when a server's lease has expired, meaning it has crashed.
@@ -366,6 +368,8 @@ class ProcessServerShutdown extends Regi
 
   @Override
   protected RegionServerOperationResult process() throws IOException {
+    InjectionHandler.processEvent(InjectionEvent.HMASTER_START_PROCESS_DEAD_SERVER);
+
     switch (this.logSplitResult) {
     case NOT_RUNNING:
       LOG.info("Process server shut down for dead server " + deadServer);
@@ -382,7 +386,7 @@ class ProcessServerShutdown extends Regi
     case FAILED:
       logSplitResult = LogSplitResult.NOT_RUNNING;
       throw new IOException("Failed splitting log for dead server " +
-				deadServer);
+        deadServer);
 
     default:
       throw new RuntimeException("Invalid split log result: "
@@ -391,7 +395,7 @@ class ProcessServerShutdown extends Regi
 
     LOG.info("Log split is completed for " + deadServer
         + ", meta reassignment and scanning: "
-	+ "rootRescanned: " + rootRescanned + ", numberOfMetaRegions: "
+      + "rootRescanned: " + rootRescanned + ", numberOfMetaRegions: "
       + master.getRegionManager().numMetaRegions()
       + ", onlineMetaRegions.size(): "
       + master.getRegionManager().numOnlineMetaRegions());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1459464&r1=1459463&r2=1459464&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Thu Mar 21 18:53:26 2013
@@ -33,5 +33,6 @@ public enum InjectionEvent {
   HMASTER_ENABLE_TABLE,
   HMASTER_DISABLE_TABLE,
   ZKUNASSIGNEDWATCHER_REGION_OPENED,
-  SPLITLOGWORKER_SPLIT_LOG_START
+  SPLITLOGWORKER_SPLIT_LOG_START,
+  HMASTER_START_PROCESS_DEAD_SERVER
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1459464&r1=1459463&r2=1459464&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Thu Mar 21 18:53:26 2013
@@ -455,6 +455,14 @@ public class MiniHBaseCluster {
   }
 
   /**
+   * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
+   * of HRS carrying regionName. Returns -1 if none found.
+   */
+  public int getServerWithRoot() {
+    return getServerWith(HRegionInfo.ROOT_REGIONINFO.getRegionName());
+  }
+
+  /**
    * Get the location of the specified region
    * @param regionName Name of the region in bytes
    * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java?rev=1459464&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java Thu Mar 21 18:53:26 2013
@@ -0,0 +1,351 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager.TableServers;
+import org.apache.hadoop.hbase.client.HConnectionManager.TableServers.FailureInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DelayInducingInjectionHandler;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Run tests that use the HBase clients; {@link HTable} and {@link HTablePool}.
+ * Sets up the HBase mini cluster once at start and runs through all client tests.
+ * Each creates a table named for the method and does its stuff against that.
+ */
+public class TestFastFail {
+  private static final Log LOG = LogFactory.getLog(TestFastFail.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static int SLAVES = 15;
+
+  private static  byte [] TABLE = Bytes.toBytes("testFastFail");
+  private static  int NUM_REGIONS = 100;
+  private static  int NUM_ROWS = 5000;
+  private static  int NUM_READER_THREADS = 10;
+  private static  HTable ht;
+  private static  MiniHBaseCluster cluster;
+  private static ThreadLocal<MutableLong> lastErrorLogTime = new ThreadLocal<MutableLong>();
+  private volatile static boolean doRead = true;
+
+  private static long DELAY = 5000; // 5 sec
+
+  private static final DelayInducingInjectionHandler delayer = new DelayInducingInjectionHandler();
+
+  static CountDownLatch stopReading = new CountDownLatch(NUM_READER_THREADS);
+  static CountDownLatch startReading = new CountDownLatch(1);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+    cluster = TEST_UTIL.getMiniHBaseCluster();
+
+    // Set the injection Handler to sleep before processing dead servers
+    delayer.setEventDelay(InjectionEvent.HMASTER_START_PROCESS_DEAD_SERVER, DELAY);
+
+    // Set the fast fail timeout for the client to be really small. 1 sec.
+    TEST_UTIL.getConfiguration().setInt("hbase.client.fastfail.threshold", 1000);
+    ht = TEST_UTIL.createTable(TABLE, new byte[][]{FAMILY},
+        3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+
+    writeData(ht, NUM_ROWS);
+
+    // Start 10 threads that keep reading stuff.
+    for (int i = 0; i < NUM_READER_THREADS; i++) {
+      LOG.info("Launching client reader thread " + i);
+      Thread readerThread = new Thread("client-reader-" + i) {
+        public void run() {
+          LOG.info("Starting reads : " + getName());
+          HTable table;
+          try {
+            table = new HTable(TEST_UTIL.getConfiguration(), TABLE);
+          } catch (IOException e1) {
+            // TODO Auto-generated catch block
+            e1.printStackTrace();
+            return;
+          }
+
+          while (true) {
+            try {
+              // ignore errors.
+              if (doRead) {
+                readData(table, NUM_ROWS, true);
+              } else {
+                LOG.info("Stopping reads.");
+                stopReading.countDown();
+                // wait till the RS gets killed by the mtin test
+                LOG.info("Waiting to start reads again.");
+                startReading.await();
+                LOG.info("Starting reads again.");
+              }
+            } catch (IOException e) {
+              // should never get here.
+            } catch (InterruptedException e) {
+              // TODO Auto-generated catch block
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      readerThread.setDaemon(true);
+      readerThread.start();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    ht.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    stopReading = new CountDownLatch(NUM_READER_THREADS);
+    startReading = new CountDownLatch(1);
+    LOG.info("preliminary Writing before the test");
+    writeData(ht, NUM_ROWS);
+    LOG.info("preliminary Reading before the test");
+    readData(ht, NUM_ROWS, false);
+    InjectionHandler.set(delayer);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("Clearing out the Injection Handler");
+    InjectionHandler.clear();
+    LOG.info("Waiting for master to stabilize everything");
+    Threads.sleep(2 * DELAY);
+    LOG.info("Proceeding");
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailNormalWithoutClearCache() throws Exception {
+    testFastFailNormal(false);
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailNormalWithClearCache() throws Exception {
+    testFastFailNormal(true);
+  }
+
+  public void testFastFailNormal(boolean clearCache) throws Exception {
+    LOG.info("Running test: testFastFailNormal. clearCache = " + clearCache);
+    // read once. ensure that we can read without any errors.
+    // kill a RS randomly
+    Random rand = new Random(234343); // random fixed seed.
+
+    int root_id = cluster.getServerWithRoot();
+    int meta_id = cluster.getServerWithMeta();
+    int killedCnt = 0;
+    do {
+      // ensure we don't end up with negative numbers.
+      int rsId = (SLAVES + rand.nextInt() % SLAVES) % SLAVES;
+
+      if (rsId == root_id || rsId == meta_id) continue;
+
+      killedCnt++;
+      killRSAndWaitUntilStabilize(rsId, clearCache);
+    } while (killedCnt < 1);
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailMetaWithoutClearCache() throws Exception {
+    testFastFailMeta(false);
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailMetaWithClearCache() throws Exception {
+    testFastFailMeta(true);
+  }
+
+  public void testFastFailMeta(boolean clearCache) throws Exception {
+    LOG.info("Running test: testFastFailMeta. clearCache = " + clearCache);
+    do {
+      int meta_id = cluster.getServerWithMeta();
+
+      if (meta_id == -1) {
+        LOG.info("META not yet assigned. Sleeping 500ms.");
+        Threads.sleep(500);
+        continue;
+      }
+
+      killRSAndWaitUntilStabilize(meta_id, clearCache);
+      return;
+    } while (true);
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailRootWithoutClearMeta() throws Exception {
+    testFastFailRoot(false);
+  }
+
+  @Test(timeout = 100000)
+  public void testFastFailRootWithClearMeta() throws Exception {
+    testFastFailRoot(true);
+  }
+
+  public void testFastFailRoot(boolean clearCache) throws Exception {
+    LOG.info("Running test: testFastFailRoot. clearCache = " + clearCache);
+    do {
+      int root_id = cluster.getServerWithRoot();
+
+      if (root_id == -1) {
+        LOG.info("ROOT not yet assigned. Sleeping 500ms.");
+        Threads.sleep(500);
+        continue;
+      }
+
+      killRSAndWaitUntilStabilize(root_id, clearCache);
+      return;
+    } while (true);
+  }
+
+
+  private void killRSAndWaitUntilStabilize(int rsId, boolean populateFailureMap) throws Exception
{
+    doRead = false;
+    stopReading.await();
+
+    LOG.debug("Killing region server " + rsId);
+    // kill a RS randomly
+    cluster.abortRegionServer(rsId);
+
+    if (populateFailureMap) {
+      TableServers hcm = (TableServers)
+          HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+
+      long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+      Map<HServerAddress, FailureInfo> repeatedFailuresMap =
+        hcm.getFailureMap();
+      for (RegionServerThread th: cluster.getRegionServerThreads()) {
+        HServerAddress serverAddr = th.getRegionServer().getServerInfo().getServerAddress();
+        FailureInfo fInfo = hcm.new FailureInfo(currentTime - 100000); // 100 sec ago
+        repeatedFailuresMap.put(serverAddr, fInfo);
+
+        // clear out the cache
+        hcm.clearCachedLocationForServer(serverAddr.toString());
+      }
+    }
+
+    Threads.sleep(1000);
+    doRead = true;
+    startReading.countDown();
+
+    int counter = 0;
+    while (true) {
+      try {
+        LOG.info("Trying to read data to see if things work fine. Trial # " + ++counter);
+        readData(ht, NUM_ROWS, false);
+
+        LOG.info("Reading succeded.");
+        break;
+      } catch(IOException e) {
+        LOG.info("readData had errors " + e.toString()
+            + " during trial #" + counter);
+        e.printStackTrace();
+        Threads.sleep(500); // 500 ms
+      }
+    }
+  }
+
+  /**
+   * Write data to the htable. While randomly killing/shutting down regionservers.
+   * @param table
+   * @param numRows
+   * @param action -- enum RegionServerAction which defines the type of action.
+   * @return number of attempts to complete the batch.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public static void writeData(HTable table, long numRows) throws IOException, InterruptedException
{
+    for (long i = 0; i < numRows; i++) {
+      byte [] rowKey = longToByteArrayKey(i);
+      Put put = new Put(rowKey);
+      byte[] value = rowKey; // value is the same as the row key
+      put.add(FAMILY, QUALIFIER, value);
+      table.put(put);
+    }
+    LOG.info("Written all puts.");
+  }
+
+  public static void readData(HTable table, long numRows, boolean ignoreExceptions) throws
IOException {
+    for(long i = 0; i < numRows; i++) {
+      byte [] rowKey = longToByteArrayKey(i);
+
+      Get get = new Get(rowKey);
+      get.addColumn(FAMILY, QUALIFIER);
+      get.setMaxVersions(1);
+      try {
+        Result result = table.get(get);
+
+        assertTrue(Arrays.equals(rowKey, result.getValue(FAMILY, QUALIFIER)));
+      } catch (IOException e) {
+        MutableLong lastErrorLoggedAt = lastErrorLogTime.get();
+        if (lastErrorLoggedAt == null) {
+          lastErrorLoggedAt = new MutableLong(0);
+          lastErrorLogTime.set(lastErrorLoggedAt);
+        }
+
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        if (now > lastErrorLoggedAt.longValue() + 1000) { // PFFE will flood this. rate
limit to 1 msg/second/thread
+          lastErrorLoggedAt.setValue(now);
+          LOG.error("Caught Exception " + e.toString());
+        }
+        if (!ignoreExceptions) throw e;
+      }
+    }
+  }
+
+  private static byte[] longToByteArrayKey(long rowKey) {
+    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+  }
+}
+

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java?rev=1459464&r1=1459463&r2=1459464&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java Thu Mar 21 18:53:26 2013
@@ -81,7 +81,7 @@ public class DelayInducingInjectionHandl
       }
       eventsToWaitFor.get(event).countDown();
       long delayTimeMs = eventToDelayTimeMs.get(event);
-      LOG.warn("Sleeping for " + delayTimeMs + " ms for " + event);
+      LOG.warn("Sleeping " + delayTimeMs + " ms for " + event);
       try {
         Thread.sleep(delayTimeMs);
       } catch (InterruptedException e) {



Mime
View raw message