hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1595280 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/mock/
Date Fri, 16 May 2014 18:18:35 GMT
Author: liyin
Date: Fri May 16 18:18:35 2014
New Revision: 1595280

URL: http://svn.apache.org/r1595280
Log:
[HBASE-11187] Limit the number of client threads per regionserver

Author: elliott

Summary:
When lots of HTables have been created and a region server becomes slow
to respond, a large number of threads gets created in the HConnection (TableServers).

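This patch adds a config (hbase.client.max.outstanding.requests.per.server, default 50)
that will limit the number of threads per regionserver that the HBase client can spawn.
Once that number is exceeded, further requests fail fast with a
TooManyOutstandingRequestsException.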

Test Plan: verynicetests

Reviewers: aaiyer, rshroff

Reviewed By: rshroff

Subscribers: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1331985

Tasks: 4330194

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Fri
May 16 18:18:35 2014
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -286,7 +287,7 @@ public interface HConnection extends Clo
    *  Or, operations were not successfully completed.
    */
   public void processBatchedGets(List<Get> actions, StringBytes tableName,
-      ExecutorService pool, Result[] results, HBaseRPCOptions options)
+      ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
       throws IOException, InterruptedException;
 
   /**
@@ -303,7 +304,7 @@ public interface HConnection extends Clo
    * META. Or, operations were not successfully completed.
    */
   public void processBatchedMutations(List<Mutation> actions,
-      StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+      StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
       HBaseRPCOptions options) throws IOException, InterruptedException;
 
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri May
16 18:18:35 2014
@@ -38,6 +38,8 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -96,11 +98,12 @@ public class HTable implements HTableInt
 
   // Share this multiaction thread pool across all the HTable instance;
   // The total number of threads will be bounded #HTable * #RegionServer.
-  static ExecutorService multiActionThreadPool =
-    new ThreadPoolExecutor(1, Integer.MAX_VALUE,
+  static ExecutorService multiActionThreadPool = new ThreadPoolExecutor(1, Integer.MAX_VALUE,
       60, TimeUnit.SECONDS,
       new SynchronousQueue<Runnable>(),
       new DaemonThreadFactory("htable-thread-"));
+
+  static ListeningExecutorService listeningMultiActionPool = MoreExecutors.listeningDecorator(multiActionThreadPool);
   static {
     ((ThreadPoolExecutor)multiActionThreadPool).allowCoreThreadTimeOut(true);
   }
@@ -798,7 +801,7 @@ public class HTable implements HTableInt
       throws IOException {
     Result[] results = new Result[actions.size()];
     try {
-      this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName,
multiActionThreadPool,
+      this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName,
listeningMultiActionPool,
           results, this.options);
     } catch (Exception e) {
       e.printStackTrace();
@@ -815,7 +818,7 @@ public class HTable implements HTableInt
       throws IOException {
     try {
       this.getConnectionAndResetOperationContext().processBatchedMutations(actions,
-          tableName, multiActionThreadPool, null, this.options);
+          tableName, listeningMultiActionPool, null, this.options);
     } catch (Exception e) {
       throw new IOException(e);
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
Fri May 16 18:18:35 2014
@@ -45,12 +45,18 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -93,6 +99,8 @@ import org.apache.thrift.transport.TTran
 
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
+
 /* Encapsulates finding the servers for an HBase instance */
 public class TableServers implements ServerConnection {
   static final Log LOG = LogFactory.getLog(TableServers.class);
@@ -100,6 +108,7 @@ public class TableServers implements Ser
   private final int prefetchRegionLimit;
 
   private final Object masterLock = new Object();
+  private final int maxOutstandRequestsPerServer;
   private volatile boolean closed;
   private volatile HMasterInterface master;
   private volatile boolean masterChecked;
@@ -138,7 +147,9 @@ public class TableServers implements Ser
   private long fastFailClearingTimeMilliSec;
   private final boolean recordClientContext;
 
-  private ThreadLocal<List<OperationContext>> operationContextPerThread = new
ThreadLocal<List<OperationContext>>();
+  private ThreadLocal<List<OperationContext>> operationContextPerThread = new
ThreadLocal<>();
+
+  private final ConcurrentHashMap<HServerAddress, AtomicInteger> outstandingRequests
= new ConcurrentHashMap<>();
 
   @Override
   public void resetOperationContext() {
@@ -285,6 +296,7 @@ public class TableServers implements Ser
 
     this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
 
+    this.maxOutstandRequestsPerServer = conf.getInt("hbase.client.max.outstanding.requests.per.server",
50);
   }
 
   // Used by master and region servers during safe mode only
@@ -1895,23 +1907,30 @@ private HRegionLocation locateMetaInRoot
     return actionsByServer;
   }
 
-  private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
+  private Map<HServerAddress, ListenableFuture<MultiResponse>> makeServerRequests(
       Map<HServerAddress, MultiAction> actionsByServer,
-      final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
+      final byte[] tableName, ListeningExecutorService pool, HBaseRPCOptions options) {
 
-    Map<HServerAddress, Future<MultiResponse>> futures = new HashMap<HServerAddress,
Future<MultiResponse>>(
+    Map<HServerAddress, ListenableFuture<MultiResponse>> futures = new HashMap<>(
         actionsByServer.size());
 
     boolean singleServer = (actionsByServer.size() == 1);
     for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
       Callable<MultiResponse> callable = createMultiActionCallable(
           e.getKey(), e.getValue(), tableName, options);
-      Future<MultiResponse> task;
-      if (singleServer) {
-        task = new FutureTask<MultiResponse>(callable);
-        ((FutureTask<MultiResponse>) task).run();
-      } else {
-        task = pool.submit(callable);
+      ListenableFuture<MultiResponse> task;
+
+      try {
+       validateAndIncrementNumOutstandingPerServer(e.getKey());
+        if (singleServer) {
+          task =  ListenableFutureTask.create(callable);
+          ((FutureTask<MultiResponse>) task).run();
+        } else {
+          task = pool.submit(callable);
+        }
+        Futures.addCallback(task, new OutstandingRequestCallback(e.getKey()));
+      } catch (IOException e1) {
+        task = Futures.immediateFailedFuture(e1);
       }
       futures.put(e.getKey(), task);
     }
@@ -1925,12 +1944,12 @@ private HRegionLocation locateMetaInRoot
   private List<Mutation> collectResponsesForMutateFromAllRS(
       StringBytes tableName,
       Map<HServerAddress, MultiAction> actionsByServer,
-      Map<HServerAddress, Future<MultiResponse>> futures,
+      Map<HServerAddress, ListenableFuture<MultiResponse>> futures,
       Map<String, HRegionFailureInfo> failureInfo)
       throws InterruptedException, IOException {
 
     List<Mutation> newWorkingList = null;
-    for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+    for (Entry<HServerAddress, ListenableFuture<MultiResponse>> responsePerServer
: futures
         .entrySet()) {
       HServerAddress address = responsePerServer.getKey();
       MultiAction request = actionsByServer.get(address);
@@ -1972,13 +1991,13 @@ private HRegionLocation locateMetaInRoot
    */
   private List<Get> collectResponsesForGetFromAllRS(StringBytes tableName,
       Map<HServerAddress, MultiAction> actionsByServer,
-      Map<HServerAddress, Future<MultiResponse>> futures,
+      Map<HServerAddress, ListenableFuture<MultiResponse>> futures,
       List<Get> orig_list, Result[] results,
       Map<String, HRegionFailureInfo> failureInfo) throws IOException,
       InterruptedException {
 
     List<Get> newWorkingList = null;
-    for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+    for (Entry<HServerAddress, ListenableFuture<MultiResponse>> responsePerServer
: futures
         .entrySet()) {
       HServerAddress address = responsePerServer.getKey();
       MultiAction request = actionsByServer.get(address);
@@ -2116,7 +2135,7 @@ private HRegionLocation locateMetaInRoot
 
   @Override
   public void processBatchedMutations(List<Mutation> orig_list,
-      StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+      StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
       HBaseRPCOptions options) throws IOException, InterruptedException {
 
     // Keep track of the most recent servers for any given item for better
@@ -2146,7 +2165,7 @@ private HRegionLocation locateMetaInRoot
           workingList, tableName, false);
 
       // step 2: make the requests
-      Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+      Map<HServerAddress, ListenableFuture<MultiResponse>> futures = makeServerRequests(
           actionsByServer, tableName.getBytes(), pool, options);
 
       // step 3: collect the failures and successes and prepare for retry
@@ -2165,7 +2184,7 @@ private HRegionLocation locateMetaInRoot
 
   @Override
   public void processBatchedGets(List<Get> orig_list, StringBytes tableName,
-      ExecutorService pool, Result[] results, HBaseRPCOptions options)
+      ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
       throws IOException, InterruptedException {
 
     Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
@@ -2197,7 +2216,7 @@ private HRegionLocation locateMetaInRoot
           workingList, tableName, true);
 
       // step 2: make the requests
-      Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+      Map<HServerAddress, ListenableFuture<MultiResponse>> futures = makeServerRequests(
           actionsByServer, tableName.getBytes(), pool, options);
 
       // step 3: collect the failures and successes and prepare for retry
@@ -2520,21 +2539,28 @@ private HRegionLocation locateMetaInRoot
       Map<String, HRegionFailureInfo> failedRegionsInfo) throws IOException {
     List<Put> failed = null;
 
-    List<Future<MultiPutResponse>> futures = new ArrayList<Future<MultiPutResponse>>(
+    List<ListenableFuture<MultiPutResponse>> futures = new ArrayList<>(
         multiPuts.size());
     boolean singleServer = (multiPuts.size() == 1);
     for (MultiPut put : multiPuts) {
       Callable<MultiPutResponse> callable = createPutCallable(put.address,
           put, options);
-      Future<MultiPutResponse> task;
-      if (singleServer) {
-        FutureTask<MultiPutResponse> futureTask = new FutureTask<MultiPutResponse>(
-            callable);
-        task = futureTask;
-        futureTask.run();
-      } else {
-        task = HTable.multiActionThreadPool.submit(callable);
+      ListenableFuture<MultiPutResponse> task;
+      try {
+        validateAndIncrementNumOutstandingPerServer(put.address);
+        if (singleServer) {
+          ListenableFutureTask<MultiPutResponse> futureTask = ListenableFutureTask.create(
+              callable);
+          task = futureTask;
+          futureTask.run();
+        } else {
+          task = HTable.listeningMultiActionPool.submit(callable);
+        }
+        Futures.addCallback(task, new OutstandingRequestCallback(put.address));
+      } catch (IOException e1) {
+        task = Futures.immediateFailedFuture(e1);
       }
+
       futures.add(task);
     }
 
@@ -3329,8 +3355,49 @@ private HRegionLocation locateMetaInRoot
     return regionFlushTimesMap;
   }
 
+  void validateAndIncrementNumOutstandingPerServer(HServerAddress address)
+      throws TooManyOutstandingRequestsException {
+    AtomicInteger atomicOutstanding = outstandingRequests.computeIfAbsent(address, new Function<HServerAddress,
AtomicInteger>() {
+      @Override public AtomicInteger apply(HServerAddress address) {
+        return new AtomicInteger(0);
+      }
+    });
+
+    int outstanding = atomicOutstanding.get();
+    if (outstanding > maxOutstandRequestsPerServer) {
+      throw new TooManyOutstandingRequestsException(address, outstanding);
+    }
+    atomicOutstanding.incrementAndGet();
+  }
+
+  void decrementNumOutstandingPerServer(HServerAddress address) {
+    AtomicInteger outstanding = outstandingRequests.computeIfAbsent(address, new Function<HServerAddress,
AtomicInteger>() {
+      @Override public AtomicInteger apply(HServerAddress address) {
+        return new AtomicInteger(0);
+      }
+    });
+    outstanding.decrementAndGet();
+  }
+
   @Override
   public Configuration getConf() {
     return this.conf;
   }
+
+  private class OutstandingRequestCallback<V> implements FutureCallback<V> {
+
+    private final HServerAddress server;
+
+    public OutstandingRequestCallback(HServerAddress server) {
+      this.server = server;
+    }
+
+    @Override public void onSuccess(@Nullable V result) {
+      decrementNumOutstandingPerServer(server);
+    }
+
+    @Override public void onFailure(Throwable t) {
+      decrementNumOutstandingPerServer(server);
+    }
+  }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java?rev=1595280&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
(added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TooManyOutstandingRequestsException.java
Fri May 16 18:18:35 2014
@@ -0,0 +1,37 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HServerAddress;
+
+
+public class TooManyOutstandingRequestsException extends ClientSideDoNotRetryException {
+  private final HServerAddress server;
+
+  public TooManyOutstandingRequestsException(HServerAddress server, int numCurrentlyOutstanding)
{
+    super("Server " + server + " has " + numCurrentlyOutstanding + " outstanding requests.
" +
+          "Failing fast to keep from creating more threads.");
+    this.server = server;
+  }
+
+  public HServerAddress getServer() {
+    return server;
+  }
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java?rev=1595280&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestLoadShedding.java
Fri May 16 18:18:35 2014
@@ -0,0 +1,160 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(MediumTests.class)
+public class TestLoadShedding {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
+  private static final AtomicLong NUM_SHED = new AtomicLong(0);
+  private static final String TABLE_NAME = "testLoadShedding";
+  private static final String FAMILY_NAME_STR = "d";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+  public static final int MAX_WRITER_THREADS = 15;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt("hbase.client.max.outstanding.requests.per.server", 5);
+    TEST_UTIL.startMiniCluster();
+    TEST_UTIL.waitForTableConsistent();
+    HBaseAdmin hba = TEST_UTIL.getHBaseAdmin();
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
+    hba.createTable(htd, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 10);
+    RUNNING.set(true);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    RUNNING.set(false);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testLoadSheddingWrites() {
+
+    // Start lots of threads so that there will be more than 2 contending per
+    // server
+    List<Thread> threads = new ArrayList<>(MAX_WRITER_THREADS);
+    for (int i = 0; i < MAX_WRITER_THREADS; i++) {
+      Thread t = new WriterThread(i);
+      t.setDaemon(true);
+      t.start();
+      threads.add(t);
+    }
+
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+    }
+
+    RUNNING.set(false);
+    assertTrue(NUM_SHED.get() > 0);
+
+    // Now wait for everything to stop
+    for (Thread t:threads) {
+      try {
+        t.join();
+        t.stop();
+      } catch (InterruptedException e) {
+
+      }
+    }
+
+    // Clean up
+    NUM_SHED.set(0);
+    RUNNING.set(true);
+
+    // Make sure that just a few threads don't contend
+    for (int x=0; x< 4; x++) {
+      Thread t = new WriterThread(x + MAX_WRITER_THREADS);
+      t.setDaemon(true);
+      t.start();
+    }
+
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+    }
+
+    RUNNING.set(false);
+    assertEquals(0, NUM_SHED.get());
+
+
+  }
+
+  protected static Put generateRandomPut() {
+    Put p = new Put(Bytes.toBytes(RandomStringUtils.randomAlphabetic(30)));
+    p.add(FAMILY_NAME, Bytes.toBytes(RandomUtils.nextInt()), Bytes.toBytes(
+        RandomStringUtils.randomAlphanumeric(10)));
+    return p;
+  }
+
+  private static class WriterThread extends Thread {
+    public WriterThread(int i) {
+      super("client-writer-" + i);
+    }
+
+    public void run() {
+      try {
+        HTable ht = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
+        ht.setAutoFlush(false);
+
+        while(RUNNING.get()) {
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.put(generateRandomPut());
+          ht.flushCommits();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        NUM_SHED.incrementAndGet();
+      }
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java?rev=1595280&r1=1595279&r2=1595280&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mock/HConnectionMockImpl.java
Fri May 16 18:18:35 2014
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -227,7 +229,7 @@ public class HConnectionMockImpl impleme
 
   @Override
   public void processBatchedGets(List<Get> actions, StringBytes tableName,
-      ExecutorService pool, Result[] results, HBaseRPCOptions options)
+      ListeningExecutorService pool, Result[] results, HBaseRPCOptions options)
       throws IOException, InterruptedException {
     // TODO Auto-generated method stub
 
@@ -235,7 +237,7 @@ public class HConnectionMockImpl impleme
 
   @Override
   public void processBatchedMutations(List<Mutation> actions,
-      StringBytes tableName, ExecutorService pool, List<Mutation> failures,
+      StringBytes tableName, ListeningExecutorService pool, List<Mutation> failures,
       HBaseRPCOptions options) throws IOException, InterruptedException {
     // TODO Auto-generated method stub
 



Mime
View raw message