hbase-commits mailing list archives

From: este...@apache.org
Subject: hbase git commit: HBASE-16345 RpcRetryingCallerWithReadReplicas#call() should catch some RegionServer Exceptions
Date: Wed, 19 Oct 2016 21:04:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/master d9ee25e82 -> 72db95388


HBASE-16345 RpcRetryingCallerWithReadReplicas#call() should catch some RegionServer Exceptions

Fix the logic for:
1) handling an exception while waiting for the reply from the primary replica, and
2) handling exceptions from the secondary replicas while waiting for a correct response
(a simplified sketch follows below).

Signed-off-by: Esteban Gutierrez <esteban@apache.org>
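
The heart of the change, as a minimal self-contained sketch (simplified names, no
timeouts or cancellation; the real classes follow in the diff): completed tasks are
recorded in completion order, and the caller walks that order, skipping failures
until it finds a success, rethrowing the last exception only if every task failed.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;

    // Sketch of the completion-order bookkeeping introduced by this commit.
    final class CompletionOrderPoller<V> {
      // Entry i is the (i+1)-th task to complete, success or failure.
      private final List<Outcome<V>> completedTasks = new ArrayList<>();

      static final class Outcome<T> {
        final T value;                   // non-null on success
        final ExecutionException error;  // non-null on failure
        Outcome(T value, ExecutionException error) { this.value = value; this.error = error; }
      }

      synchronized void taskCompleted(Outcome<V> outcome) {
        completedTasks.add(outcome);     // the list keeps completion order
        notifyAll();
      }

      // Walk tasks [startIndex, endIndex) in completion order; return the first
      // success, or rethrow the last failure if every task in the range failed.
      synchronized V firstSuccess(int startIndex, int endIndex)
          throws ExecutionException, InterruptedException {
        for (int i = startIndex; i < endIndex; i++) {
          while (completedTasks.size() <= i) wait();
          Outcome<V> o = completedTasks.get(i);
          if (o.error == null) return o.value;
          if (i == endIndex - 1) throw o.error;  // all tasks in the range failed
        }
        return null;  // unreachable when startIndex < endIndex
      }
    }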


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/72db9538
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/72db9538
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/72db9538

Branch: refs/heads/master
Commit: 72db95388664fb23314b2e6bb437b961c797f579
Parents: d9ee25e
Author: Huaxiang Sun <hsun@cloudera.com>
Authored: Tue Oct 18 14:10:09 2016 -0700
Committer: Esteban Gutierrez <esteban@apache.org>
Committed: Wed Oct 19 13:55:43 2016 -0700

----------------------------------------------------------------------
 .../client/ResultBoundedCompletionService.java  | 118 ++++++++++++--
 .../RpcRetryingCallerWithReadReplicas.java      |  32 ++--
 .../client/ScannerCallableWithReplicas.java     |  60 ++++---
 .../hbase/client/TestReplicaWithCluster.java    | 155 +++++++++++++++++--
 4 files changed, 310 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/72db9538/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index 9b32e93..2848c9d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -18,13 +18,18 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.ArrayList;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.Trace;
 
 /**
@@ -32,13 +37,21 @@ import org.apache.htrace.Trace;
  * Keeps the list of the futures, and allows to cancel them all.
  * This means as well that it can be used for a small set of tasks only.
  * <br>Implementation is not Thread safe.
+ *
+ * completedTasks is implemented as a queue whose entries are added in completion order.
+ * I.e., when the first task completes (whether with a success or a failure), it is added
+ * as the first entry in the queue, the next completed task as the second entry, and so on.
+ * When iterating through the queue, we therefore walk the tasks in completion order. If
+ * the first completed task succeeded, it is returned; if it failed, the iteration goes on
+ * until it finds a success.
  */
 @InterfaceAudience.Private
 public class ResultBoundedCompletionService<V> {
+  private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class);
   private final RpcRetryingCallerFactory retryingCallerFactory;
   private final Executor executor;
   private final QueueingFuture<V>[] tasks; // all the tasks
-  private volatile QueueingFuture<V> completed = null;
+  private final ArrayList<QueueingFuture> completedTasks; // completed tasks
   private volatile boolean cancelled = false;
   
   class QueueingFuture<T> implements RunnableFuture<T> {
@@ -49,12 +62,14 @@ public class ResultBoundedCompletionService<V> {
     private final int callTimeout;
     private final RpcRetryingCaller<T> retryingCaller;
     private boolean resultObtained = false;
+    private final int replicaId;  // id of the replica this task reads from
 
 
-    public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
+    public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
       this.future = future;
       this.callTimeout = callTimeout;
       this.retryingCaller = retryingCallerFactory.<T>newCaller();
+      this.replicaId = id;
     }
 
     @SuppressWarnings("unchecked")
@@ -70,8 +85,8 @@ public class ResultBoundedCompletionService<V> {
       } finally {
         synchronized (tasks) {
           // If this wasn't canceled then store the result.
-          if (!cancelled && completed == null) {
-            completed = (QueueingFuture<V>) QueueingFuture.this;
+          if (!cancelled) {
+            completedTasks.add(QueueingFuture.this);
           }
 
           // Notify just in case there was someone waiting and this was canceled.
@@ -80,6 +95,7 @@ public class ResultBoundedCompletionService<V> {
         }
       }
     }
+
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
       if (resultObtained || exeEx != null) return false;
@@ -129,6 +145,14 @@ public class ResultBoundedCompletionService<V> {
 
       throw new TimeoutException("timeout=" + timeout + ", " + unit);
     }
+
+    public int getReplicaId() {
+      return replicaId;
+    }
+
+    public ExecutionException getExeEx() {
+      return exeEx;
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -138,27 +162,103 @@ public class ResultBoundedCompletionService<V> {
     this.retryingCallerFactory = retryingCallerFactory;
     this.executor = executor;
     this.tasks = new QueueingFuture[maxTasks];
+    this.completedTasks = new ArrayList<>(maxTasks);
   }
 
 
   public void submit(RetryingCallable<V> task, int callTimeout, int id) {
-    QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
+    QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
     executor.execute(Trace.wrap(newFuture));
     tasks[id] = newFuture;
   }
 
   public QueueingFuture<V> take() throws InterruptedException {
     synchronized (tasks) {
-      while (completed == null && !cancelled) tasks.wait();
+      while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
     }
-    return completed;
+    return completedTasks.get(0);
   }
 
+  /**
+   * Poll for the first completed task, whether it ended in a success or an execution exception.
+   *
+   * @param timeout  - time to wait before it times out
+   * @param unit  - time unit for timeout
+   */
  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return pollForSpecificCompletedTask(timeout, unit, 0);
+  }
+
+  /**
+   * Poll for the first successfully completed task whose completion order falls in the
+   * [startIndex, endIndex) range.
+   *
+   * @param timeout  - time to wait before it times out
+   * @param unit  - time unit for timeout
+   * @param startIndex - start index, starting from 0, inclusive
+   * @param endIndex - end index, exclusive
+   *
+   * @return null if no task completed successfully within the timeout; if all tasks got
+   *         execution exceptions, the last execution exception is rethrown; otherwise
+   *         the first successfully completed task is returned.
+   */
+  public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
+      int startIndex, int endIndex)
+      throws InterruptedException, CancellationException, ExecutionException {
+
+    QueueingFuture<V> f;
+    long start, duration;
+    for (int i = startIndex; i < endIndex; i++) {
+
+      start = EnvironmentEdgeManager.currentTime();
+      f = pollForSpecificCompletedTask(timeout, unit, i);
+      duration = EnvironmentEdgeManager.currentTime() - start;
+
+      // Even if the remaining timeout drops below 0, keep looping: other tasks may
+      // already have completed before operationTimeout expired.
+      timeout -= duration;
+
+      if (f == null) {
+        return null;
+      } else if (f.getExeEx() != null) {
+        // This task failed; keep looping through the remaining results.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Replica " + f.getReplicaId() + " returns " +
+              f.getExeEx().getCause());
+        }
+        }
+
+        if (i == (endIndex - 1)) {
+          // Rethrow this exception
+          throw f.getExeEx();
+        }
+        continue;
+      }
+
+      return f;
+    }
+
+    // impossible to reach
+    return null;
+  }
+
+  /**
+   * Poll for the Nth completed task (0-based: 0 is the first completed, 1 the second, ...)
+   *
+   * @param timeout  - time to wait before it times out
+   * @param unit  - time unit for timeout
+   * @param index - the index(th) completed task, index starting from 0
+   */
+  private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
+      throws InterruptedException {
+    if (index < 0) {
+      return null;
+    }
+
     synchronized (tasks) {
-      if (completed == null && !cancelled) unit.timedWait(tasks, timeout);
+      if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
+      if (completedTasks.size() <= index) return null;
     }
-    return completed;
+    return completedTasks.get(index);
   }
 
   public void cancelAll() {

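A self-contained analogy: the JDK's ExecutorCompletionService also hands futures back
in completion order, so the behavior of pollForFirstSuccessfullyCompletedTask can be
demonstrated without any HBase types (illustrative only; the HBase class adds bounded
waits, replica ids, and index ranges on top of this):

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FirstSuccessDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);

        // Replica 0 (the "primary") fails fast; replica 2 answers before replica 1.
        cs.submit(() -> { throw new RuntimeException("primary is down"); });
        cs.submit(() -> { Thread.sleep(500); return "from replica 1"; });
        cs.submit(() -> { Thread.sleep(100); return "from replica 2"; });

        String result = null;
        ExecutionException last = null;
        for (int i = 0; i < 3 && result == null; i++) {
          Future<String> f = cs.take();   // next future, in completion order
          try {
            result = f.get();             // first success wins
          } catch (ExecutionException e) {
            last = e;                     // remember the failure, keep polling
          }
        }
        pool.shutdown();
        if (result == null) throw last;   // every replica failed: rethrow the last one
        System.out.println("got: " + result);  // prints "got: from replica 2"
      }
    }
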
http://git-wip-us.apache.org/repos/asf/hbase/blob/72db9538/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index a290c78..316fad1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
+  private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -171,9 +174,12 @@ public class RpcRetryingCallerWithReadReplicas {
         : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
    final ResultBoundedCompletionService<Result> cs =
          new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
+    int startIndex = 0;
+    int endIndex = rl.size();
 
     if(isTargetReplicaSpecified) {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
+      endIndex = 1;
     } else {
       addCallsForReplica(cs, rl, 0, 0);
       try {
@@ -183,7 +189,13 @@ public class RpcRetryingCallerWithReadReplicas {
           return f.get(); //great we got a response
         }
       } catch (ExecutionException e) {
-        throwEnrichedException(e, retries);
+        // We ignore the ExecutionException and continue with the secondary replicas
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Primary replica returns " + e.getCause());
+        }
+
+        // Skip the result from the primary as we know that there is something wrong
+        startIndex = 1;
       } catch (CancellationException e) {
         throw new InterruptedIOException();
       } catch (InterruptedException e) {
@@ -194,19 +206,14 @@ public class RpcRetryingCallerWithReadReplicas {
       addCallsForReplica(cs, rl, 1, rl.size() - 1);
     }
     try {
-      try {
-        long start = EnvironmentEdgeManager.currentTime();
-        Future<Result> f = cs.poll(operationTimeout, TimeUnit.MILLISECONDS);
-        long duration = EnvironmentEdgeManager.currentTime() - start;
-        if (f == null) {
-          throw new RetriesExhaustedException("timed out after " + duration + " ms");
-        }
-        return f.get(operationTimeout - duration, TimeUnit.MILLISECONDS);
-      } catch (ExecutionException e) {
-        throwEnrichedException(e, retries);
-      } catch (TimeoutException te) {
+      Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
+          TimeUnit.MILLISECONDS, startIndex, endIndex);
+      if (f == null) {
         throw new RetriesExhaustedException("timed out after " + operationTimeout + " ms");
       }
+      return f.get();
+    } catch (ExecutionException e) {
+      throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException();
     } catch (InterruptedException e) {
@@ -217,6 +224,7 @@ public class RpcRetryingCallerWithReadReplicas {
       cs.cancelAll();
     }
 
+    LOG.error("Imposible? Arrive at an unreachable line..."); // unreachable
     return null; // unreachable
   }
 

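For a client, the effect is that a timeline-consistency read no longer dies with the
primary. A hedged usage sketch with the standard client API (the Connection, the table
name "t1", and the row key are illustrative assumptions; the table needs region
replication > 1 so a secondary can answer):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Consistency;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;

    public class TimelineGetExample {
      static Result timelineGet(Connection conn, byte[] row) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf("t1"))) {
          Get g = new Get(row);
          g.setConsistency(Consistency.TIMELINE);  // allow secondary replicas to answer
          Result r = table.get(g);                 // a failing primary no longer aborts the call
          if (r.isStale()) {
            // Served by a secondary replica; the data may lag the primary.
          }
          return r;
        }
      }
    }
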
http://git-wip-us.apache.org/repos/asf/hbase/blob/72db9538/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 096841b..5174598 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -169,54 +169,70 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     replicaSwitched.set(false);
     // submit call for the primary replica.
     addCallsForCurrentReplica(cs, rl);
+    int startIndex = 0;
 
     try {
       // wait for the timeout to see whether the primary responds back
       Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
           TimeUnit.MICROSECONDS); // Yes, microseconds
       if (f != null) {
-        Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
+        // After poll, if f is not null, there must be a completed task
+        Pair<Result[], ScannerCallable> r = f.get();
         if (r != null && r.getSecond() != null) {
           updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
         return r == null ? null : r.getFirst(); //great we got a response
       }
     } catch (ExecutionException e) {
-      RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
+      // We ignore the ExecutionException and continue with the replicas
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scan with primary region returns " + e.getCause());
+      }
+
+      // If rl's size is 1 or the scan's consistency is strong, throw the
+      // exception from the primary replica
+      if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) {
+        // Rethrow the first exception
+        RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
+      }
+
+      startIndex = 1;
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
-    } catch (TimeoutException e) {
-      throw new InterruptedIOException(e.getMessage());
     }
 
     // submit call for the all of the secondaries at once
-    // TODO: this may be an overkill for large region replication
-    addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+    int endIndex = rl.size();
+    if (scan.getConsistency() == Consistency.STRONG) {
+      // When scan's consistency is strong, do not send to the secondaries
+      endIndex = 1;
+    } else {
+      // TODO: this may be an overkill for large region replication
+      addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
+    }
 
     try {
-      long start = EnvironmentEdgeManager.currentTime();
-      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
-      long duration = EnvironmentEdgeManager.currentTime() - start;
-      if (f != null) {
-        Pair<Result[], ScannerCallable> r = f.get(timeout - duration, TimeUnit.MILLISECONDS);
-        if (r != null && r.getSecond() != null) {
-          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
-        }
-        return r == null ? null : r.getFirst(); // great we got an answer
-      } else {
-        throw new IOException("Failed to get result within timeout, timeout="
-            + timeout + "ms");
+      Future<Pair<Result[], ScannerCallable>> f = cs.pollForFirstSuccessfullyCompletedTask(timeout,
+          TimeUnit.MILLISECONDS, startIndex, endIndex);
+
+      if (f == null) {
+        throw new IOException("Failed to get result within timeout, timeout=" + timeout +
"ms");
+      }
+      Pair<Result[], ScannerCallable> r = f.get();
+
+      if (r != null && r.getSecond() != null) {
+        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
       }
+      return r == null ? null : r.getFirst(); // great we got an answer
+
     } catch (ExecutionException e) {
       RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
     } catch (CancellationException e) {
       throw new InterruptedIOException(e.getMessage());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
-    } catch (TimeoutException e) {
-      throw new InterruptedIOException(e.getMessage());
     } finally {
       // We get there because we were interrupted or because one or more of the
       // calls succeeded or failed. In all case, we stop all our tasks.
@@ -292,9 +308,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   private void addCallsForOtherReplicas(
      ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
       int min, int max) {
-    if (scan.getConsistency() == Consistency.STRONG) {
-      return; // not scheduling on other replicas for strong consistency
-    }
+
     for (int id = min; id <= max; id++) {
       if (currentScannerCallable.id == id) {
         continue; //this was already scheduled earlier

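The scan path follows the same pattern, with the strong-consistency check hoisted into
the caller: a STRONG scan rethrows the primary's exception at once (endIndex stays 1
and no secondaries are submitted), while a TIMELINE scan falls back. A hedged sketch of
the client side (Connection, table name, and column family are illustrative assumptions):

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Consistency;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class TimelineScanExample {
      static void timelineScan(Connection conn, byte[] family) throws IOException {
        try (Table table = conn.getTable(TableName.valueOf("t1"))) {
          Scan scan = new Scan();
          scan.addFamily(family);
          scan.setConsistency(Consistency.TIMELINE);  // permit secondary replicas
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
              if (r.isStale()) {
                // This row came from a secondary replica and may lag the primary.
              }
            }
          }
        }
      }
    }
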
http://git-wip-us.apache.org/repos/asf/hbase/blob/72db9538/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 70cd725..1687a4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -63,7 +65,7 @@ import org.junit.experimental.categories.Category;
 public class TestReplicaWithCluster {
   private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
 
-  private static final int NB_SERVERS = 2;
+  private static final int NB_SERVERS = 3;
   private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
 
@@ -109,11 +111,56 @@ public class TestReplicaWithCluster {
     }
   }
 
+  /**
+   * This copro is used to simulate region server down exception for Get and Scan
+   */
+  public static class RegionServerStoppedCopro extends BaseRegionObserver {
+
+    public RegionServerStoppedCopro() {
+    }
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<Cell> results) throws IOException {
+
+      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+      // Fail for the primary replica and replica 1
+      if (replicaId <= 1) {
+        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
+        throw new RegionServerStoppedException("Server " +
+            e.getEnvironment().getRegionServerServices().getServerName()
+            + " not running");
+      } else {
+        LOG.info("We're replica region " + replicaId);
+      }
+    }
+
+    @Override
+    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Scan scan, final RegionScanner s) throws IOException {
+
+      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+      // Fail for the primary replica and replica 1
+      if (replicaId <= 1) {
+        LOG.info("Throw RegionServerStoppedException for replica id " + replicaId);
+        throw new RegionServerStoppedException("Server " +
+            e.getEnvironment().getRegionServerServices().getServerName()
+            + " not running");
+      } else {
+        LOG.info("We're replica region " + replicaId);
+      }
+
+      return null;
+    }
+  }
+
   @BeforeClass
   public static void beforeClass() throws Exception {
     // enable store file refreshing
-    HTU.getConfiguration().setInt(
-        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
+    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+        REFRESH_PERIOD);
 
     HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
     HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
@@ -123,6 +170,13 @@ public class TestReplicaWithCluster {
     HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
     HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
 
+    // Wait longer on the primary call so the client is sure to get the exception from the primary
+    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
+    HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
+
+    // Retry less so it can fail faster
+    HTU.getConfiguration().setInt("hbase.client.retries.number", 1);
+
     HTU.startMiniCluster(NB_SERVERS);
     HTU.getHBaseCluster().startMaster();
   }
@@ -263,39 +317,39 @@ public class TestReplicaWithCluster {
     LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
 
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
+      @Override public boolean evaluate() throws Exception {
         try {
           SlowMeCopro.cdl.set(new CountDownLatch(1));
           Get g = new Get(row);
           g.setConsistency(Consistency.TIMELINE);
           Result r = table.get(g);
           Assert.assertTrue(r.isStale());
-          return  !r.isEmpty();
+          return !r.isEmpty();
         } finally {
           SlowMeCopro.cdl.get().countDown();
           SlowMeCopro.sleepTime.set(0);
         }
-      }});
+      }
+    });
     table.close();
     LOG.info("stale get on the first cluster done. Now for the second.");
 
     final Table table2 = HTU.getConnection().getTable(hdt.getTableName());
    Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
+      @Override public boolean evaluate() throws Exception {
         try {
           SlowMeCopro.cdl.set(new CountDownLatch(1));
           Get g = new Get(row);
           g.setConsistency(Consistency.TIMELINE);
           Result r = table2.get(g);
           Assert.assertTrue(r.isStale());
-          return  !r.isEmpty();
+          return !r.isEmpty();
         } finally {
           SlowMeCopro.cdl.get().countDown();
           SlowMeCopro.sleepTime.set(0);
         }
-      }});
+      }
+    });
     table2.close();
 
     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
@@ -387,4 +441,83 @@ public class TestReplicaWithCluster {
     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
     HTU.deleteTable(hdt.getTableName());
   }
+
+  @Test
+  public void testReplicaGetWithPrimaryDown() throws IOException {
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryDown");
+    hdt.setRegionReplication(NB_SERVERS);
+    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
+    try {
+      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+      Put p = new Put(row);
+      p.addColumn(f, row, row);
+      table.put(p);
+
+      // Flush so it can be picked by the replica refresher thread
+      HTU.flush(table.getName());
+
+      // Sleep for some time until data is picked up by replicas
+      try {
+        Thread.sleep(2 * REFRESH_PERIOD);
+      } catch (InterruptedException e1) {
+        LOG.error(e1);
+      }
+
+      // But if we ask for stale we will get it
+      Get g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      Result r = table.get(g);
+      Assert.assertTrue(r.isStale());
+    } finally {
+      HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+      HTU.deleteTable(hdt.getTableName());
+    }
+  }
+
+  @Test
+  public void testReplicaScanWithPrimaryDown() throws IOException {
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaScanWithPrimaryDown");
+    hdt.setRegionReplication(NB_SERVERS);
+    hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
+
+    try {
+      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+      Put p = new Put(row);
+      p.addColumn(f, row, row);
+      table.put(p);
+
+      // Flush so it can be picked by the replica refresher thread
+      HTU.flush(table.getName());
+
+      // Sleep for some time until data is picked up by replicas
+      try {
+        Thread.sleep(2 * REFRESH_PERIOD);
+      } catch (InterruptedException e1) {
+        LOG.error(e1);
+      }
+
+      // But if we ask for stale we will get it
+      // Instantiating the Scan class
+      Scan scan = new Scan();
+
+      // Scanning the required columns
+      scan.addFamily(f);
+      scan.setConsistency(Consistency.TIMELINE);
+
+      // Getting the scan result
+      ResultScanner scanner = table.getScanner(scan);
+
+      Result r = scanner.next();
+
+      Assert.assertTrue(r.isStale());
+    } finally {
+
+      HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+      HTU.deleteTable(hdt.getTableName());
+    }
+  }
 }

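For reference, the three client knobs the new tests lean on, set the same way in
application code (values copied from the test setup above; per the "Yes, microseconds"
comment in the scan path, the primaryCallTimeout values are in microseconds):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ReplicaClientConf {
      static Configuration replicaFriendlyConf() {
        Configuration conf = HBaseConfiguration.create();
        // How long to wait on the primary before secondaries are tried (microseconds).
        conf.setInt("hbase.client.primaryCallTimeout.get", 1000000);   // 1s, as in the test
        conf.setInt("hbase.client.primaryCallTimeout.scan", 1000000);  // 1s, as in the test
        conf.setInt("hbase.client.retries.number", 1);                 // fail fast
        return conf;
      }
    }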
