hbase-commits mailing list archives

From: ecl...@apache.org
Subject: svn commit: r1449950 [11/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...
Date: Mon, 25 Feb 2013 22:50:29 GMT
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,561 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HTableMultiplexer provides a thread-safe, non-blocking PUT API across all tables.
+ * Each put is sharded into a buffer queue based on its destination region server,
+ * so each region server's buffer queue only holds the puts that share the same destination.
+ * Each queue has a flush worker thread that flushes the buffered puts to the region server.
+ * If a queue is full, the HTableMultiplexer drops further Put requests for that
+ * particular queue.
+ *
+ * Puts are retried a configurable number of times before being dropped.
+ * The HTableMultiplexer can report the number of buffered requests and the number of
+ * failed (dropped) requests, either in total or on a per region server basis.
+ *
+ * This class is thread safe.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HTableMultiplexer {
+  private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
+  private static int poolID = 0;
+  
+  static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
+
+  private Map<byte[], HTable> tableNameToHTableMap;
+
+  /** The map between each region server to its corresponding buffer queue */
+  private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
+    serverToBufferQueueMap;
+
+  /** The map between each region server to its flush worker */
+  private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
+
+  private Configuration conf;
+  private int retryNum;
+  private int perRegionServerBufferQueueSize;
+  
+  /**
+   * @param conf The HBase configuration
+   * @param perRegionServerBufferQueueSize determines the maximum number of Put ops buffered
+   *         for each region server before requests are dropped.
+   */
+  public HTableMultiplexer(Configuration conf,
+      int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
+    this.conf = conf;
+    this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
+      LinkedBlockingQueue<PutStatus>>();
+    this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
+    this.tableNameToHTableMap = new ConcurrentHashMap<byte[], HTable>();
+    this.retryNum = conf.getInt("hbase.client.retries.number", 10);
+    this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
+  }
+
+  /**
+   * The put request will be buffered by its corresponding buffer queue. Returns false if the
+   * queue is already full.
+   * @param table
+   * @param put
+   * @return true if the request can be accepted by its corresponding buffer queue.
+   * @throws IOException
+   */
+  public boolean put(final byte[] table, final Put put) throws IOException {
+    return put(table, put, this.retryNum);
+  }
+
+  /**
+   * The puts will be buffered by their corresponding buffer queues.
+   * Returns the list of puts which could not be queued.
+   * @param table
+   * @param puts
+   * @return the list of puts which could not be queued
+   * @throws IOException
+   */
+  public List<Put> put(final byte[] table, final List<Put> puts)
+      throws IOException {
+    if (puts == null)
+      return null;
+    
+    List <Put> failedPuts = null;
+    boolean result;
+    for (Put put : puts) {
+      result = put(table, put, this.retryNum);
+      if (!result) {
+        
+        // Create the failed puts list if necessary
+        if (failedPuts == null) {
+          failedPuts = new ArrayList<Put>();
+        }
+        // Add the put to the failed puts list
+        failedPuts.add(put);
+      }
+    }
+    return failedPuts;
+  }
+
+  /**
+   * The put request will be buffered by its corresponding buffer queue and will be retried
+   * up to the given number of times before the request is dropped.
+   * Returns false if the queue is already full.
+   * @param table
+   * @param put
+   * @param retry
+   * @return true if the request can be accepted by its corresponding buffer queue.
+   * @throws IOException
+   */
+  public boolean put(final byte[] table, final Put put, int retry)
+      throws IOException {
+    if (retry <= 0) {
+      return false;
+    }
+
+    LinkedBlockingQueue<PutStatus> queue;
+    HTable htable = getHTable(table);
+    try {
+      htable.validatePut(put);
+      HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
+      if (loc != null) {
+        // Add the put pair into its corresponding queue.
+        queue = addNewRegionServer(loc, htable);
+        // Generate a PutStatus object and offer it into the queue
+        PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
+        
+        return queue.offer(s);
+      }
+    } catch (Exception e) {
+      LOG.debug("Cannot process the put " + put + " because of " + e);
+    }
+    return false;
+  }
+
+  /**
+   * @return the current HTableMultiplexerStatus
+   */
+  public HTableMultiplexerStatus getHTableMultiplexerStatus() {
+    return new HTableMultiplexerStatus(serverToFlushWorkerMap);
+  }
+
+
+  private HTable getHTable(final byte[] table) throws IOException {
+    HTable htable = this.tableNameToHTableMap.get(table);
+    if (htable == null) {
+      synchronized (this.tableNameToHTableMap) {
+        htable = this.tableNameToHTableMap.get(table);
+        if (htable == null)  {
+          htable = new HTable(conf, table);
+          this.tableNameToHTableMap.put(table, htable);
+        }
+      }
+    }
+    return htable;
+  }
+
+  private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
+      HRegionLocation addr, HTable htable) {
+    LinkedBlockingQueue<PutStatus> queue =
+      serverToBufferQueueMap.get(addr);
+    if (queue == null) {
+      // Create a queue for the new region server
+      queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
+      serverToBufferQueueMap.put(addr, queue);
+
+      // Create the flush worker
+      HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
+          this, queue, htable);
+      this.serverToFlushWorkerMap.put(addr, worker);
+
+      // Launch a daemon thread to flush the puts
+      // from the queue to its corresponding region server.
+      String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
+          + (poolID++);
+      Thread t = new Thread(worker, name);
+      t.setDaemon(true);
+      t.start();
+    }
+    return queue;
+  }
+
+  /**
+   * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer.
+   * It reports the number of buffered requests and the number of failed (dropped) requests,
+   * either in total or on a per region server basis.
+   */
+  static class HTableMultiplexerStatus {
+    private long totalFailedPutCounter;
+    private long totalBufferedPutCounter;
+    private long maxLatency;
+    private long overallAverageLatency;
+    private Map<String, Long> serverToFailedCounterMap;
+    private Map<String, Long> serverToBufferedCounterMap;
+    private Map<String, Long> serverToAverageLatencyMap;
+    private Map<String, Long> serverToMaxLatencyMap;
+
+    public HTableMultiplexerStatus(
+        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+      this.totalBufferedPutCounter = 0;
+      this.totalFailedPutCounter = 0;
+      this.maxLatency = 0;
+      this.overallAverageLatency = 0;
+      this.serverToBufferedCounterMap = new HashMap<String, Long>();
+      this.serverToFailedCounterMap = new HashMap<String, Long>();
+      this.serverToAverageLatencyMap = new HashMap<String, Long>();
+      this.serverToMaxLatencyMap = new HashMap<String, Long>();
+      this.initialize(serverToFlushWorkerMap);
+    }
+
+    private void initialize(
+        Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
+      if (serverToFlushWorkerMap == null) {
+        return;
+      }
+
+      long averageCalcSum = 0;
+      int averageCalcCount = 0;
+      for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
+          .entrySet()) {
+        HRegionLocation addr = entry.getKey();
+        HTableFlushWorker worker = entry.getValue();
+
+        long bufferedCounter = worker.getTotalBufferedCount();
+        long failedCounter = worker.getTotalFailedCount();
+        long serverMaxLatency = worker.getMaxLatency();
+        AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
+        // Get sum and count pieces separately to compute overall average
+        SimpleEntry<Long, Integer> averageComponents = averageCounter
+            .getComponents();
+        long serverAvgLatency = averageCounter.getAndReset();
+
+        this.totalBufferedPutCounter += bufferedCounter;
+        this.totalFailedPutCounter += failedCounter;
+        if (serverMaxLatency > this.maxLatency) {
+          this.maxLatency = serverMaxLatency;
+        }
+        averageCalcSum += averageComponents.getKey();
+        averageCalcCount += averageComponents.getValue();
+
+        this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
+            bufferedCounter);
+        this.serverToFailedCounterMap
+            .put(addr.getHostnamePort(),
+            failedCounter);
+        this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
+            serverAvgLatency);
+        this.serverToMaxLatencyMap
+            .put(addr.getHostnamePort(),
+            serverMaxLatency);
+      }
+      this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
+          / averageCalcCount : 0;
+    }
+
+    public long getTotalBufferedCounter() {
+      return this.totalBufferedPutCounter;
+    }
+
+    public long getTotalFailedCounter() {
+      return this.totalFailedPutCounter;
+    }
+
+    public long getMaxLatency() {
+      return this.maxLatency;
+    }
+
+    public long getOverallAverageLatency() {
+      return this.overallAverageLatency;
+    }
+
+    public Map<String, Long> getBufferedCounterForEachRegionServer() {
+      return this.serverToBufferedCounterMap;
+    }
+
+    public Map<String, Long> getFailedCounterForEachRegionServer() {
+      return this.serverToFailedCounterMap;
+    }
+
+    public Map<String, Long> getMaxLatencyForEachRegionServer() {
+      return this.serverToMaxLatencyMap;
+    }
+
+    public Map<String, Long> getAverageLatencyForEachRegionServer() {
+      return this.serverToAverageLatencyMap;
+    }
+  }
+  
+  private static class PutStatus {
+    private final HRegionInfo regionInfo;
+    private final Put put;
+    private final int retryCount;
+    public PutStatus(final HRegionInfo regionInfo, final Put put,
+        final int retryCount) {
+      this.regionInfo = regionInfo;
+      this.put = put;
+      this.retryCount = retryCount;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionInfo;
+    }
+    public Put getPut() {
+      return put;
+    }
+    public int getRetryCount() {
+      return retryCount;
+    }
+  }
+
+  /**
+   * Helper to count the average over an interval until reset.
+   */
+  private static class AtomicAverageCounter {
+    private long sum;
+    private int count;
+
+    public AtomicAverageCounter() {
+      this.sum = 0L;
+      this.count = 0;
+    }
+
+    public synchronized long getAndReset() {
+      long result = this.get();
+      this.reset();
+      return result;
+    }
+
+    public synchronized long get() {
+      if (this.count == 0) {
+        return 0;
+      }
+      return this.sum / this.count;
+    }
+
+    public synchronized SimpleEntry<Long, Integer> getComponents() {
+      return new SimpleEntry<Long, Integer>(sum, count);
+    }
+
+    public synchronized void reset() {
+      this.sum = 0L;
+      this.count = 0;
+    }
+
+    public synchronized void add(long value) {
+      this.sum += value;
+      this.count++;
+    }
+  }
+
+  private static class HTableFlushWorker implements Runnable {
+    private HRegionLocation addr;
+    private Configuration conf;
+    private LinkedBlockingQueue<PutStatus> queue;
+    private HTableMultiplexer htableMultiplexer;
+    private AtomicLong totalFailedPutCount;
+    private AtomicInteger currentProcessingPutCount;
+    private AtomicAverageCounter averageLatency;
+    private AtomicLong maxLatency;
+    private HTable htable; // For Multi
+    
+    public HTableFlushWorker(Configuration conf, HRegionLocation addr,
+        HTableMultiplexer htableMultiplexer,
+        LinkedBlockingQueue<PutStatus> queue, HTable htable) {
+      this.addr = addr;
+      this.conf = conf;
+      this.htableMultiplexer = htableMultiplexer;
+      this.queue = queue;
+      this.totalFailedPutCount = new AtomicLong(0);
+      this.currentProcessingPutCount = new AtomicInteger(0);
+      this.averageLatency = new AtomicAverageCounter();
+      this.maxLatency = new AtomicLong(0);
+      this.htable = htable;
+    }
+
+    public long getTotalFailedCount() {
+      return totalFailedPutCount.get();
+    }
+
+    public long getTotalBufferedCount() {
+      return queue.size() + currentProcessingPutCount.get();
+    }
+
+    public AtomicAverageCounter getAverageLatencyCounter() {
+      return this.averageLatency;
+    }
+
+    public long getMaxLatency() {
+      return this.maxLatency.getAndSet(0);
+    }
+
+    private boolean resubmitFailedPut(PutStatus failedPutStatus,
+        HRegionLocation oldLoc) throws IOException {
+      Put failedPut = failedPutStatus.getPut();
+      // The current put failed, so get the table name for it.
+      byte[] tableName = failedPutStatus.getRegionInfo().getTableName();
+      // Decrease the retry count
+      int retryCount = failedPutStatus.getRetryCount() - 1;
+      
+      if (retryCount <= 0) {
+        // No more retries; the put will be counted as failed.
+        return false;
+      } else {
+        // Retry one more time
+        return this.htableMultiplexer.put(tableName, failedPut, retryCount);
+      }
+    }
+
+    @Override
+    public void run() {
+      List<PutStatus> processingList = new ArrayList<PutStatus>();
+      /** 
+       * The frequency in milliseconds for the current thread to process the corresponding  
+       * buffer queue.  
+       **/
+      long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
+      
+      // initial delay
+      try {
+        Thread.sleep(frequency);
+      } catch (InterruptedException e) {
+      } // Ignore
+
+      long start, elapsed;
+      int failedCount = 0;
+      while (true) {
+        try {
+          start = elapsed = EnvironmentEdgeManager.currentTimeMillis();
+
+          // Clear the processingList and reset failedCount
+          processingList.clear();
+          failedCount = 0;
+          
+          // drain all the queued puts into the tmp list
+          queue.drainTo(processingList);
+          currentProcessingPutCount.set(processingList.size());
+
+          if (processingList.size() > 0) {
+            ArrayList<Put> list = new ArrayList<Put>(processingList.size());
+            for (PutStatus putStatus: processingList) {
+              list.add(putStatus.getPut());
+            }
+            
+            // Process this multiput request
+            List<Put> failed = null;
+            Object[] results = new Object[list.size()];
+            try {
+              htable.batch(list, results);
+            } catch (IOException e) {
+              LOG.debug("Caught some exceptions " + e
+                  + " when flushing puts to region server " + addr.getHostnamePort());
+            } finally {
+              // mutate list so that it is empty for complete success, or
+              // contains only failed records
+              // results are returned in the same order as the requests in list
+              // walk the list backwards, so we can remove from list without
+              // impacting the indexes of earlier members
+              for (int i = results.length - 1; i >= 0; i--) {
+                if (results[i] instanceof Result) {
+                  // successful Puts are removed from the list here.
+                  list.remove(i);
+                }
+              }
+              failed = list;
+            }
+
+            if (failed != null) {
+              if (failed.size() == processingList.size()) {
+                // All the puts for this region server are failed. Going to retry it later
+                for (PutStatus putStatus: processingList) {
+                  if (!resubmitFailedPut(putStatus, this.addr)) {
+                    failedCount++;
+                  }
+                }
+              } else {
+                Set<Put> failedPutSet = new HashSet<Put>(failed);
+                for (PutStatus putStatus: processingList) {
+                  if (failedPutSet.contains(putStatus.getPut())
+                      && !resubmitFailedPut(putStatus, this.addr)) {
+                    failedCount++;
+                  }
+                }
+              }
+            }
+            // Update the totalFailedCount
+            this.totalFailedPutCount.addAndGet(failedCount);
+            
+            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
+            // Update latency counters
+            averageLatency.add(elapsed);
+            if (elapsed > maxLatency.get()) {
+              maxLatency.set(elapsed);
+            }
+            
+            // Log some basic info
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Processed " + currentProcessingPutCount
+                  + " put requests for " + addr.getHostnamePort() + " and "
+                  + failedCount + " failed" + ", latency for this send: "
+                  + elapsed);
+            }
+
+            // Reset the current processing put count
+            currentProcessingPutCount.set(0);
+          }
+
+          // Sleep for a while
+          if (elapsed == start) {
+            elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
+          }
+          if (elapsed < frequency) {
+            Thread.sleep(frequency - elapsed);
+          }
+        } catch (Exception e) {
+          // Log all the exceptions and move on
+          LOG.debug("Caught some exceptions " + e
+              + " when flushing puts to region server "
+              + addr.getHostnamePort());
+        }
+      }
+    }
+  }
+}
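
For illustration, here is a minimal sketch of driving the HTableMultiplexer API added above. The table name, column family and queue size are hypothetical, and it assumes a reachable cluster with the default client configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Buffer at most 1000 puts per region server before new requests are dropped.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);

    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));

    // Non-blocking: returns false if the destination region server's queue is full.
    boolean queued = multiplexer.put(Bytes.toBytes("usertable"), put);
    if (!queued) {
      // Back-pressure: the caller decides whether to retry later, block, or drop.
    }
  }
}

getHTableMultiplexerStatus() exposes the buffered/failed counters and latencies, though HTableMultiplexerStatus is declared package-private in this patch, so code outside org.apache.hadoop.hbase.client cannot yet name its type.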

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,547 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple pool of HTable instances.
+ *
+ * Each HTablePool acts as a pool for all tables. To use, instantiate an
+ * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
+ *
+ * Once you are done with it, close your instance of {@link HTableInterface}
+ * by calling {@link HTableInterface#close()} rather than returning the tables
+ * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
+ *
+ * <p>
+ * A pool can be created with a <i>maxSize</i> which defines the most HTable
+ * references that will ever be retained for each table. Otherwise the default
+ * is {@link Integer#MAX_VALUE}.
+ *
+ * <p>
+ * Pool will manage its own connections to the cluster. See
+ * {@link HConnectionManager}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HTablePool implements Closeable {
+  private final PoolMap<String, HTableInterface> tables;
+  private final int maxSize;
+  private final PoolType poolType;
+  private final Configuration config;
+  private final HTableInterfaceFactory tableFactory;
+
+  /**
+   * Default Constructor. Default HBaseConfiguration and no limit on pool size.
+   */
+  public HTablePool() {
+    this(HBaseConfiguration.create(), Integer.MAX_VALUE);
+  }
+
+  /**
+   * Constructor to set the maximum pool size and use the specified configuration.
+   *
+   * @param config
+   *          configuration
+   * @param maxSize
+   *          maximum number of references to keep for each table
+   */
+  public HTablePool(final Configuration config, final int maxSize) {
+    this(config, maxSize, null, null);
+  }
+
+  /**
+   * Constructor to set the maximum pool size and use the specified configuration and
+   * table factory.
+   *
+   * @param config
+   *          configuration
+   * @param maxSize
+   *          maximum number of references to keep for each table
+   * @param tableFactory
+   *          table factory
+   */
+  public HTablePool(final Configuration config, final int maxSize,
+      final HTableInterfaceFactory tableFactory) {
+    this(config, maxSize, tableFactory, PoolType.Reusable);
+  }
+
+  /**
+   * Constructor to set the maximum pool size and use the specified configuration and
+   * pool type.
+   *
+   * @param config
+   *          configuration
+   * @param maxSize
+   *          maximum number of references to keep for each table
+   * @param poolType
+   *          pool type which is one of {@link PoolType#Reusable} or
+   *          {@link PoolType#ThreadLocal}
+   */
+  public HTablePool(final Configuration config, final int maxSize,
+      final PoolType poolType) {
+    this(config, maxSize, null, poolType);
+  }
+
+  /**
+   * Constructor to set the maximum pool size and use the specified configuration,
+   * table factory and pool type. The HTablePool supports the
+   * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
+   * type is null or not one of those two values, then it will default to
+   * {@link PoolType#Reusable}.
+   *
+   * @param config
+   *          configuration
+   * @param maxSize
+   *          maximum number of references to keep for each table
+   * @param tableFactory
+   *          table factory
+   * @param poolType
+   *          pool type which is one of {@link PoolType#Reusable} or
+   *          {@link PoolType#ThreadLocal}
+   */
+  public HTablePool(final Configuration config, final int maxSize,
+      final HTableInterfaceFactory tableFactory, PoolType poolType) {
+    // Use the given configuration, or create a default one if none was supplied.
+    this.config = config == null ? HBaseConfiguration.create() : config;
+    this.maxSize = maxSize;
+    this.tableFactory = tableFactory == null ? new HTableFactory()
+        : tableFactory;
+    if (poolType == null) {
+      this.poolType = PoolType.Reusable;
+    } else {
+      switch (poolType) {
+      case Reusable:
+      case ThreadLocal:
+        this.poolType = poolType;
+        break;
+      default:
+        this.poolType = PoolType.Reusable;
+        break;
+      }
+    }
+    this.tables = new PoolMap<String, HTableInterface>(this.poolType,
+        this.maxSize);
+  }
+
+  /**
+   * Get a reference to the specified table from the pool.
+   * <p>
+   * Create a new one if one is not available.
+   *
+   * @param tableName
+   *          table name
+   * @return a reference to the specified table
+   * @throws RuntimeException
+   *           if there is a problem instantiating the HTable
+   */
+  public HTableInterface getTable(String tableName) {
+    // call the old getTable implementation renamed to findOrCreateTable
+    HTableInterface table = findOrCreateTable(tableName);
+    // return a proxy table so when user closes the proxy, the actual table
+    // will be returned to the pool
+    return new PooledHTable(table);
+  }
+
+  /**
+   * Get a reference to the specified table from the pool.
+   * <p>
+   *
+   * Create a new one if one is not available.
+   *
+   * @param tableName
+   *          table name
+   * @return a reference to the specified table
+   * @throws RuntimeException
+   *           if there is a problem instantiating the HTable
+   */
+  private HTableInterface findOrCreateTable(String tableName) {
+    HTableInterface table = tables.get(tableName);
+    if (table == null) {
+      table = createHTable(tableName);
+    }
+    return table;
+  }
+
+  /**
+   * Get a reference to the specified table from the pool.
+   * <p>
+   *
+   * Create a new one if one is not available.
+   *
+   * @param tableName
+   *          table name
+   * @return a reference to the specified table
+   * @throws RuntimeException
+   *           if there is a problem instantiating the HTable
+   */
+  public HTableInterface getTable(byte[] tableName) {
+    return getTable(Bytes.toString(tableName));
+  }
+
+  /**
+   * This method is not needed anymore, clients should call
+   * HTableInterface.close() rather than returning the tables to the pool
+   *
+   * @param table
+   *          the proxy table user got from pool
+   * @deprecated
+   */
+  public void putTable(HTableInterface table) throws IOException {
+    // We need to be sure nobody puts a proxy implementation in the pool.
+    // If client code has not been updated and still calls putTable() instead
+    // of close(), we return the wrapped table to the pool instead of the
+    // proxy table.
+    if (table instanceof PooledHTable) {
+      returnTable(((PooledHTable) table).getWrappedTable());
+    } else {
+      // Normally this should not happen if clients pass back the same table
+      // object they got from the pool, but if it happens it is better to
+      // reject it.
+      throw new IllegalArgumentException("not a pooled table: " + table);
+    }
+  }
+
+  /**
+   * Puts the specified HTable back into the pool.
+   * <p>
+   *
+   * If the pool already contains <i>maxSize</i> references to the table, then
+   * the table instance gets closed after flushing buffered edits.
+   *
+   * @param table
+   *          table
+   */
+  private void returnTable(HTableInterface table) throws IOException {
+    // this is the old putTable method renamed and made private
+    String tableName = Bytes.toString(table.getTableName());
+    if (tables.size(tableName) >= maxSize) {
+      // release table instance since we're not reusing it
+      this.tables.remove(tableName, table);
+      this.tableFactory.releaseHTableInterface(table);
+      return;
+    }
+    tables.put(tableName, table);
+  }
+
+  protected HTableInterface createHTable(String tableName) {
+    return this.tableFactory.createHTableInterface(config,
+        Bytes.toBytes(tableName));
+  }
+
+  /**
+   * Closes all the HTable instances, belonging to the given table, in the
+   * table pool.
+   * <p>
+   * Note: this is a 'shutdown' of the given table pool and different from
+   * {@link #putTable(HTableInterface)}, that is used to return the table
+   * instance to the pool for future re-use.
+   *
+   * @param tableName
+   */
+  public void closeTablePool(final String tableName) throws IOException {
+    Collection<HTableInterface> tables = this.tables.values(tableName);
+    if (tables != null) {
+      for (HTableInterface table : tables) {
+        this.tableFactory.releaseHTableInterface(table);
+      }
+    }
+    this.tables.remove(tableName);
+  }
+
+  /**
+   * See {@link #closeTablePool(String)}.
+   *
+   * @param tableName
+   */
+  public void closeTablePool(final byte[] tableName) throws IOException {
+    closeTablePool(Bytes.toString(tableName));
+  }
+
+  /**
+   * Closes all the HTable instances, belonging to all tables, in the table
+   * pool.
+   * <p>
+   * Note: this is a 'shutdown' of all the table pools.
+   */
+  public void close() throws IOException {
+    for (String tableName : tables.keySet()) {
+      closeTablePool(tableName);
+    }
+    this.tables.clear();
+  }
+
+  public int getCurrentPoolSize(String tableName) {
+    return tables.size(tableName);
+  }
+
+  /**
+   * A proxy class that implements the HTableInterface close method so that
+   * the wrapped table is returned to the table pool instead of being closed.
+   *
+   */
+  class PooledHTable implements HTableInterface {
+
+    private HTableInterface table; // actual table implementation
+
+    public PooledHTable(HTableInterface table) {
+      this.table = table;
+    }
+
+    @Override
+    public byte[] getTableName() {
+      return table.getTableName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return table.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+      return table.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+      return table.exists(get);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+      return table.exists(gets);
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results) throws IOException,
+        InterruptedException {
+      table.batch(actions, results);
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+        InterruptedException {
+      return table.batch(actions);
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+      return table.get(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+      return table.get(gets);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+      return table.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+      return table.getScanner(scan);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+      return table.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+        throws IOException {
+      return table.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+      table.put(put);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+      table.put(puts);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+        byte[] value, Put put) throws IOException {
+      return table.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+      table.delete(delete);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+      table.delete(deletes);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+        byte[] value, Delete delete) throws IOException {
+      return table.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+      return table.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+        byte[] qualifier, long amount) throws IOException {
+      return table.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+        byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
+      return table.incrementColumnValue(row, family, qualifier, amount,
+          writeToWAL);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+      return table.isAutoFlush();
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+      table.flushCommits();
+    }
+
+    /**
+     * Returns the actual table back to the pool
+     *
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      returnTable(table);
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+      return table.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+        byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
+        throws ServiceException, Throwable {
+      return table.coprocessorService(service, startKey, endKey, callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+        byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
+        throws ServiceException, Throwable {
+      table.coprocessorService(service, startKey, endKey, callable, callback);
+    }
+
+    @Override
+    public String toString() {
+      return "PooledHTable{" + ", table=" + table + '}';
+    }
+
+    /**
+     * Expose the wrapped HTable to tests in the same package
+     *
+     * @return wrapped htable
+     */
+    HTableInterface getWrappedTable() {
+      return table;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+        Object[] results, Callback<R> callback) throws IOException,
+        InterruptedException {
+      table.batchCallback(actions, results, callback);
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+        Callback<R> callback) throws IOException, InterruptedException {
+      return table.batchCallback(actions,  callback);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+      table.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+      return table.append(append);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+      table.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+      table.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+      return table.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+      table.setWriteBufferSize(writeBufferSize);
+    }
+  }
+}
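
A short, hypothetical sketch of the pooling pattern described in the class comment above (the table and column names are made up); closing the proxy returns the underlying HTable to the pool rather than closing it:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PoolSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Keep at most 10 cached references per table.
    HTablePool pool = new HTablePool(conf, 10);
    HTableInterface table = pool.getTable("usertable");
    try {
      Put put = new Put(Bytes.toBytes("row-1"));
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      table.put(put);
    } finally {
      // Closing the proxy returns the wrapped HTable to the pool.
      table.close();
    }
    pool.close(); // shuts down the whole pool
  }
}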

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for HTable.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HTableUtil {
+
+  private static final int INITIAL_LIST_SIZE = 250;
+	
+  /**
+   * Processes a List of Puts and writes them to an HTable instance in RegionServer buckets via the htable.put method. 
+   * This will utilize the writeBuffer, thus the writeBuffer flush frequency may be tuned accordingly via htable.setWriteBufferSize. 
+   * <br><br>
+   * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs in each flush.
+   * <br><br>
+   * Assumption #1:  Regions have been pre-created for the table.  If they haven't, then all of the Puts will go to the same region, 
+   * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+   * <br>
+   * Assumption #2:  Row-keys are not monotonically increasing.  See the Apache HBase book for an explanation of this problem.  
+   * <br>
+   * Assumption #3:  The input list of Puts is big enough to be useful (in the thousands or more).  The intent of this
+   * method is to process larger chunks of data.
+   * <br>
+   * Assumption #4:  htable.setAutoFlush(false) has been set.  This is a requirement to use the writeBuffer.
+   * <br><br>
+   * @param htable HTable instance for target HBase table
+   * @param puts List of Put instances
+   * @throws IOException if a remote or network exception occurs
+   * 
+   */
+  public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException {
+
+    Map<String, List<Put>> putMap = createRsPutMap(htable, puts);
+    for (List<Put> rsPuts: putMap.values()) {
+      htable.put( rsPuts );
+    }
+    htable.flushCommits();
+  }
+	
+  /**
+   * Processes a List of Rows (Put, Delete) and writes them to an HTable instance in RegionServer buckets via the htable.batch method. 
+   * <br><br>
+   * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs, thus this will
+   * produce one RPC of Puts per RegionServer.
+   * <br><br>
+   * Assumption #1:  Regions have been pre-created for the table.  If they haven't, then all of the Puts will go to the same region, 
+   * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+   * <br>
+   * Assumption #2:  Row-keys are not monotonically increasing.  See the Apache HBase book for an explanation of this problem.  
+   * <br>
+   * Assumption #3:  The input list of Rows is big enough to be useful (in the thousands or more).  The intent of this
+   * method is to process larger chunks of data.
+   * <br><br>
+   * This method accepts a list of Row objects because the underlying .batch method accepts a list of Row objects.
+   * <br><br>
+   * @param htable HTable instance for target HBase table
+   * @param rows List of Row instances
+   * @throws IOException if a remote or network exception occurs
+   */
+  public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException {
+
+    try {
+      Map<String, List<Row>> rowMap = createRsRowMap(htable, rows);
+      for (List<Row> rsRows: rowMap.values()) {
+        htable.batch( rsRows );
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e); 
+    }
+		
+  }
+
+  private static Map<String,List<Put>> createRsPutMap(HTable htable, List<Put> puts) throws IOException {
+
+    Map<String, List<Put>> putMap = new HashMap<String, List<Put>>();
+    for (Put put: puts) {
+      HRegionLocation rl = htable.getRegionLocation( put.getRow() );
+      String hostname = rl.getHostname();
+      List<Put> recs = putMap.get( hostname);
+      if (recs == null) {
+        recs = new ArrayList<Put>(INITIAL_LIST_SIZE);
+        putMap.put( hostname, recs);
+      }
+      recs.add(put);
+    }
+    return putMap;
+  }
+
+  private static Map<String,List<Row>> createRsRowMap(HTable htable, List<Row> rows) throws IOException {
+
+    Map<String, List<Row>> rowMap = new HashMap<String, List<Row>>();
+    for (Row row: rows) {
+      HRegionLocation rl = htable.getRegionLocation( row.getRow() );
+      String hostname = rl.getHostname();
+      List<Row> recs = rowMap.get( hostname);
+      if (recs == null) {
+        recs = new ArrayList<Row>(INITIAL_LIST_SIZE);
+        rowMap.put( hostname, recs);
+      }
+      recs.add(row);
+    }
+    return rowMap;
+  }
+		
+}
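
A hypothetical sketch of bucketRsPut as described above; the table and column names are invented, and per assumption #4 auto-flush is disabled so the write buffer is actually used:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BucketPutSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable htable = new HTable(conf, "usertable");
    htable.setAutoFlush(false); // required so the write buffer is used (assumption #4)

    List<Put> puts = new ArrayList<Put>();
    for (int i = 0; i < 10000; i++) {
      Put put = new Put(Bytes.toBytes("row-" + i));
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
      puts.add(put);
    }
    // Groups the puts by hosting region server, then writes one bucket at a time.
    HTableUtil.bucketRsPut(htable, puts);
    htable.close();
  }
}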

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,258 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Used to perform Increment operations on a single row.
+ * <p>
+ * This operation does not appear atomic to readers.  Increments are done
+ * under a single row lock, so write operations to a row are synchronized, but
+ * readers do not take row locks so get and scan operations can see this
+ * operation partially completed.
+ * <p>
+ * To increment columns of a row, instantiate an Increment object with the row
+ * to increment.  At least one column to increment must be specified using the
+ * {@link #addColumn(byte[], byte[], long)} method.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class Increment implements Row {
+  private byte [] row = null;
+  private boolean writeToWAL = true;
+  private TimeRange tr = new TimeRange();
+  private Map<byte [], NavigableMap<byte [], Long>> familyMap =
+    new TreeMap<byte [], NavigableMap<byte [], Long>>(Bytes.BYTES_COMPARATOR);
+
+  /** Constructor for Writable.  DO NOT USE */
+  public Increment() {}
+
+  /**
+   * Create an Increment operation for the specified row.
+   * <p>
+   * At least one column must be incremented.
+   * @param row row key
+   */
+  public Increment(byte [] row) {
+    if (row == null) {
+      throw new IllegalArgumentException("Cannot increment a null row");
+    }
+    this.row = row;
+  }
+
+  /**
+   * Increment the column from the specific family with the specified qualifier
+   * by the specified amount.
+   * <p>
+   * Overrides previous calls to addColumn for this family and qualifier.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   * @return the Increment object
+   */
+  public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
+    if (family == null) {
+      throw new IllegalArgumentException("family cannot be null");
+    }
+    if (qualifier == null) {
+      throw new IllegalArgumentException("qualifier cannot be null");
+    }
+    NavigableMap<byte [], Long> set = familyMap.get(family);
+    if(set == null) {
+      set = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+    }
+    set.put(qualifier, amount);
+    familyMap.put(family, set);
+    return this;
+  }
+
+  /* Accessors */
+
+  /**
+   * Method for retrieving the increment's row
+   * @return row
+   */
+  public byte [] getRow() {
+    return this.row;
+  }
+
+  /**
+   * Method for retrieving whether WAL will be written to or not
+   * @return true if WAL should be used, false if not
+   */
+  public boolean getWriteToWAL() {
+    return this.writeToWAL;
+  }
+
+  /**
+   * Sets whether this operation should write to the WAL or not.
+   * @param writeToWAL true if WAL should be used, false if not
+   * @return this increment operation
+   */
+  public Increment setWriteToWAL(boolean writeToWAL) {
+    this.writeToWAL = writeToWAL;
+    return this;
+  }
+
+  /**
+   * Gets the TimeRange used for this increment.
+   * @return TimeRange
+   */
+  public TimeRange getTimeRange() {
+    return this.tr;
+  }
+
+  /**
+   * Sets the TimeRange to be used on the Get for this increment.
+   * <p>
+   * This is useful when you have counters that only last for specific
+   * periods of time (i.e. counters that are partitioned by time).  By setting
+   * the range of valid times for this increment, you can potentially gain
+   * some performance with a more optimal Get operation.
+   * <p>
+   * This range is used as [minStamp, maxStamp).
+   * @param minStamp minimum timestamp value, inclusive
+   * @param maxStamp maximum timestamp value, exclusive
+   * @throws IOException if invalid time range
+   * @return this
+   */
+  public Increment setTimeRange(long minStamp, long maxStamp)
+  throws IOException {
+    tr = new TimeRange(minStamp, maxStamp);
+    return this;
+  }
+
+  /**
+   * Method for retrieving the keys in the familyMap
+   * @return keys in the current familyMap
+   */
+  public Set<byte[]> familySet() {
+    return this.familyMap.keySet();
+  }
+
+  /**
+   * Method for retrieving the number of families to increment from
+   * @return number of families
+   */
+  public int numFamilies() {
+    return this.familyMap.size();
+  }
+
+  /**
+   * Method for retrieving the number of columns to increment
+   * @return number of columns across all families
+   */
+  public int numColumns() {
+    if (!hasFamilies()) return 0;
+    int num = 0;
+    for (NavigableMap<byte [], Long> family : familyMap.values()) {
+      num += family.size();
+    }
+    return num;
+  }
+
+  /**
+   * Method for checking if any families have been inserted into this Increment
+   * @return true if familyMap is non-empty, false otherwise
+   */
+  public boolean hasFamilies() {
+    return !this.familyMap.isEmpty();
+  }
+
+  /**
+   * Method for retrieving the increment's familyMap
+   * @return familyMap
+   */
+  public Map<byte[],NavigableMap<byte[], Long>> getFamilyMap() {
+    return this.familyMap;
+  }
+
+  /**
+   * @return String
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("row=");
+    sb.append(Bytes.toStringBinary(this.row));
+    if(this.familyMap.size() == 0) {
+      sb.append(", no columns set to be incremented");
+      return sb.toString();
+    }
+    sb.append(", families=");
+    boolean moreThanOne = false;
+    for(Map.Entry<byte [], NavigableMap<byte[], Long>> entry :
+      this.familyMap.entrySet()) {
+      if(moreThanOne) {
+        sb.append("), ");
+      } else {
+        moreThanOne = true;
+        sb.append("{");
+      }
+      sb.append("(family=");
+      sb.append(Bytes.toString(entry.getKey()));
+      sb.append(", columns=");
+      if(entry.getValue() == null) {
+        sb.append("NONE");
+      } else {
+        sb.append("{");
+        boolean moreThanOneB = false;
+        for(Map.Entry<byte [], Long> column : entry.getValue().entrySet()) {
+          if(moreThanOneB) {
+            sb.append(", ");
+          } else {
+            moreThanOneB = true;
+          }
+          sb.append(Bytes.toStringBinary(column.getKey()) + "+=" + column.getValue());
+        }
+        sb.append("}");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  @Override
+  public int compareTo(Row i) {
+    return Bytes.compareTo(this.getRow(), i.getRow());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Row other = (Row) obj;
+    return compareTo(other) == 0;
+  }
+}
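
A small, hypothetical sketch of the Increment API above, issued through HTable (the table, family and qualifier names are made up):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class IncrementSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable htable = new HTable(conf, "counters");

    // Bump two counters in the same row with a single round trip.
    Increment increment = new Increment(Bytes.toBytes("page-1"));
    increment.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("views"), 1L);
    increment.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("clicks"), 2L);

    Result result = htable.increment(increment);
    long views = Bytes.toLong(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("views")));
    System.out.println("views=" + views);
    htable.close();
  }
}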

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,59 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Specifies the isolation level to use for Scan operations.
+ * <p>
+ * There are two isolation levels. A READ_COMMITTED isolation level
+ * indicates that only committed data should be returned by a scan.
+ * An isolation level of READ_UNCOMMITTED indicates that a scan
+ * should return data that is being modified by transactions that might
+ * not have been committed yet.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public enum IsolationLevel {
+
+  READ_COMMITTED(1),
+  READ_UNCOMMITTED(2);
+
+  IsolationLevel(int value) {}
+
+  public byte [] toBytes() {
+    return new byte [] { toByte() };
+  }
+
+  public byte toByte() {
+    return (byte)this.ordinal();
+  }
+
+  public static IsolationLevel fromBytes(byte [] bytes) {
+    return IsolationLevel.fromByte(bytes[0]);
+  }
+
+  public static IsolationLevel fromByte(byte vbyte) {
+    return IsolationLevel.values()[vbyte];
+  }
+}
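
A brief sketch of the enum's byte round-trip plus its typical use on a scan; the Scan#setIsolationLevel call is an assumption about the surrounding client API rather than something shown in this patch:

import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;

public class IsolationLevelSketch {
  public static void main(String[] args) {
    // Round-trip through the single-byte representation.
    byte[] encoded = IsolationLevel.READ_UNCOMMITTED.toBytes();
    IsolationLevel decoded = IsolationLevel.fromBytes(encoded);
    System.out.println(decoded); // READ_UNCOMMITTED

    // Typical use: ask a scan to return only committed data
    // (assumes Scan#setIsolationLevel is available in this client).
    Scan scan = new Scan();
    scan.setIsolationLevel(IsolationLevel.READ_COMMITTED);
  }
}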

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,44 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.MasterAdminProtocol;
+
+import java.io.Closeable;
+
+/**
+ * A KeepAlive connection is not physically closed immediately after the close,
+ * but rather kept alive for a few minutes. It makes sense only if it is shared.
+ *
+ * This interface is used by a dynamic proxy. It allows a master client to
+ * expose a #close method.
+ *
+ * This interface is intended to be used internally by HBase classes that need
+ * to speak the MasterAdminProtocol, but not by final user code. Hence it is
+ * package protected.
+ */
+interface MasterAdminKeepAliveConnection extends MasterAdminProtocol, Closeable {
+
+  @Override
+  public void close();
+}
+

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,44 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.MasterMonitorProtocol;
+
+import java.io.Closeable;
+
+/**
+ * A KeepAlive connection is not physically closed immediately after the close,
+ * but rather kept alive for a few minutes. It makes sense only if it is shared.
+ *
+ * This interface is used by a dynamic proxy. It allows a master client to
+ * expose a #close method.
+ *
+ * This interface is intended to be used internally by HBase classes that need
+ * to speak the MasterMonitorProtocol, but not by final user code. Hence it is
+ * package protected.
+ */
+interface MasterMonitorKeepAliveConnection extends MasterMonitorProtocol, Closeable {
+
+  @Override
+  public void close();
+}
+

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,489 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
+import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Scanner class that contains the <code>.META.</code> table scanning logic
+ * and uses a retrying scanner. Provided visitors will be called
+ * for each row.
+ *
+ * Although it has public visibility, this is not a public-facing API and may
+ * evolve in minor releases.
+ *
+ * <p> Note that during concurrent region splits, the scanner might not see
+ * META changes across rows (for parent and daughter entries) consistently.
+ * See HBASE-5986, and {@link BlockingMetaScannerVisitor}, for details. </p>
+ */
+@InterfaceAudience.Private
+public class MetaScanner {
+  private static final Log LOG = LogFactory.getLog(MetaScanner.class);
+  /**
+   * Scans the meta table and calls a visitor on each RowResult, using an empty
+   * start row value as the table name.
+   *
+   * @param configuration conf
+   * @param visitor A custom visitor
+   * @throws IOException e
+   */
+  public static void metaScan(Configuration configuration,
+      MetaScannerVisitor visitor)
+  throws IOException {
+    metaScan(configuration, visitor, null);
+  }
+
+  /**
+   * Scans the meta table and calls a visitor on each RowResult. Uses a table
+   * name to locate meta regions.
+   *
+   * @param configuration config
+   * @param visitor visitor object
+   * @param userTableName User table name in meta table to start scan at.  Pass
+   * null if not interested in a particular table.
+   * @throws IOException e
+   */
+  public static void metaScan(Configuration configuration,
+      MetaScannerVisitor visitor, byte [] userTableName)
+  throws IOException {
+    metaScan(configuration, visitor, userTableName, null, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Scans the meta table and calls a visitor on each RowResult. Uses a table
+   * name and a row name to locate meta regions, and scans at most
+   * <code>rowLimit</code> rows.
+   *
+   * @param configuration HBase configuration.
+   * @param visitor Visitor object.
+   * @param userTableName User table name in meta table to start scan at.  Pass
+   * null if not interested in a particular table.
+   * @param row Name of the row at the user table. The scan will start from
+   * the region row where the row resides.
+   * @param rowLimit Maximum number of rows to process. If it is less than 0, it
+   * will be set to the default value <code>Integer.MAX_VALUE</code>.
+   * @throws IOException e
+   */
+  public static void metaScan(Configuration configuration,
+      MetaScannerVisitor visitor, byte [] userTableName, byte[] row,
+      int rowLimit)
+  throws IOException {
+    metaScan(configuration, visitor, userTableName, row, rowLimit,
+      HConstants.META_TABLE_NAME);
+  }
+
+  /**
+   * Scans the meta table and calls a visitor on each RowResult. Uses a table
+   * name and a row name to locate meta regions, and scans at most
+   * <code>rowLimit</code> rows.
+   *
+   * @param configuration HBase configuration.
+   * @param visitor Visitor object. The visitor is closed before this method returns.
+   * @param tableName User table name in meta table to start scan at.  Pass
+   * null if not interested in a particular table.
+   * @param row Name of the row at the user table. The scan will start from
+   * the region row where the row resides.
+   * @param rowLimit Maximum number of rows to process. If it is less than 0, it
+   * will be set to the default value <code>Integer.MAX_VALUE</code>.
+   * @param metaTableName Meta table to scan, root or meta.
+   * @throws IOException e
+   */
+  public static void metaScan(Configuration configuration,
+      final MetaScannerVisitor visitor, final byte[] tableName,
+      final byte[] row, final int rowLimit, final byte[] metaTableName)
+      throws IOException {
+    try {
+      HConnectionManager.execute(new HConnectable<Void>(configuration) {
+        @Override
+        public Void connect(HConnection connection) throws IOException {
+          metaScan(conf, connection, visitor, tableName, row, rowLimit,
+              metaTableName);
+          return null;
+        }
+      });
+    } finally {
+      visitor.close();
+    }
+  }
+
+  private static void metaScan(Configuration configuration, HConnection connection,
+      MetaScannerVisitor visitor, byte [] tableName, byte[] row,
+      int rowLimit, final byte [] metaTableName)
+  throws IOException {
+    int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE;
+
+    // if row is not null, we want to use the startKey of the row's region as
+    // the startRow for the meta scan.
+    byte[] startRow;
+    if (row != null) {
+      // Scan starting at a particular row in a particular table
+      assert tableName != null;
+      byte[] searchRow =
+        HRegionInfo.createRegionName(tableName, row, HConstants.NINES,
+          false);
+      HTable metaTable = null;
+      try {
+        metaTable = new HTable(configuration, HConstants.META_TABLE_NAME);
+        Result startRowResult = metaTable.getRowOrBefore(searchRow,
+            HConstants.CATALOG_FAMILY);
+        if (startRowResult == null) {
+          throw new TableNotFoundException("Cannot find row in .META. for table: "
+              + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
+        }
+        HRegionInfo regionInfo = getHRegionInfo(startRowResult);
+        if (regionInfo == null) {
+          throw new IOException("HRegionInfo was null or empty in Meta for " +
+            Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
+        }
+
+        byte[] rowBefore = regionInfo.getStartKey();
+        startRow = HRegionInfo.createRegionName(tableName, rowBefore,
+            HConstants.ZEROES, false);
+      } finally {
+        if (metaTable != null) {
+          metaTable.close();
+        }
+      }
+    } else if (tableName == null || tableName.length == 0) {
+      // Full META scan
+      startRow = HConstants.EMPTY_START_ROW;
+    } else {
+      // Scan META for an entire table
+      startRow = HRegionInfo.createRegionName(
+          tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false);
+    }
+
+    // Scan over each meta region
+    ScannerCallable callable;
+    int rows = Math.min(rowLimit, configuration.getInt(
+        HConstants.HBASE_META_SCANNER_CACHING,
+        HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
+    do {
+      final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scanning " + Bytes.toString(metaTableName) +
+          " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" +
+          rowUpperLimit + " rows using " + connection.toString());
+      }
+      callable = new ScannerCallable(connection, metaTableName, scan, null);
+      // Open scanner
+      callable.withRetries();
+
+      int processedRows = 0;
+      try {
+        callable.setCaching(rows);
+        done: do {
+          if (processedRows >= rowUpperLimit) {
+            break;
+          }
+          //we have all the rows here
+          Result [] rrs = callable.withRetries();
+          if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
+            break; //exit completely
+          }
+          for (Result rr : rrs) {
+            if (processedRows >= rowUpperLimit) {
+              break done;
+            }
+            if (!visitor.processRow(rr))
+              break done; //exit completely
+            processedRows++;
+          }
+          //here, we didn't break anywhere. Check if we have more rows
+        } while(true);
+        // Advance the startRow to the end key of the current region
+        startRow = callable.getHRegionInfo().getEndKey();
+      } finally {
+        // Close scanner
+        callable.setClose();
+        callable.withRetries();
+      }
+    } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);
+  }
+
+  /**
+   * Returns HRegionInfo object from the column
+   * HConstants.CATALOG_FAMILY:HConstants.REGIONINFO_QUALIFIER of the catalog
+   * table Result.
+   * @param data a Result object from the catalog table scan
+   * @return HRegionInfo or null
+   */
+  public static HRegionInfo getHRegionInfo(Result data) {
+    byte [] bytes =
+      data.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+    if (bytes == null) return null;
+    HRegionInfo info = HRegionInfo.parseFromOrNull(bytes);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Current INFO from scan results = " + info);
+    }
+    return info;
+  }
+
+  /**
+   * Lists all of the regions currently in META.
+   * @param conf
+   * @return List of all user-space regions.
+   * @throws IOException
+   */
+  public static List<HRegionInfo> listAllRegions(Configuration conf)
+  throws IOException {
+    return listAllRegions(conf, true);
+  }
+
+  /**
+   * Lists all of the regions currently in META.
+   * @param conf
+   * @param offlined True to include offlined regions in the returned list;
+   * false to leave them out.
+   * @return List of all user-space regions.
+   * @throws IOException
+   */
+  public static List<HRegionInfo> listAllRegions(Configuration conf, final boolean offlined)
+  throws IOException {
+    final List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    MetaScannerVisitor visitor = new BlockingMetaScannerVisitor(conf) {
+        @Override
+        public boolean processRowInternal(Result result) throws IOException {
+          if (result == null || result.isEmpty()) {
+            return true;
+          }
+
+          HRegionInfo regionInfo = getHRegionInfo(result);
+          if (regionInfo == null) {
+            LOG.warn("Null REGIONINFO_QUALIFIER: " + result);
+            return true;
+          }
+
+          // If region offline AND we are not to include offlined regions, return.
+          if (regionInfo.isOffline() && !offlined) return true;
+          regions.add(regionInfo);
+          return true;
+        }
+    };
+    metaScan(conf, visitor);
+    return regions;
+  }
+
+  /**
+   * Lists all of the table regions currently in META.
+   * @param conf
+   * @param tablename Name of the user table whose regions are listed.
+   * @param offlined True to include offlined regions in the returned map;
+   * false to leave them out.
+   * @return Map of all user-space regions to servers
+   * @throws IOException
+   */
+  public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf,
+      final byte [] tablename, final boolean offlined) throws IOException {
+    final NavigableMap<HRegionInfo, ServerName> regions =
+      new TreeMap<HRegionInfo, ServerName>();
+    MetaScannerVisitor visitor = new TableMetaScannerVisitor(conf, tablename) {
+      @Override
+      public boolean processRowInternal(Result rowResult) throws IOException {
+        HRegionInfo info = getHRegionInfo(rowResult);
+        ServerName serverName = HRegionInfo.getServerName(rowResult);
+
+        if (!(info.isOffline() || info.isSplit())) {
+          regions.put(new UnmodifyableHRegionInfo(info), serverName);
+        }
+        return true;
+      }
+    };
+    metaScan(conf, visitor, tablename);
+    return regions;
+  }
+
+  /**
+   * Visitor class called to process each row of the .META. table
+   */
+  public interface MetaScannerVisitor extends Closeable {
+    /**
+     * Visitor method that accepts a RowResult from the meta region.
+     * Implementations can return false to stop scanning the current region
+     * early if further rows are not needed.
+     *
+     * @param rowResult result
+     * @return true to continue scanning rows in the current region, false to stop
+     * @throws IOException e
+     */
+    public boolean processRow(Result rowResult) throws IOException;
+  }
+
+  public static abstract class MetaScannerVisitorBase implements MetaScannerVisitor {
+    @Override
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * A MetaScannerVisitor that provides a consistent view of the table's
+   * META entries during concurrent splits (see HBASE-5986 for details). This class
+   * does not guarantee ordered traversal of meta entries, and can block until the
+   * META entries for daughters are available during splits.
+   */
+  public static abstract class BlockingMetaScannerVisitor
+    extends MetaScannerVisitorBase {
+
+    private static final int DEFAULT_BLOCKING_TIMEOUT = 10000;
+    private Configuration conf;
+    private TreeSet<byte[]> daughterRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    private int blockingTimeout;
+    private HTable metaTable;
+
+    public BlockingMetaScannerVisitor(Configuration conf) {
+      this.conf = conf;
+      this.blockingTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+          DEFAULT_BLOCKING_TIMEOUT);
+    }
+
+    public abstract boolean processRowInternal(Result rowResult) throws IOException;
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      if (metaTable != null) {
+        metaTable.close();
+        metaTable = null;
+      }
+    }
+
+    public HTable getMetaTable() throws IOException {
+      if (metaTable == null) {
+        metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
+      }
+      return metaTable;
+    }
+
+    @Override
+    public boolean processRow(Result rowResult) throws IOException {
+      HRegionInfo info = getHRegionInfo(rowResult);
+      if (info == null) {
+        return true;
+      }
+
+      if (daughterRegions.remove(info.getRegionName())) {
+        return true; //we have already processed this row
+      }
+
+      if (info.isSplitParent()) {
+        /* We have found a parent region which was split. We have to ensure that its daughters are
+         * seen by this scanner as well, so we block until they are added to the META table. Even
+         * though we are waiting for META entries, ACID semantics in HBase indicate that this
+         * scanner might not see the new rows, so we manually query the daughter rows. */
+        PairOfSameType<HRegionInfo> daughters = HRegionInfo.getDaughterRegions(rowResult);
+        HRegionInfo splitA = daughters.getFirst();
+        HRegionInfo splitB = daughters.getSecond();
+
+        HTable metaTable = getMetaTable();
+        long start = System.currentTimeMillis();
+        Result resultA = getRegionResultBlocking(metaTable, blockingTimeout,
+            splitA.getRegionName());
+        if (resultA != null) {
+          processRow(resultA);
+          daughterRegions.add(splitA.getRegionName());
+        } else {
+          throw new RegionOfflineException("Split daughter region " +
+              splitA.getRegionNameAsString() + " cannot be found in META.");
+        }
+        long rem = blockingTimeout - (System.currentTimeMillis() - start);
+
+        Result resultB = getRegionResultBlocking(metaTable, rem,
+            splitB.getRegionName());
+        if (resultB != null) {
+          processRow(resultB);
+          daughterRegions.add(splitB.getRegionName());
+        } else {
+          throw new RegionOfflineException("Split daughter region " +
+              splitB.getRegionNameAsString() + " cannot be found in META.");
+        }
+      }
+
+      return processRowInternal(rowResult);
+    }
+
+    private Result getRegionResultBlocking(HTable metaTable, long timeout, byte[] regionName)
+        throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("blocking until region is in META: " + Bytes.toStringBinary(regionName));
+      }
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < timeout) {
+        Get get = new Get(regionName);
+        Result result = metaTable.get(get);
+        HRegionInfo info = getHRegionInfo(result);
+        if (info != null) {
+          return result;
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * A MetaScannerVisitor for a table. Provides a consistent view of the table's
+   * META entries during concurrent splits (see HBASE-5986 for details). This class
+   * does not guarantee ordered traversal of meta entries, and can block until the
+   * META entries for daughters are available during splits.
+   */
+  public static abstract class TableMetaScannerVisitor extends BlockingMetaScannerVisitor {
+    private byte[] tableName;
+
+    public TableMetaScannerVisitor(Configuration conf, byte[] tableName) {
+      super(conf);
+      this.tableName = tableName;
+    }
+
+    @Override
+    public final boolean processRow(Result rowResult) throws IOException {
+      HRegionInfo info = getHRegionInfo(rowResult);
+      if (info == null) {
+        return true;
+      }
+      if (!(Bytes.equals(info.getTableName(), tableName))) {
+        return false;
+      }
+      return super.processRow(rowResult);
+    }
+
+  }
+}
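
As a usage sketch (not part of this commit), the visitor API above can be wired into a full META scan much the way listAllRegions does it: supply a MetaScannerVisitorBase subclass, collect the HRegionInfo of each visited row, and return true to keep scanning. For table-scoped scans, TableMetaScannerVisitor additionally filters by table name and inherits the split-consistency blocking behaviour.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;

public class MetaScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    final List<HRegionInfo> regions = new ArrayList<HRegionInfo>();

    // Collect the HRegionInfo of every META row; returning true keeps the scan going.
    MetaScanner.MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
      @Override
      public boolean processRow(Result rowResult) throws IOException {
        HRegionInfo info = MetaScanner.getHRegionInfo(rowResult);
        if (info != null) {
          regions.add(info);
        }
        return true;
      }
    };

    // Full META scan, starting from an empty start row.
    MetaScanner.metaScan(conf, visitor);
    System.out.println("Found " + regions.size() + " regions");
  }
}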

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Container for Actions (i.e. Get, Delete, or Put), which are grouped by
+ * regionName. Intended to be used with HConnectionManager.processBatch()
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class MultiAction<R> {
+
+  // map of regions to lists of puts/gets/deletes for that region.
+  public Map<byte[], List<Action<R>>> actions = new TreeMap<byte[], List<Action<R>>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiAction() {
+    super();
+  }
+
+  /**
+   * Get the total number of Actions
+   *
+   * @return total number of Actions for all groups in this container.
+   */
+  public int size() {
+    int size = 0;
+    for (List<?> l : actions.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add an Action to this container based on its regionName. If the regionName
+   * is wrong, the initial execution will fail, but it will be automatically
+   * retried after looking up the correct region.
+   *
+   * @param regionName
+   * @param a
+   */
+  public void add(byte[] regionName, Action<R> a) {
+    List<Action<R>> rsActions = actions.get(regionName);
+    if (rsActions == null) {
+      rsActions = new ArrayList<Action<R>>();
+      actions.put(regionName, rsActions);
+    }
+    rsActions.add(a);
+  }
+
+  public Set<byte[]> getRegions() {
+    return actions.keySet();
+  }
+
+  /**
+   * @return All actions from all regions in this container
+   */
+  public List<Action<R>> allActions() {
+    List<Action<R>> res = new ArrayList<Action<R>>();
+    for (List<Action<R>> lst : actions.values()) {
+      res.addAll(lst);
+    }
+    return res;
+  }
+}
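
A small sketch (not part of this commit) of how actions are grouped per region. The Action wrapper and its (Row, originalIndex) constructor are assumptions, since that class is not included in this patch, and the literal region names stand in for the encoded names a location lookup would supply.

import java.util.List;

import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiActionExample {
  public static void main(String[] args) {
    MultiAction<Object> multi = new MultiAction<Object>();

    // Placeholder region names; real callers get these from region locations.
    byte[] regionA = Bytes.toBytes("regionA");
    byte[] regionB = Bytes.toBytes("regionB");

    // Action is assumed to wrap a Row plus its original index in the caller's batch.
    Put put1 = new Put(Bytes.toBytes("row1"));
    put1.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
    multi.add(regionA, new Action<Object>(put1, 0));

    Put put2 = new Put(Bytes.toBytes("row2"));
    put2.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
    multi.add(regionB, new Action<Object>(put2, 1));

    System.out.println("total actions = " + multi.size());        // 2
    System.out.println("regions = " + multi.getRegions().size()); // 2
    List<Action<Object>> all = multi.allActions();                // flattened view
  }
}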

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A container for Result objects, grouped by regionName.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MultiResponse {
+
+  // map of regionName to list of (Results paired to the original index for that
+  // Result)
+  private Map<byte[], List<Pair<Integer, Object>>> results =
+      new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiResponse() {
+    super();
+  }
+
+  /**
+   * @return Number of pairs in this container
+   */
+  public int size() {
+    int size = 0;
+    for (Collection<?> c : results.values()) {
+      size += c.size();
+    }
+    return size;
+  }
+
+  /**
+   * Add the pair to the container, grouped by the regionName
+   *
+   * @param regionName
+   * @param r
+   *          First item in the pair is the original index of the Action
+   *          (request). Second item is the Result. Result will be empty for
+   *          successful Put and Delete actions.
+   */
+  public void add(byte[] regionName, Pair<Integer, Object> r) {
+    List<Pair<Integer, Object>> rs = results.get(regionName);
+    if (rs == null) {
+      rs = new ArrayList<Pair<Integer, Object>>();
+      results.put(regionName, rs);
+    }
+    rs.add(r);
+  }
+
+  public void add(byte []regionName, int originalIndex, Object resOrEx) {
+    add(regionName, new Pair<Integer,Object>(originalIndex, resOrEx));
+  }
+
+  public Map<byte[], List<Pair<Integer, Object>>> getResults() {
+    return results;
+  }
+}
\ No newline at end of file
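
And a matching sketch (again, not part of this commit) for the response side: a Result or an exception is added under the region name, keyed by the action's original index, and callers walk getResults() and inspect each value. Only the API shown in the file above is used.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class MultiResponseExample {
  public static void main(String[] args) {
    MultiResponse response = new MultiResponse();
    byte[] region = Bytes.toBytes("regionA");

    // Index 0 succeeded (empty Result for a Put); index 1 carries an exception.
    response.add(region, 0, new Result());
    response.add(region, 1, new IOException("region moved"));

    System.out.println("entries = " + response.size()); // 2

    // Walk the results; each value is either a Result or a Throwable.
    for (Map.Entry<byte[], List<Pair<Integer, Object>>> e : response.getResults().entrySet()) {
      for (Pair<Integer, Object> pair : e.getValue()) {
        Object valueOrError = pair.getSecond();
        System.out.println("action " + pair.getFirst() + " -> "
            + (valueOrError instanceof Throwable ? "failed" : "ok"));
      }
    }
  }
}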


