hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject git commit: HBASE-12104 Some optimization and bugfix for HTableMultiplexer (Yi Deng)
Date Sat, 04 Oct 2014 00:24:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 12fd6d2a2 -> 073ca6a51


HBASE-12104 Some optimization and bugfix for HTableMultiplexer (Yi Deng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/073ca6a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/073ca6a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/073ca6a5

Branch: refs/heads/branch-1
Commit: 073ca6a5160d2343699a93d0b35c7a5d1e5e2a47
Parents: 12fd6d2
Author: stack <stack@apache.org>
Authored: Fri Oct 3 17:17:09 2014 -0700
Committer: stack <stack@apache.org>
Committed: Fri Oct 3 17:24:47 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/HTableMultiplexer.java  | 407 ++++++++++---------
 1 file changed, 208 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/073ca6a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index e8c6909..fae0a94 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -28,23 +28,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
  * Each put will be sharded into different buffer queues based on its destination region server.
@@ -63,24 +68,25 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 @InterfaceStability.Evolving
 public class HTableMultiplexer {
   private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
-  private static int poolID = 0;
   
-  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
-
-  /** The map between each region server to its corresponding buffer queue */
-  private final Map<HRegionLocation, LinkedBlockingQueue<PutStatus>> serverToBufferQueueMap =
-      new ConcurrentHashMap<>();
+  public static final String TABLE_MULTIPLEXER_FLUSH_PERIOD_MS =
+      "hbase.tablemultiplexer.flush.period.ms";
+  public static final String TABLE_MULTIPLEXER_INIT_THREADS = "hbase.tablemultiplexer.init.threads";
+  public static final String TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE =
+      "hbase.client.max.retries.in.queue";
 
   /** The map between each region server to its flush worker */
-  private final Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap =
+  private final Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap =
       new ConcurrentHashMap<>();
 
   private final Configuration conf;
   private final ClusterConnection conn;
   private final ExecutorService pool;
   private final int retryNum;
-  private int perRegionServerBufferQueueSize;
+  private final int perRegionServerBufferQueueSize;
   private final int maxKeyValueSize;
+  private final ScheduledExecutorService executor;
+  private final long flushPeriod;
   
   /**
    * @param conf The HBaseConfiguration
@@ -96,6 +102,11 @@ public class HTableMultiplexer {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
     this.maxKeyValueSize = HTable.getMaxKeyValueSize(conf);
+    this.flushPeriod = conf.getLong(TABLE_MULTIPLEXER_FLUSH_PERIOD_MS, 100);
+    int initThreads = conf.getInt(TABLE_MULTIPLEXER_INIT_THREADS, 10);
+    this.executor =
+        Executors.newScheduledThreadPool(initThreads,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HTableFlushWorker-%d").build());
   }
 
   /**
@@ -106,7 +117,7 @@ public class HTableMultiplexer {
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(TableName tableName, final Put put) throws IOException {
+  public boolean put(TableName tableName, final Put put) {
     return put(tableName, put, this.retryNum);
   }
 
@@ -118,8 +129,7 @@ public class HTableMultiplexer {
    * @return the list of puts which could not be queued
    * @throws IOException
    */
-  public List<Put> put(TableName tableName, final List<Put> puts)
-      throws IOException {
+  public List<Put> put(TableName tableName, final List<Put> puts) {
     if (puts == null)
       return null;
     
@@ -140,23 +150,22 @@ public class HTableMultiplexer {
     return failedPuts;
   }
 
-  public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, List) } instead.
+   */
+  @Deprecated
+  public List<Put> put(byte[] tableName, final List<Put> puts) {
     return put(TableName.valueOf(tableName), puts);
   }
-
-
+  
   /**
    * The put request will be buffered by its corresponding buffer queue. And the put request will be
    * retried before dropping the request.
    * Return false if the queue is already full.
-   * @param tableName
-   * @param put
-   * @param retry
    * @return true if the request can be accepted by its corresponding buffer queue.
    * @throws IOException
    */
-  public boolean put(final TableName tableName, final Put put, int retry)
-      throws IOException {
+  public boolean put(final TableName tableName, final Put put, int retry) {
     if (retry <= 0) {
       return false;
     }
@@ -166,25 +175,36 @@ public class HTableMultiplexer {
       HRegionLocation loc = conn.getRegionLocation(tableName, put.getRow(), false);
       if (loc != null) {
         // Add the put pair into its corresponding queue.
-
         LinkedBlockingQueue<PutStatus> queue = getQueue(loc);
+
         // Generate a MultiPutStatus object and offer it into the queue
         PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
         
         return queue.offer(s);
       }
-    } catch (Exception e) {
-      LOG.debug("Cannot process the put " + put + " because of " + e);
+    } catch (IOException e) {
+      LOG.debug("Cannot process the put " + put, e);
     }
     return false;
   }
 
-  public boolean put(final byte[] tableName, final Put put, int retry)
-      throws IOException {
+  /**
+   * Deprecated. Use {@link #put(TableName, Put) } instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, final Put put, int retry) {
     return put(TableName.valueOf(tableName), put, retry);
   }
 
   /**
+   * Deprecated. Use {@link #put(TableName, Put)} instead.
+   */
+  @Deprecated
+  public boolean put(final byte[] tableName, Put put) {
+    return put(TableName.valueOf(tableName), put);
+  }
+  
+  /**
    * @return the current HTableMultiplexerStatus
    */
   public HTableMultiplexerStatus getHTableMultiplexerStatus() {
@@ -192,30 +212,20 @@ public class HTableMultiplexer {
   }
 
   private LinkedBlockingQueue<PutStatus> getQueue(HRegionLocation addr) {
-    LinkedBlockingQueue<PutStatus> queue = serverToBufferQueueMap.get(addr);
-    if (queue == null) {
-      synchronized (this.serverToBufferQueueMap) {
-        queue = serverToBufferQueueMap.get(addr);
-        if (queue == null) {
-          // Create a queue for the new region server
-          queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
-          serverToBufferQueueMap.put(addr, queue);
-
+    FlushWorker worker = serverToFlushWorkerMap.get(addr);
+    if (worker == null) {
+      synchronized (this.serverToFlushWorkerMap) {
+        worker = serverToFlushWorkerMap.get(addr);
+        if (worker == null) {
           // Create the flush worker
-          HTableFlushWorker worker =
-              new HTableFlushWorker(conf, this.conn, addr, this, queue, pool);
+          worker = new FlushWorker(conf, this.conn, addr, this, perRegionServerBufferQueueSize,
+                  pool, executor);
           this.serverToFlushWorkerMap.put(addr, worker);
-
-          // Launch a daemon thread to flush the puts
-          // from the queue to its corresponding region server.
-          String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + (poolID++);
-          Thread t = new Thread(worker, name);
-          t.setDaemon(true);
-          t.start();
+          executor.scheduleAtFixedRate(worker, flushPeriod, flushPeriod, TimeUnit.MILLISECONDS);
         }
       }
     }
-    return queue;
+    return worker.getQueue();
   }
 
   /**
@@ -223,7 +233,7 @@ public class HTableMultiplexer {
    * report the number of buffered requests and the number of the failed (dropped) requests
    * in total or on per region server basis.
    */
-  static class HTableMultiplexerStatus {
+  public static class HTableMultiplexerStatus {
     private long totalFailedPutCounter;
     private long totalBufferedPutCounter;
     private long maxLatency;
@@ -234,7 +244,7 @@ public class HTableMultiplexer {
     private Map<String, Long> serverToMaxLatencyMap;
 
     public HTableMultiplexerStatus(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       this.totalBufferedPutCounter = 0;
       this.totalFailedPutCounter = 0;
       this.maxLatency = 0;
@@ -247,17 +257,17 @@ public class HTableMultiplexer {
     }
 
     private void initialize(
-        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+        Map<HRegionLocation, FlushWorker> serverToFlushWorkerMap) {
       if (serverToFlushWorkerMap == null) {
         return;
       }
 
       long averageCalcSum = 0;
       int averageCalcCount = 0;
-      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
+      for (Map.Entry<HRegionLocation, FlushWorker> entry : serverToFlushWorkerMap
           .entrySet()) {
         HRegionLocation addr = entry.getKey();
-        HTableFlushWorker worker = entry.getValue();
+        FlushWorker worker = entry.getValue();
 
         long bufferedCounter = worker.getTotalBufferedCount();
         long failedCounter = worker.getTotalFailedCount();
@@ -325,25 +335,15 @@ public class HTableMultiplexer {
   }
   
   private static class PutStatus {
-    private final HRegionInfo regionInfo;
-    private final Put put;
-    private final int retryCount;
-    public PutStatus(final HRegionInfo regionInfo, final Put put,
-        final int retryCount) {
+    public final HRegionInfo regionInfo;
+    public final Put put;
+    public final int retryCount;
+
+    public PutStatus(HRegionInfo regionInfo, Put put, int retryCount) {
       this.regionInfo = regionInfo;
       this.put = put;
       this.retryCount = retryCount;
     }
-
-    public HRegionInfo getRegionInfo() {
-      return regionInfo;
-    }
-    public Put getPut() {
-      return put;
-    }
-    public int getRetryCount() {
-      return retryCount;
-    }
   }
 
   /**
@@ -386,26 +386,38 @@ public class HTableMultiplexer {
     }
   }
 
-  private static class HTableFlushWorker implements Runnable {
+  private static class FlushWorker implements Runnable {
     private final HRegionLocation addr;
-    private final Configuration conf;
-    private final ClusterConnection conn;
+    private final AsyncProcess asyncProc;
     private final LinkedBlockingQueue<PutStatus> queue;
-    private final HTableMultiplexer htableMultiplexer;
+    private final HTableMultiplexer multiplexer;
     private final AtomicLong totalFailedPutCount = new AtomicLong(0);
-    private final AtomicInteger currentProcessingPutCount = new AtomicInteger(0);
+    private final AtomicInteger currentProcessingCount = new AtomicInteger(0);
     private final AtomicAverageCounter averageLatency = new AtomicAverageCounter();
     private final AtomicLong maxLatency = new AtomicLong(0);
     private final ExecutorService pool;
+    private final List<PutStatus> processingList = new ArrayList<>();
+    private final ScheduledExecutorService executor;
+    private final int maxRetryInQueue;
+    private final AtomicInteger retryInQueue = new AtomicInteger(0);
+    private final int rpcTimeOutMs;
     
-    public HTableFlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
-        HTableMultiplexer htableMultiplexer, LinkedBlockingQueue<PutStatus> queue, ExecutorService pool) {
+    public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
+        HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
+        ExecutorService pool, ScheduledExecutorService executor) {
       this.addr = addr;
-      this.conf = conf;
-      this.conn = conn;
-      this.htableMultiplexer = htableMultiplexer;
-      this.queue = queue;
+      this.asyncProc = conn.getAsyncProcess();
+      this.multiplexer = htableMultiplexer;
+      this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
       this.pool = pool;
+      this.executor = executor;
+      this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+      this.rpcTimeOutMs =
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    }
+
+    protected LinkedBlockingQueue<PutStatus> getQueue() {
+      return this.queue;
     }
 
     public long getTotalFailedCount() {
@@ -413,7 +425,7 @@ public class HTableMultiplexer {
     }
 
     public long getTotalBufferedCount() {
-      return queue.size() + currentProcessingPutCount.get();
+      return queue.size() + currentProcessingCount.get();
     }
 
     public AtomicAverageCounter getAverageLatencyCounter() {
@@ -424,149 +436,146 @@ public class HTableMultiplexer {
       return this.maxLatency.getAndSet(0);
     }
 
-    private boolean resubmitFailedPut(PutStatus failedPutStatus,
-        HRegionLocation oldLoc) throws IOException {
-      Put failedPut = failedPutStatus.getPut();
-      // The currentPut is failed. So get the table name for the currentPut.
-      TableName tableName = failedPutStatus.getRegionInfo().getTable();
+    private boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOException {
       // Decrease the retry count
-      int retryCount = failedPutStatus.getRetryCount() - 1;
+      final int retryCount = ps.retryCount - 1;
       
       if (retryCount <= 0) {
         // Update the failed counter and no retry any more.
         return false;
-      } else {
-        // Retry one more time
-        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
       }
+
+      int cnt = retryInQueue.incrementAndGet();
+      if (cnt > maxRetryInQueue) {
+        // Too many Puts in queue for resubmit, give up this
+        retryInQueue.decrementAndGet();
+        return false;
+      }
+
+      final Put failedPut = ps.put;
+      // The currentPut is failed. So get the table name for the currentPut.
+      final TableName tableName = ps.regionInfo.getTable();
+
+      // Wait at least RPC timeout time
+      long delayMs = rpcTimeOutMs;
+      delayMs = Math.max(delayMs, (long) (multiplexer.flushPeriod * Math.pow(2,
+              multiplexer.retryNum - retryCount)));
+
+      executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          boolean succ = false;
+          try {
+            succ = FlushWorker.this.multiplexer.put(tableName, failedPut, retryCount);
+          } finally {
+            FlushWorker.this.retryInQueue.decrementAndGet();
+            if (!succ) {
+              FlushWorker.this.totalFailedPutCount.incrementAndGet();
+            }
+          }
+        }
+      }, delayMs, TimeUnit.MILLISECONDS);
+      return true;
     }
 
     @Override
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings
-        (value = "REC_CATCH_EXCEPTION", justification = "na")
     public void run() {
-      List<PutStatus> processingList = new ArrayList<>();
-      /** 
-       * The frequency in milliseconds for the current thread to process the corresponding 
-       * buffer queue.  
-       **/
-      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
-
-      // initial delay
+      int failedCount = 0;
       try {
-        Thread.sleep(frequency);
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted while sleeping");
-        Thread.currentThread().interrupt();
-      }
+        long start = EnvironmentEdgeManager.currentTime();
+
+        // drain all the queued puts into the tmp list
+        processingList.clear();
+        queue.drainTo(processingList);
+        if (processingList.size() == 0) {
+          // Nothing to flush
+          return;
+        }
 
-      AsyncProcess ap = conn.getAsyncProcess();
+        currentProcessingCount.set(processingList.size());
+        // failedCount is decreased whenever a Put is success or resubmit.
+        failedCount = processingList.size();
+
+        List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
+        MultiAction<Row> actions = new MultiAction<>();
+        for (int i = 0; i < processingList.size(); i++) {
+          PutStatus putStatus = processingList.get(i);
+          Action<Row> action = new Action<Row>(putStatus.put, i);
+          actions.add(putStatus.regionInfo.getRegionName(), action);
+          retainedActions.add(action);
+        }
 
-      long start, elapsed;
-      int failedCount = 0;
-      while (true) {
+        // Process this multi-put request
+        List<PutStatus> failed = null;
+        Object[] results = new Object[actions.size()];
+        ServerName server = addr.getServerName();
+        Map<ServerName, MultiAction<Row>> actionsByServer =
+            Collections.singletonMap(server, actions);
         try {
-          start = elapsed = EnvironmentEdgeManager.currentTime();
-
-          // Clear the processingList, putToStatusMap and failedCount
-          processingList.clear();
-          failedCount = 0;
-
-          // drain all the queued puts into the tmp list
-          queue.drainTo(processingList);
-          currentProcessingPutCount.set(processingList.size());
-
-          if (processingList.size() > 0) {
-            List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
-            MultiAction<Row> actions = new MultiAction<>();
-            for (int i = 0; i < processingList.size(); i++) {
-              PutStatus putStatus = processingList.get(i);
-              Action<Row> action = new Action<Row>(putStatus.getPut(), i);
-              actions.add(putStatus.getRegionInfo().getRegionName(), action);
-              retainedActions.add(action);
-            }
-            
-            // Process this multi-put request
-            List<PutStatus> failed = null;
-            Object[] results = new Object[actions.size()];
-            ServerName server = addr.getServerName();
-            Map<ServerName, MultiAction<Row>> actionsByServer =
-                Collections.singletonMap(server, actions);
-            try {
-              AsyncRequestFuture arf =
-                  ap.submitMultiActions(null, retainedActions, 0L, null, results,
-                    true, null, null, actionsByServer, pool);
-              arf.waitUntilDone();
-              if (arf.hasError()) {
-                throw arf.getErrors();
-              }
-            } catch (IOException e) {
-              LOG.debug("Caught some exceptions " + e
-                  + " when flushing puts to region server " + addr.getHostnamePort());
-            } finally {
-              // mutate list so that it is empty for complete success, or
-              // contains only failed records
-              // results are returned in the same order as the requests in list
-              // walk the list backwards, so we can remove from list without
-              // impacting the indexes of earlier members
-              for (int i = 0; i < results.length; i++) {
-                if (results[i] == null) {
-                  if (failed == null) {
-                    failed = new ArrayList<PutStatus>();
-                  }
-                  failed.add(processingList.get(i));
-                }
-              }
-            }
-
-            if (failed != null) {
-              // Resubmit failed puts
-              for (PutStatus putStatus : processingList) {
-                if (!resubmitFailedPut(putStatus, this.addr)) {
-                  failedCount++;
-                }
+          AsyncRequestFuture arf =
+              asyncProc.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
+                null, actionsByServer, pool);
+          arf.waitUntilDone();
+          if (arf.hasError()) {
+            // We just log and ignore the exception here since failed Puts will be resubmit again.
+            LOG.debug("Caught some exceptions when flushing puts to region server "
+                + addr.getHostnamePort(), arf.getErrors());
+          }
+        } finally {
+          for (int i = 0; i < results.length; i++) {
+            if (results[i] == null) {
+              if (failed == null) {
+                failed = new ArrayList<PutStatus>();
               }
-              // Update the totalFailedCount
-              this.totalFailedPutCount.addAndGet(failedCount);
-            }
-            
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-            // Update latency counters
-            averageLatency.add(elapsed);
-            if (elapsed > maxLatency.get()) {
-              maxLatency.set(elapsed);
+              failed.add(processingList.get(i));
+            } else {
+              failedCount--;
             }
-            
-            // Log some basic info
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Processed " + currentProcessingPutCount
-                  + " put requests for " + addr.getHostnamePort() + " and "
-                  + failedCount + " failed" + ", latency for this send: "
-                  + elapsed);
-            }
-
-            // Reset the current processing put count
-            currentProcessingPutCount.set(0);
           }
+        }
 
-          // Sleep for a while
-          if (elapsed == start) {
-            elapsed = EnvironmentEdgeManager.currentTime() - start;
-          }
-          if (elapsed < frequency) {
-            try {
-              Thread.sleep(frequency - elapsed);
-            } catch (InterruptedException e) {
-              LOG.warn("Interrupted while sleeping");
-              Thread.currentThread().interrupt();
+        if (failed != null) {
+          // Resubmit failed puts
+          for (PutStatus putStatus : processingList) {
+            if (resubmitFailedPut(putStatus, this.addr)) {
+              failedCount--;
             }
           }
-        } catch (Exception e) {
-          // Log all the exceptions and move on
-          LOG.debug("Caught some exceptions " + e
-              + " when flushing puts to region server "
-                + addr.getHostnamePort(), e);
         }
+
+        long elapsed = EnvironmentEdgeManager.currentTime() - start;
+        // Update latency counters
+        averageLatency.add(elapsed);
+        if (elapsed > maxLatency.get()) {
+          maxLatency.set(elapsed);
+        }
+
+        // Log some basic info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processed " + currentProcessingCount + " put requests for "
+              + addr.getHostnamePort() + " and " + failedCount + " failed"
+              + ", latency for this send: " + elapsed);
+        }
+
+        // Reset the current processing put count
+        currentProcessingCount.set(0);
+      } catch (RuntimeException e) {
+        // To make findbugs happy
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        // Log all the exceptions and move on
+        LOG.debug(
+          "Caught some exceptions " + e + " when flushing puts to region server "
+              + addr.getHostnamePort(), e);
+      } finally {
+        // Update the totalFailedCount
+        this.totalFailedPutCount.addAndGet(failedCount);
       }
     }
   }


Mime
View raw message