hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1340448 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hb...
Date Sat, 19 May 2012 12:50:42 GMT
Author: mbautin
Date: Sat May 19 12:50:41 2012
New Revision: 1340448

URL: http://svn.apache.org/viewvc?rev=1340448&view=rev
Log:
[jira][89-fb][HBASE-5776] HTableMultiplexer

Author: liyintang

Summary:
There is a known issue in the HBase client: a single slow or dead region
server can slow down multiput operations across all the region servers,
so the HBase client ends up as slow as the slowest region server in the
cluster.

To solve this problem, HTableMultiplexer provides a thread-safe,
non-blocking PUT API across all the tables.
Each put is sharded into a buffer queue based on its destination region
server, so each queue only holds puts that share the same destination,
and each queue has a flush worker thread that flushes the buffered puts
to that region server.

All the puts will be retried a configurable number of times before being
dropped. And the HTableMultiplexer can report the number of buffered
requests and the number of failed (dropped) requests, in total or on a
per region server basis.
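
To make the API concrete, here is a minimal usage sketch against the client
classes added in this commit (HTableMultiplexer and its nested
HTableMultiplexerStatus). The table name, column family, row keys, and queue
size below are illustrative only, and the Configuration is assumed to be
built however the application normally obtains its HBase configuration:

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.HTableMultiplexer;
  import org.apache.hadoop.hbase.client.HTableMultiplexer.HTableMultiplexerStatus;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class HTableMultiplexerExample {
    // conf is an HBase-aware Configuration, built however the application normally does.
    public static void demo(Configuration conf) throws Exception {
      byte[] table = Bytes.toBytes("usertable");   // illustrative table name

      // Buffer up to 100,000 puts per region server before new requests are rejected.
      HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 100000);

      // Non-blocking single put: returns false if the destination server's queue is full.
      Put put = new Put(Bytes.toBytes("row-1"));
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
      boolean queued = multiplexer.put(table, put);

      // Batch form: returns the puts that could not be queued, or null if all were accepted,
      // so the caller can decide whether to retry or drop them.
      Put put2 = new Put(Bytes.toBytes("row-2"));
      put2.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
      List<Put> batch = new ArrayList<Put>();
      batch.add(put2);
      List<Put> notQueued = multiplexer.put(table, batch);

      // Report buffered and failed (dropped) counters, in total or per region server.
      HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
      System.out.println("queued=" + queued
          + " notQueued=" + (notQueued == null ? 0 : notQueued.size())
          + " buffered=" + status.getTotalBufferedCounter()
          + " failed=" + status.getTotalFailedCounter());
    }
  }

Note that a false or non-null return here only means the client-side buffer
queue for that region server was full; the puts were never sent, and it is
up to the caller to retry or drop them.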

Test Plan: Unit tests

Reviewers: kannan

Reviewed By: kannan

CC: JIRA, tedyu

Differential Revision: https://reviews.facebook.net/D2775

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat May 19 12:50:41 2012
@@ -548,6 +548,7 @@ public final class HConstants {
    */
   public static final byte [] NO_NEXT_INDEXED_KEY = Bytes.toBytes("NO_NEXT_INDEXED_KEY");
 
+  public static final int MULTIPUT_SUCCESS = -1;
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Sat May 19 12:50:41 2012
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -241,6 +240,26 @@ public interface HConnection extends Clo
     throws IOException;
 
   /**
+   * Process the MultiPut request by submitting it to the multiPutThreadPool in HTable.
+   * The request will be sent to its destination region server by one thread in 
+   * the HTable's multiPutThreadPool.
+   * 
+   * Also it will return the list of failed puts among the MultiPut request, or return null if all
+   * puts are sent to the HRegionServer successfully.
+   * @param mput The MultiPut request
+   * @return the list of failed puts among the MultiPut request, or null
+   *         if all puts are sent to the HRegionServer successfully.
+   */
+  public List<Put> processSingleMultiPut(MultiPut mput);
+  
+  /**
+   * Delete the cached location
+   * @param tableName
+   * @param row
+   */
+  public void deleteCachedLocation(final byte [] tableName, final byte [] row);
+  
+  /**
    * Enable or disable region cache prefetch for the table. It will be
    * applied for the given table's all HTable instances within this
    * connection. By default, the cache prefetch is enabled.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat May 19 12:50:41 2012
@@ -1029,8 +1029,7 @@ public class HConnectionManager {
      * Delete a cached location, if it satisfies the table name and row
      * requirements.
      */
-    void deleteCachedLocation(final byte [] tableName,
-                                      final byte [] row) {
+    public void deleteCachedLocation(final byte [] tableName, final byte [] row) {
       synchronized (this.cachedRegionLocations) {
         SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
             getTableLocations(tableName);
@@ -1724,6 +1723,44 @@ public class HConnectionManager {
       }
     }
 
+    /** {@inheritDoc} */
+    public List<Put> processSingleMultiPut(MultiPut mput) {
+      if (mput == null || mput.address == null)
+        return null;
+      List<Put> failedPuts = null;
+
+      Future<MultiPutResponse> future = HTable.multiPutThreadPool.submit(
+          createPutCallable(mput.address, mput, null));
+
+      try {
+        MultiPutResponse resp = future.get();
+        // Process the response for each region
+        for (Map.Entry<byte[], List<Put>> e : mput.puts.entrySet()) {
+          Integer result = resp.getAnswer(e.getKey());
+          if (result == null) {
+            // Prepare all the failed puts for retry
+            if (failedPuts == null) {
+              failedPuts = new ArrayList<Put>();
+            }
+            failedPuts.addAll(e.getValue());
+
+          } else if (result >= 0) {
+            // Prepare the failed puts for retry
+            if (failedPuts == null) {
+              failedPuts = new ArrayList<Put>();
+            }
+            List<Put> lst = e.getValue();
+            failedPuts.addAll(lst.subList(result, lst.size()));
+          } 
+        }
+      } catch (Exception e) {
+        LOG.error("Failed all puts request to " + mput.address.getHostNameWithPort() +
+            " because of " + e);
+        return (List<Put>) mput.allPuts();
+      }
+      return failedPuts;
+    }
+
     /**
      * If there are multiple puts in the list, process them from a
      * a thread pool, which is shared across all the HTable instance.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat May 19 12:50:41 2012
@@ -733,7 +733,7 @@ public class HTable implements HTableInt
   }
 
   // validate for well-formedness
-  private void validatePut(final Put put) throws IllegalArgumentException{
+  public void validatePut(final Put put) throws IllegalArgumentException{
     if (put.isEmpty()) {
       throw new IllegalArgumentException("No columns to insert");
     }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1340448&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Sat May 19 12:50:41 2012
@@ -0,0 +1,437 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+
+/**
+ * HTableMultiplexer provides a thread-safe non-blocking PUT API across all the tables.
+ * Each put will be sharded into a different buffer queue based on its destination region server.
+ * So each region server buffer queue will only have the puts which share the same destination.
+ * And each queue will have a flush worker thread to flush the put requests to the region server.
+ * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that
+ * particular queue.
+ * 
+ * Also, all the puts will be retried a configurable number of times before dropping.
+ * And the HTableMultiplexer can report the number of buffered requests and the number of the
+ * failed (dropped) requests in total or on a per region server basis.
+ * 
+ * This class is thread safe.
+ */
+public class HTableMultiplexer {
+  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
+  private static int poolID = 0;
+
+  private Map<byte[], HTable> tableNameToHTableMap;
+
+  /** The map between each region server to its corresponding buffer queue */
+  private Map<HServerAddress, LinkedBlockingQueue<PutStatus>>
+    serverToBufferQueueMap;
+
+  /** The map between each region server to its flush worker */
+  private Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap;
+
+  private Configuration conf;
+  private HConnection connection;
+  private int retryNum;
+  private int perRegionServerBufferQueueSize;
+  
+  /**
+   * 
+   * @param conf The HBaseConfiguration
+   * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops
+   *         for each region server before dropping the request.
+   */
+  public HTableMultiplexer(Configuration conf, int perRegionServerBufferQueueSize) {
+    this.conf = conf;
+    this.connection = HConnectionManager.getConnection(conf);
+    this.serverToBufferQueueMap = new ConcurrentHashMap<HServerAddress,
+      LinkedBlockingQueue<PutStatus>>();
+    this.serverToFlushWorkerMap = new ConcurrentHashMap<HServerAddress, HTableFlushWorker>();
+    this.tableNameToHTableMap = new ConcurrentHashMap<byte[], HTable>();
+    this.retryNum = conf.getInt("hbase.client.retries.number", 10);
+    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+  }
+
+  /**
+   * The put request will be buffered by its corresponding buffer queue. Return false if the queue
+   * is already full.
+   * @param table
+   * @param put
+   * @return true if the request can be accepted by its corresponding buffer queue.
+   * @throws IOException
+   */
+  public boolean put(final byte[] table, final Put put) throws IOException {
+    return put(table, put, this.retryNum);
+  }
+
+  /**
+   * The puts request will be buffered by their corresponding buffer queue. 
+   * Return the list of puts which could not be queued.
+   * @param table
+   * @param puts
+   * @return the list of puts which could not be queued
+   * @throws IOException
+   */
+  public List<Put> put(final byte[] table, final List<Put> puts) throws IOException {
+    if (puts == null)
+      return null;
+    
+    List <Put> failedPuts = null;
+    boolean result;
+    for (Put put : puts) {
+      result = put(table, put, this.retryNum);
+      if (result == false) {
+        
+        // Create the failed puts list if necessary
+        if (failedPuts == null) {
+          failedPuts = new ArrayList<Put>();
+        }
+        // Add the put to the failed puts list
+        failedPuts.add(put);
+      }
+    }
+    return failedPuts;
+  }
+
+  /**
+   * The put request will be buffered by its corresponding buffer queue. And the put request will be
+   * retried before dropping the request.
+   * Return false if the queue is already full.
+   * @param table
+   * @param put
+   * @param retry
+   * @return true if the request can be accepted by its corresponding buffer queue.
+   * @throws IOException
+   */
+  public boolean put(final byte[] table, final Put put, int retry) throws IOException {
+    if (retry <= 0) {
+      return false;
+    }
+
+    LinkedBlockingQueue<PutStatus> queue;
+    HTable htable = getHTable(table);
+    try {
+      htable.validatePut(put);
+      HRegionLocation loc = htable.getRegionLocation(put.getRow());
+      if (loc != null) {
+        // Get the server location for the put
+        HServerAddress addr = loc.getServerAddress();
+        // Add the put pair into its corresponding queue.
+        queue = getBufferedQueue(addr);
+        // Generate a MultiPutStatus obj and offer it into the queue
+        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
+        
+        return queue.offer(s);
+      }
+    } catch (Exception e) {
+      LOG.debug("Cannot process the put " + put + " because of " + e);
+    }
+    return false;
+  }
+
+  /**
+   * @return the current HTableMultiplexerStatus
+   */
+  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
+    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
+  }
+
+
+  private HTable getHTable(final byte[] table) throws IOException {
+    HTable htable = this.tableNameToHTableMap.get(table);
+    if (htable == null) {
+      synchronized (this.tableNameToHTableMap) {
+        htable = this.tableNameToHTableMap.get(table);
+        if (htable == null)  {
+          htable = new HTable(conf, table);
+          this.tableNameToHTableMap.put(table, htable);
+        }
+      }
+    }
+    return htable;
+  }
+
+  private LinkedBlockingQueue<PutStatus> getBufferedQueue(
+      HServerAddress addr) {
+    LinkedBlockingQueue<PutStatus> queue;
+    // Add the put pair into its corresponding queue.
+    queue = serverToBufferQueueMap.get(addr);
+    if (queue == null) {
+      // Create the queue for the new region server
+      queue = addNewRegionServer(addr);
+    }
+    return queue;
+  }
+
+  private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(HServerAddress addr) {
+    LinkedBlockingQueue<PutStatus> queue =
+      serverToBufferQueueMap.get(addr);
+    if (queue == null) {
+      // Create a queue for the new region server
+      queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
+      serverToBufferQueueMap.put(addr, queue);
+
+      // Create the flush worker
+      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
+          this.connection, this, queue);
+      this.serverToFlushWorkerMap.put(addr, worker);
+
+      // Launch a daemon thread to flush the puts
+      // from the queue to its corresponding region server.
+      String name = "HTableFlushWorker-" + addr.getHostNameWithPort() + "-"
+          + (poolID++);
+      Thread t = new Thread(worker, name);
+      t.setDaemon(true);
+      t.start();
+    }
+    return queue;
+  }
+
+  /**
+   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
+   * report the number of buffered requests and the number of the failed (dropped) requests
+   * in total or on per region server basis.
+   */
+  public static class HTableMultiplexerStatus {
+    private long totalFailedPutCounter;
+    private long totalBufferedPutCounter;
+    private Map<String, Long> serverToFailedCounterMap;
+    private Map<String, Long> serverToBufferedCounterMap;
+
+    public HTableMultiplexerStatus(Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap) {
+      this.totalBufferedPutCounter = 0;
+      this.totalFailedPutCounter = 0;
+      this.serverToBufferedCounterMap = new HashMap<String, Long>();
+      this.serverToFailedCounterMap = new HashMap<String, Long>();
+      this.initialize(serverToFlushWorkerMap);
+    }
+
+    private void initialize(Map<HServerAddress, HTableFlushWorker> serverToFlushWorkerMap) {
+      if (serverToFlushWorkerMap == null) {
+        return;
+      }
+
+      for (Map.Entry<HServerAddress, HTableFlushWorker> entry : serverToFlushWorkerMap
+          .entrySet()) {
+        HServerAddress addr = entry.getKey();
+        HTableFlushWorker worker = entry.getValue();
+
+        long bufferedCounter = worker.getTotalBufferedCount();
+        long failedCounter = worker.getTotalFailedCount();
+
+        this.totalBufferedPutCounter += bufferedCounter;
+        this.totalFailedPutCounter += failedCounter;
+
+        this.serverToBufferedCounterMap.put(addr.getHostNameWithPort(),
+            bufferedCounter);
+        this.serverToFailedCounterMap.put(addr.getHostNameWithPort(),
+            failedCounter);
+      }
+    }
+
+    public long getTotalBufferedCounter() {
+      return this.totalBufferedPutCounter;
+    }
+
+    public long getTotalFailedCounter() {
+      return this.totalFailedPutCounter;
+    }
+
+    public Map<String, Long> getBufferedCounterForEachRegionServer() {
+      return this.serverToBufferedCounterMap;
+    }
+
+    public Map<String, Long> getFailedCounterForEachRegionServer() {
+      return this.serverToFailedCounterMap;
+    }
+  }
+  
+  private static class PutStatus {
+    private final HRegionInfo regionInfo;
+    private final Put put;
+    private final int retryCount;
+    public PutStatus(final HRegionInfo regionInfo, final Put put,
+        final int retryCount) {
+      this.regionInfo = regionInfo;
+      this.put = put;
+      this.retryCount = retryCount;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionInfo;
+    }
+    public Put getPut() {
+      return put;
+    }
+    public int getRetryCount() {
+      return retryCount;
+    }
+  }
+
+  private static class HTableFlushWorker implements Runnable {
+    private HServerAddress addr;
+    private Configuration conf;
+    private LinkedBlockingQueue<PutStatus> queue;
+    private HConnection connection;
+    private HTableMultiplexer htableMultiplexer;
+    private AtomicLong totalFailedPutCount;
+    private AtomicInteger currentProcessingPutCount;
+    
+    public HTableFlushWorker(Configuration conf, HServerAddress addr,
+        HConnection connection, HTableMultiplexer htableMultiplexer,
+        LinkedBlockingQueue<PutStatus> queue) {
+      this.addr = addr;
+      this.conf = conf;
+      this.connection = connection;
+      this.htableMultiplexer = htableMultiplexer;
+      this.queue = queue;
+      this.totalFailedPutCount = new AtomicLong(0);
+      this.currentProcessingPutCount = new AtomicInteger(0);
+    }
+
+    public long getTotalFailedCount() {
+      return totalFailedPutCount.get();
+    }
+
+    public long getTotalBufferedCount() {
+      return queue.size() + currentProcessingPutCount.get();
+    }
+
+    private boolean resubmitFailedPut(PutStatus failedPutStatus) throws IOException{
+      Put failedPut = failedPutStatus.getPut();
+      // The currentPut is failed. So get the table name for the currentPut.
+      byte[] tableName = failedPutStatus.getRegionInfo().getTableDesc().getName();
+      // Clear the cached location for the failed puts
+      this.connection.deleteCachedLocation(tableName, failedPut.getRow());
+      // Decrease the retry count
+      int retryCount = failedPutStatus.getRetryCount() - 1;
+      
+      if (retryCount <= 0) {
+        // Update the failed counter and no retry any more.
+        return false;
+      } else {
+        // Retry one more time
+        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
+      }
+    }
+
+    @Override
+    public void run() {
+      List<PutStatus> processingList = new ArrayList<PutStatus>();
+      /** 
+       * The frequency in milliseconds for the current thread to process the corresponding
+       * buffer queue.
+       **/
+      long frequency = conf.getLong("hbase.htablemultiplexer.flush.frequency.ms", 100);
+      
+      // initial delay
+      try {
+        Thread.sleep(frequency);
+      } catch (InterruptedException e) {
+      } // Ignore
+
+      long start, elapsed;
+      int failedCount = 0;
+      while (true) {
+        try {
+          start = System.currentTimeMillis();
+
+          // Clear the processingList, putToStatusMap and failedCount
+          processingList.clear();
+          failedCount = 0;
+          
+          // drain all the queued puts into the tmp list
+          queue.drainTo(processingList);
+          currentProcessingPutCount.set(processingList.size());
+
+          if (processingList.size() > 0) {
+            // Create the MultiPut object
+            MultiPut mput = new MultiPut(this.addr);
+            for (PutStatus putStatus: processingList) {
+              // Update the MultiPut
+              mput.add(putStatus.getRegionInfo().getRegionName(), 
+                  putStatus.getPut());
+            }
+            
+            // Process this multiput request
+            List<Put> failed = connection.processSingleMultiPut(mput);
+            
+            if (failed != null) {
+              if (failed.size() == processingList.size()) {
+                // All the puts for this region server are failed. Going to retry it later
+                for (PutStatus putStatus: processingList) {
+                  if (!resubmitFailedPut(putStatus)) {
+                    failedCount++;
+                  }
+                }
+              } else {
+                Set<Put> failedPutSet = new HashSet<Put>(failed);
+                for (PutStatus putStatus: processingList) {
+                  if (failedPutSet.contains(putStatus.getPut()) && !resubmitFailedPut(putStatus)) {
+                    failedCount++;
+                  }
+                }
+              }
+            }
+            // Update the totalFailedCount
+            this.totalFailedPutCount.addAndGet(failedCount);
+            
+            // Reset the current processing put count
+            currentProcessingPutCount.set(0);
+            
+            // Log some basic info
+            LOG.debug("Processed " + processingList.size()
+                + " put requests for " + addr.getHostNameWithPort()
+                + " and " + failedCount + " failed");
+          }
+
+          // Sleep for a while
+          elapsed = System.currentTimeMillis() - start;
+          if (elapsed < frequency) {
+            Thread.sleep(frequency - elapsed);
+          }
+        } catch (Exception e) {
+          // Log all the exceptions and move on
+          LOG.debug("Caught some exceptions " + e
+              + " when flushing puts to region server "
+              + addr.getHostNameWithPort());
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Sat May 19 12:50:41 2012
@@ -20,11 +20,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -38,6 +33,11 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
 /**
  * Data type class for putting multiple regions worth of puts in one RPC.
  */
@@ -80,7 +80,20 @@ public class MultiPut extends Operation 
     }
     rsput.add(aPut);
   }
+  
+  public void addAll(byte[] regionName, List<Put> putList) {
+    List<Put> rsput = puts.get(regionName);
+    if (rsput == null) {
+      rsput = new ArrayList<Put>();
+      puts.put(regionName, rsput);
+    }
+    rsput.addAll(putList);
+  }
 
+  public void clear() {
+    puts.clear();
+  }
+  
   public Collection<Put> allPuts() {
     List<Put> res = new ArrayList<Put>();
     for ( List<Put> pp : puts.values() ) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 19 12:50:41 2012
@@ -2140,7 +2140,7 @@ public class HRegionServer implements HR
         if (codes[i] != OperationStatusCode.SUCCESS)
           return i;
       }
-      return -1;
+      return HConstants.MULTIPUT_SUCCESS;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java?rev=1340448&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableMultiplexer.java Sat May 19 12:50:41 2012
@@ -0,0 +1,135 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTableMultiplexer.HTableMultiplexerStatus;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHTableMultiplexer {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[] VALUE1 = Bytes.toBytes("testValue1");
+  private static byte[] VALUE2 = Bytes.toBytes("testValue2");
+  private static int SLAVES = 3;
+  private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testHTableMultiplexer() throws Exception {
+    byte[] TABLE = Bytes.toBytes("testHTableMultiplexer");
+    final int NUM_REGIONS = 10;
+    final int VERSION = 3;
+    List<Put> failedPuts = null;
+    boolean success = false;
+    
+    HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
+        PER_REGIONSERVER_QUEUE_SIZE);
+    HTableMultiplexerStatus status = multiplexer.getHTableMultiplexerStatus();
+
+    HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
+        Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+    byte[][] startRows = ht.getStartKeys();
+    byte[][] endRows = ht.getEndKeys();
+
+    // SinglePut case
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      Put put = new Put(startRows[i]);
+      put.add(FAMILY, QUALIFIER, VALUE1);
+      success = multiplexer.put(TABLE, put);
+      Assert.assertTrue(success);
+
+      // ensure the buffer has been flushed
+      verifyAllBufferedPutsHasFlushed(status);
+
+      // verify that the Get returns the correct result
+      Get get = new Get(startRows[i]);
+      get.addColumn(FAMILY, QUALIFIER);
+      Result r = ht.get(get);
+      Assert.assertEquals(0,
+          Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER)));
+    }
+
+    // MultiPut case
+    List<Put> multiput = new ArrayList<Put>();
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      Put put = new Put(endRows[i]);
+      put.add(FAMILY, QUALIFIER, VALUE2);
+      multiput.add(put);
+    }
+    failedPuts = multiplexer.put(TABLE, multiput);
+    Assert.assertTrue(failedPuts == null);
+
+    // ensure the buffer has been flushed
+    verifyAllBufferedPutsHasFlushed(status);
+
+    // verify that the Get returns the correct result
+    for (int i = 0; i < NUM_REGIONS; i++) {
+      Get get = new Get(endRows[i]);
+      get.addColumn(FAMILY, QUALIFIER);
+      Result r = ht.get(get);
+      Assert.assertEquals(0,
+          Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)));
+    }
+  }
+
+  private void verifyAllBufferedPutsHasFlushed(HTableMultiplexerStatus status) {
+    int retries = 5;
+    int tries = 0;
+    do {
+      try {
+        Thread.sleep(2 * TEST_UTIL.getConfiguration().getLong(
+            "hbase.htablemultiplexer.flush.frequency.ms", 100));
+        tries++;
+      } catch (InterruptedException e) {
+      } // ignore
+    } while (status.getTotalBufferedCounter() != 0 && tries != retries);
+
+    Assert.assertEquals("There are still some buffered puts left in the queue",
+        0, status.getTotalBufferedCounter());
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java?rev=1340448&r1=1340447&r2=1340448&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java Sat May 19 12:50:41 2012
@@ -289,8 +289,8 @@ public class TestRegionPlacement {
     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
     HMaster m = cluster.getMaster();
 
-    int retry = 5;
-    long sleep = 2 * TEST_UTIL.getConfiguration().
+    int retry = 10;
+    long sleep = 3 * TEST_UTIL.getConfiguration().
       getInt("hbase.regionserver.msginterval", 1000);
     int attempt = 0;
     int currentRegionOpened, regionMovement;


