Subject: svn commit: r1449950 [11/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...
Date: Mon, 25 Feb 2013 22:50:29 -0000
To: commits@hbase.apache.org
From: eclark@apache.org

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,561 @@
+/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables. + * Each put will be sharded into different buffer queues based on its destination region server. + * So each region server buffer queue will only have the puts which share the same destination. + * And each queue will have a flush worker thread to flush the puts request to the region server. + * If any queue is full, the HTableMultiplexer starts to drop the Put requests for that + * particular queue. + * + * Also all the puts will be retried as a configuration number before dropping. + * And the HTableMultiplexer can report the number of buffered requests and the number of the + * failed (dropped) requests in total or on per region server basis. + * + * This class is thread safe. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class HTableMultiplexer { + private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName()); + private static int poolID = 0; + + static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms"; + + private Map tableNameToHTableMap; + + /** The map between each region server to its corresponding buffer queue */ + private Map> + serverToBufferQueueMap; + + /** The map between each region server to its flush worker */ + private Map serverToFlushWorkerMap; + + private Configuration conf; + private int retryNum; + private int perRegionServerBufferQueueSize; + + /** + * + * @param conf The HBaseConfiguration + * @param perRegionServerBufferQueueSize determines the max number of the buffered Put ops + * for each region server before dropping the request. + */ + public HTableMultiplexer(Configuration conf, + int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException { + this.conf = conf; + this.serverToBufferQueueMap = new ConcurrentHashMap>(); + this.serverToFlushWorkerMap = new ConcurrentHashMap(); + this.tableNameToHTableMap = new ConcurrentHashMap(); + this.retryNum = conf.getInt("hbase.client.retries.number", 10); + this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize; + } + + /** + * The put request will be buffered by its corresponding buffer queue. Return false if the queue + * is already full. + * @param table + * @param put + * @return true if the request can be accepted by its corresponding buffer queue. 
+ * @throws IOException + */ + public boolean put(final byte[] table, final Put put) throws IOException { + return put(table, put, this.retryNum); + } + + /** + * The puts request will be buffered by their corresponding buffer queue. + * Return the list of puts which could not be queued. + * @param table + * @param puts + * @return the list of puts which could not be queued + * @throws IOException + */ + public List put(final byte[] table, final List puts) + throws IOException { + if (puts == null) + return null; + + List failedPuts = null; + boolean result; + for (Put put : puts) { + result = put(table, put, this.retryNum); + if (result == false) { + + // Create the failed puts list if necessary + if (failedPuts == null) { + failedPuts = new ArrayList(); + } + // Add the put to the failed puts list + failedPuts.add(put); + } + } + return failedPuts; + } + + /** + * The put request will be buffered by its corresponding buffer queue. And the put request will be + * retried before dropping the request. + * Return false if the queue is already full. + * @param table + * @param put + * @param retry + * @return true if the request can be accepted by its corresponding buffer queue. + * @throws IOException + */ + public boolean put(final byte[] table, final Put put, int retry) + throws IOException { + if (retry <= 0) { + return false; + } + + LinkedBlockingQueue queue; + HTable htable = getHTable(table); + try { + htable.validatePut(put); + HRegionLocation loc = htable.getRegionLocation(put.getRow(), false); + if (loc != null) { + // Add the put pair into its corresponding queue. + queue = addNewRegionServer(loc, htable); + // Generate a MultiPutStatus obj and offer it into the queue + PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry); + + return queue.offer(s); + } + } catch (Exception e) { + LOG.debug("Cannot process the put " + put + " because of " + e); + } + return false; + } + + /** + * @return the current HTableMultiplexerStatus + */ + public HTableMultiplexerStatus getHTableMultiplexerStatus() { + return new HTableMultiplexerStatus(serverToFlushWorkerMap); + } + + + private HTable getHTable(final byte[] table) throws IOException { + HTable htable = this.tableNameToHTableMap.get(table); + if (htable == null) { + synchronized (this.tableNameToHTableMap) { + htable = this.tableNameToHTableMap.get(table); + if (htable == null) { + htable = new HTable(conf, table); + this.tableNameToHTableMap.put(table, htable); + } + } + } + return htable; + } + + private synchronized LinkedBlockingQueue addNewRegionServer( + HRegionLocation addr, HTable htable) { + LinkedBlockingQueue queue = + serverToBufferQueueMap.get(addr); + if (queue == null) { + // Create a queue for the new region server + queue = new LinkedBlockingQueue(perRegionServerBufferQueueSize); + serverToBufferQueueMap.put(addr, queue); + + // Create the flush worker + HTableFlushWorker worker = new HTableFlushWorker(conf, addr, + this, queue, htable); + this.serverToFlushWorkerMap.put(addr, worker); + + // Launch a daemon thread to flush the puts + // from the queue to its corresponding region server. + String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-" + + (poolID++); + Thread t = new Thread(worker, name); + t.setDaemon(true); + t.start(); + } + return queue; + } + + /** + * HTableMultiplexerStatus keeps track of the current status of the HTableMultiplexer. + * report the number of buffered requests and the number of the failed (dropped) requests + * in total or on per region server basis. 
+ */ + static class HTableMultiplexerStatus { + private long totalFailedPutCounter; + private long totalBufferedPutCounter; + private long maxLatency; + private long overallAverageLatency; + private Map serverToFailedCounterMap; + private Map serverToBufferedCounterMap; + private Map serverToAverageLatencyMap; + private Map serverToMaxLatencyMap; + + public HTableMultiplexerStatus( + Map serverToFlushWorkerMap) { + this.totalBufferedPutCounter = 0; + this.totalFailedPutCounter = 0; + this.maxLatency = 0; + this.overallAverageLatency = 0; + this.serverToBufferedCounterMap = new HashMap(); + this.serverToFailedCounterMap = new HashMap(); + this.serverToAverageLatencyMap = new HashMap(); + this.serverToMaxLatencyMap = new HashMap(); + this.initialize(serverToFlushWorkerMap); + } + + private void initialize( + Map serverToFlushWorkerMap) { + if (serverToFlushWorkerMap == null) { + return; + } + + long averageCalcSum = 0; + int averageCalcCount = 0; + for (Map.Entry entry : serverToFlushWorkerMap + .entrySet()) { + HRegionLocation addr = entry.getKey(); + HTableFlushWorker worker = entry.getValue(); + + long bufferedCounter = worker.getTotalBufferedCount(); + long failedCounter = worker.getTotalFailedCount(); + long serverMaxLatency = worker.getMaxLatency(); + AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter(); + // Get sum and count pieces separately to compute overall average + SimpleEntry averageComponents = averageCounter + .getComponents(); + long serverAvgLatency = averageCounter.getAndReset(); + + this.totalBufferedPutCounter += bufferedCounter; + this.totalFailedPutCounter += failedCounter; + if (serverMaxLatency > this.maxLatency) { + this.maxLatency = serverMaxLatency; + } + averageCalcSum += averageComponents.getKey(); + averageCalcCount += averageComponents.getValue(); + + this.serverToBufferedCounterMap.put(addr.getHostnamePort(), + bufferedCounter); + this.serverToFailedCounterMap + .put(addr.getHostnamePort(), + failedCounter); + this.serverToAverageLatencyMap.put(addr.getHostnamePort(), + serverAvgLatency); + this.serverToMaxLatencyMap + .put(addr.getHostnamePort(), + serverMaxLatency); + } + this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum + / averageCalcCount : 0; + } + + public long getTotalBufferedCounter() { + return this.totalBufferedPutCounter; + } + + public long getTotalFailedCounter() { + return this.totalFailedPutCounter; + } + + public long getMaxLatency() { + return this.maxLatency; + } + + public long getOverallAverageLatency() { + return this.overallAverageLatency; + } + + public Map getBufferedCounterForEachRegionServer() { + return this.serverToBufferedCounterMap; + } + + public Map getFailedCounterForEachRegionServer() { + return this.serverToFailedCounterMap; + } + + public Map getMaxLatencyForEachRegionServer() { + return this.serverToMaxLatencyMap; + } + + public Map getAverageLatencyForEachRegionServer() { + return this.serverToAverageLatencyMap; + } + } + + private static class PutStatus { + private final HRegionInfo regionInfo; + private final Put put; + private final int retryCount; + public PutStatus(final HRegionInfo regionInfo, final Put put, + final int retryCount) { + this.regionInfo = regionInfo; + this.put = put; + this.retryCount = retryCount; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + public Put getPut() { + return put; + } + public int getRetryCount() { + return retryCount; + } + } + + /** + * Helper to count the average over an interval until reset. 
+ */ + private static class AtomicAverageCounter { + private long sum; + private int count; + + public AtomicAverageCounter() { + this.sum = 0L; + this.count = 0; + } + + public synchronized long getAndReset() { + long result = this.get(); + this.reset(); + return result; + } + + public synchronized long get() { + if (this.count == 0) { + return 0; + } + return this.sum / this.count; + } + + public synchronized SimpleEntry getComponents() { + return new SimpleEntry(sum, count); + } + + public synchronized void reset() { + this.sum = 0l; + this.count = 0; + } + + public synchronized void add(long value) { + this.sum += value; + this.count++; + } + } + + private static class HTableFlushWorker implements Runnable { + private HRegionLocation addr; + private Configuration conf; + private LinkedBlockingQueue queue; + private HTableMultiplexer htableMultiplexer; + private AtomicLong totalFailedPutCount; + private AtomicInteger currentProcessingPutCount; + private AtomicAverageCounter averageLatency; + private AtomicLong maxLatency; + private HTable htable; // For Multi + + public HTableFlushWorker(Configuration conf, HRegionLocation addr, + HTableMultiplexer htableMultiplexer, + LinkedBlockingQueue queue, HTable htable) { + this.addr = addr; + this.conf = conf; + this.htableMultiplexer = htableMultiplexer; + this.queue = queue; + this.totalFailedPutCount = new AtomicLong(0); + this.currentProcessingPutCount = new AtomicInteger(0); + this.averageLatency = new AtomicAverageCounter(); + this.maxLatency = new AtomicLong(0); + this.htable = htable; + } + + public long getTotalFailedCount() { + return totalFailedPutCount.get(); + } + + public long getTotalBufferedCount() { + return queue.size() + currentProcessingPutCount.get(); + } + + public AtomicAverageCounter getAverageLatencyCounter() { + return this.averageLatency; + } + + public long getMaxLatency() { + return this.maxLatency.getAndSet(0); + } + + private boolean resubmitFailedPut(PutStatus failedPutStatus, + HRegionLocation oldLoc) throws IOException { + Put failedPut = failedPutStatus.getPut(); + // The currentPut is failed. So get the table name for the currentPut. + byte[] tableName = failedPutStatus.getRegionInfo().getTableName(); + // Decrease the retry count + int retryCount = failedPutStatus.getRetryCount() - 1; + + if (retryCount <= 0) { + // Update the failed counter and no retry any more. + return false; + } else { + // Retry one more time + return this.htableMultiplexer.put(tableName, failedPut, retryCount); + } + } + + @Override + public void run() { + List processingList = new ArrayList(); + /** + * The frequency in milliseconds for the current thread to process the corresponding + * buffer queue. 
+ **/ + long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100); + + // initial delay + try { + Thread.sleep(frequency); + } catch (InterruptedException e) { + } // Ignore + + long start, elapsed; + int failedCount = 0; + while (true) { + try { + start = elapsed = EnvironmentEdgeManager.currentTimeMillis(); + + // Clear the processingList, putToStatusMap and failedCount + processingList.clear(); + failedCount = 0; + + // drain all the queued puts into the tmp list + queue.drainTo(processingList); + currentProcessingPutCount.set(processingList.size()); + + if (processingList.size() > 0) { + ArrayList list = new ArrayList(processingList.size()); + for (PutStatus putStatus: processingList) { + list.add(putStatus.getPut()); + } + + // Process this multiput request + List failed = null; + Object[] results = new Object[list.size()]; + try { + htable.batch(list, results); + } catch (IOException e) { + LOG.debug("Caught some exceptions " + e + + " when flushing puts to region server " + addr.getHostnamePort()); + } finally { + // mutate list so that it is empty for complete success, or + // contains only failed records + // results are returned in the same order as the requests in list + // walk the list backwards, so we can remove from list without + // impacting the indexes of earlier members + for (int i = results.length - 1; i >= 0; i--) { + if (results[i] instanceof Result) { + // successful Puts are removed from the list here. + list.remove(i); + } + } + failed = list; + } + + if (failed != null) { + if (failed.size() == processingList.size()) { + // All the puts for this region server are failed. Going to retry it later + for (PutStatus putStatus: processingList) { + if (!resubmitFailedPut(putStatus, this.addr)) { + failedCount++; + } + } + } else { + Set failedPutSet = new HashSet(failed); + for (PutStatus putStatus: processingList) { + if (failedPutSet.contains(putStatus.getPut()) + && !resubmitFailedPut(putStatus, this.addr)) { + failedCount++; + } + } + } + } + // Update the totalFailedCount + this.totalFailedPutCount.addAndGet(failedCount); + + elapsed = EnvironmentEdgeManager.currentTimeMillis() - start; + // Update latency counters + averageLatency.add(elapsed); + if (elapsed > maxLatency.get()) { + maxLatency.set(elapsed); + } + + // Log some basic info + if (LOG.isDebugEnabled()) { + LOG.debug("Processed " + currentProcessingPutCount + + " put requests for " + addr.getHostnamePort() + " and " + + failedCount + " failed" + ", latency for this send: " + + elapsed); + } + + // Reset the current processing put count + currentProcessingPutCount.set(0); + } + + // Sleep for a while + if (elapsed == start) { + elapsed = EnvironmentEdgeManager.currentTimeMillis() - start; + } + if (elapsed < frequency) { + Thread.sleep(frequency - elapsed); + } + } catch (Exception e) { + // Log all the exceptions and move on + LOG.debug("Caught some exceptions " + e + + " when flushing puts to region server " + + addr.getHostnamePort()); + } + } + } + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,547 @@ 
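For orientation before the HTablePool source below, here is a minimal, illustrative sketch of how the HTableMultiplexer added above might be driven from client code. It is not part of the committed file; the table name, column names, and the per-server queue limit of 1000 are assumptions made for the example.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableMultiplexer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiplexerSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Assumed limit: buffer at most 1000 puts per region server before new puts are rejected.
    HTableMultiplexer multiplexer = new HTableMultiplexer(conf, 1000);
    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));
    // Non-blocking: returns false when the destination server's queue is full
    // or the put cannot be routed to a region.
    boolean queued = multiplexer.put(Bytes.toBytes("usertable"), put);
    if (!queued) {
      System.err.println("Put was rejected; the caller must retry or drop it.");
    }
  }
}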
+/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * A simple pool of HTable instances. + * + * Each HTablePool acts as a pool for all tables. To use, instantiate an + * HTablePool and use {@link #getTable(String)} to get an HTable from the pool. + * + * This method is not needed anymore, clients should call + * HTableInterface.close() rather than returning the tables to the pool + * + * Once you are done with it, close your instance of {@link HTableInterface} + * by calling {@link HTableInterface#close()} rather than returning the tables + * to the pool with (deprecated) {@link #putTable(HTableInterface)}. + * + *

+ * A pool can be created with a maxSize which defines the most HTable + * references that will ever be retained for each table. Otherwise the default + * is {@link Integer#MAX_VALUE}. + * + *
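As a minimal sketch of the pattern described here (not part of the committed file; imports omitted, and the table, family, and qualifier names are assumptions), a pool bounded at ten cached HTable references per table might be used like this:

static void pooledTableSketch(Configuration conf) throws IOException {
  HTablePool pool = new HTablePool(conf, 10);          // retain at most 10 HTables per table name
  HTableInterface table = pool.getTable("usertable");  // assumed table name
  try {
    Put put = new Put(Bytes.toBytes("row-1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    table.put(put);
  } finally {
    table.close();   // returns the proxied table to the pool rather than destroying it
  }
  pool.close();      // shuts the pool down and releases every cached table
}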

+ * Pool will manage its own connections to the cluster. See + * {@link HConnectionManager}. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HTablePool implements Closeable { + private final PoolMap tables; + private final int maxSize; + private final PoolType poolType; + private final Configuration config; + private final HTableInterfaceFactory tableFactory; + + /** + * Default Constructor. Default HBaseConfiguration and no limit on pool size. + */ + public HTablePool() { + this(HBaseConfiguration.create(), Integer.MAX_VALUE); + } + + /** + * Constructor to set maximum versions and use the specified configuration. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + */ + public HTablePool(final Configuration config, final int maxSize) { + this(config, maxSize, null, null); + } + + /** + * Constructor to set maximum versions and use the specified configuration and + * table factory. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param tableFactory + * table factory + */ + public HTablePool(final Configuration config, final int maxSize, + final HTableInterfaceFactory tableFactory) { + this(config, maxSize, tableFactory, PoolType.Reusable); + } + + /** + * Constructor to set maximum versions and use the specified configuration and + * pool type. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param poolType + * pool type which is one of {@link PoolType#Reusable} or + * {@link PoolType#ThreadLocal} + */ + public HTablePool(final Configuration config, final int maxSize, + final PoolType poolType) { + this(config, maxSize, null, poolType); + } + + /** + * Constructor to set maximum versions and use the specified configuration, + * table factory and pool type. The HTablePool supports the + * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool + * type is null or not one of those two values, then it will default to + * {@link PoolType#Reusable}. + * + * @param config + * configuration + * @param maxSize + * maximum number of references to keep for each table + * @param tableFactory + * table factory + * @param poolType + * pool type which is one of {@link PoolType#Reusable} or + * {@link PoolType#ThreadLocal} + */ + public HTablePool(final Configuration config, final int maxSize, + final HTableInterfaceFactory tableFactory, PoolType poolType) { + // Make a new configuration instance so I can safely cleanup when + // done with the pool. + this.config = config == null ? HBaseConfiguration.create() : config; + this.maxSize = maxSize; + this.tableFactory = tableFactory == null ? new HTableFactory() + : tableFactory; + if (poolType == null) { + this.poolType = PoolType.Reusable; + } else { + switch (poolType) { + case Reusable: + case ThreadLocal: + this.poolType = poolType; + break; + default: + this.poolType = PoolType.Reusable; + break; + } + } + this.tables = new PoolMap(this.poolType, + this.maxSize); + } + + /** + * Get a reference to the specified table from the pool. + *

+ *

+ * + * @param tableName + * table name + * @return a reference to the specified table + * @throws RuntimeException + * if there is a problem instantiating the HTable + */ + public HTableInterface getTable(String tableName) { + // call the old getTable implementation renamed to findOrCreateTable + HTableInterface table = findOrCreateTable(tableName); + // return a proxy table so when user closes the proxy, the actual table + // will be returned to the pool + return new PooledHTable(table); + } + + /** + * Get a reference to the specified table from the pool. + *

+ * + * Create a new one if one is not available. + * + * @param tableName + * table name + * @return a reference to the specified table + * @throws RuntimeException + * if there is a problem instantiating the HTable + */ + private HTableInterface findOrCreateTable(String tableName) { + HTableInterface table = tables.get(tableName); + if (table == null) { + table = createHTable(tableName); + } + return table; + } + + /** + * Get a reference to the specified table from the pool. + *

+ * + * Create a new one if one is not available. + * + * @param tableName + * table name + * @return a reference to the specified table + * @throws RuntimeException + * if there is a problem instantiating the HTable + */ + public HTableInterface getTable(byte[] tableName) { + return getTable(Bytes.toString(tableName)); + } + + /** + * This method is not needed anymore, clients should call + * HTableInterface.close() rather than returning the tables to the pool + * + * @param table + * the proxy table user got from pool + * @deprecated + */ + public void putTable(HTableInterface table) throws IOException { + // we need to be sure nobody puts a proxy implementation in the pool + // but if the client code is not updated + // and it will continue to call putTable() instead of calling close() + // then we need to return the wrapped table to the pool instead of the + // proxy + // table + if (table instanceof PooledHTable) { + returnTable(((PooledHTable) table).getWrappedTable()); + } else { + // normally this should not happen if clients pass back the same + // table + // object they got from the pool + // but if it happens then it's better to reject it + throw new IllegalArgumentException("not a pooled table: " + table); + } + } + + /** + * Puts the specified HTable back into the pool. + *

+ * + * If the pool already contains maxSize references to the table, then + * the table instance gets closed after flushing buffered edits. + * + * @param table + * table + */ + private void returnTable(HTableInterface table) throws IOException { + // this is the old putTable method renamed and made private + String tableName = Bytes.toString(table.getTableName()); + if (tables.size(tableName) >= maxSize) { + // release table instance since we're not reusing it + this.tables.remove(tableName, table); + this.tableFactory.releaseHTableInterface(table); + return; + } + tables.put(tableName, table); + } + + protected HTableInterface createHTable(String tableName) { + return this.tableFactory.createHTableInterface(config, + Bytes.toBytes(tableName)); + } + + /** + * Closes all the HTable instances , belonging to the given table, in the + * table pool. + *

+ * Note: this is a 'shutdown' of the given table pool and different from + * {@link #putTable(HTableInterface)}, that is used to return the table + * instance to the pool for future re-use. + * + * @param tableName + */ + public void closeTablePool(final String tableName) throws IOException { + Collection tables = this.tables.values(tableName); + if (tables != null) { + for (HTableInterface table : tables) { + this.tableFactory.releaseHTableInterface(table); + } + } + this.tables.remove(tableName); + } + + /** + * See {@link #closeTablePool(String)}. + * + * @param tableName + */ + public void closeTablePool(final byte[] tableName) throws IOException { + closeTablePool(Bytes.toString(tableName)); + } + + /** + * Closes all the HTable instances , belonging to all tables in the table + * pool. + *

+ * Note: this is a 'shutdown' of all the table pools. + */ + public void close() throws IOException { + for (String tableName : tables.keySet()) { + closeTablePool(tableName); + } + this.tables.clear(); + } + + public int getCurrentPoolSize(String tableName) { + return tables.size(tableName); + } + + /** + * A proxy class that implements HTableInterface.close method to return the + * wrapped table back to the table pool + * + */ + class PooledHTable implements HTableInterface { + + private HTableInterface table; // actual table implementation + + public PooledHTable(HTableInterface table) { + this.table = table; + } + + @Override + public byte[] getTableName() { + return table.getTableName(); + } + + @Override + public Configuration getConfiguration() { + return table.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return table.getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return table.exists(get); + } + + @Override + public Boolean[] exists(List gets) throws IOException { + return table.exists(gets); + } + + @Override + public void batch(List actions, Object[] results) throws IOException, + InterruptedException { + table.batch(actions, results); + } + + @Override + public Object[] batch(List actions) throws IOException, + InterruptedException { + return table.batch(actions); + } + + @Override + public Result get(Get get) throws IOException { + return table.get(get); + } + + @Override + public Result[] get(List gets) throws IOException { + return table.get(gets); + } + + @Override + @SuppressWarnings("deprecation") + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return table.getRowOrBefore(row, family); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return table.getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return table.getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + return table.getScanner(family, qualifier); + } + + @Override + public void put(Put put) throws IOException { + table.put(put); + } + + @Override + public void put(List puts) throws IOException { + table.put(puts); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return table.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public void delete(Delete delete) throws IOException { + table.delete(delete); + } + + @Override + public void delete(List deletes) throws IOException { + table.delete(deletes); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return table.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public Result increment(Increment increment) throws IOException { + return table.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + } + + @Override + public boolean 
isAutoFlush() { + return table.isAutoFlush(); + } + + @Override + public void flushCommits() throws IOException { + table.flushCommits(); + } + + /** + * Returns the actual table back to the pool + * + * @throws IOException + */ + public void close() throws IOException { + returnTable(table); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return table.coprocessorService(row); + } + + @Override + public Map coprocessorService(Class service, + byte[] startKey, byte[] endKey, Batch.Call callable) + throws ServiceException, Throwable { + return table.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public void coprocessorService(Class service, + byte[] startKey, byte[] endKey, Batch.Call callable, Callback callback) + throws ServiceException, Throwable { + table.coprocessorService(service, startKey, endKey, callable, callback); + } + + @Override + public String toString() { + return "PooledHTable{" + ", table=" + table + '}'; + } + + /** + * Expose the wrapped HTable to tests in the same package + * + * @return wrapped htable + */ + HTableInterface getWrappedTable() { + return table; + } + + @Override + public void batchCallback(List actions, + Object[] results, Callback callback) throws IOException, + InterruptedException { + table.batchCallback(actions, results, callback); + } + + @Override + public Object[] batchCallback(List actions, + Callback callback) throws IOException, InterruptedException { + return table.batchCallback(actions, callback); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + table.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return table.append(append); + } + + @Override + public void setAutoFlush(boolean autoFlush) { + table.setAutoFlush(autoFlush); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + table.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public long getWriteBufferSize() { + return table.getWriteBufferSize(); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + table.setWriteBufferSize(writeBufferSize); + } + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.HRegionLocation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility class for HTable. + * + * + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HTableUtil { + + private static final int INITIAL_LIST_SIZE = 250; + + /** + * Processes a List of Puts and writes them to an HTable instance in RegionServer buckets via the htable.put method. + * This will utilize the writeBuffer, thus the writeBuffer flush frequency may be tuned accordingly via htable.setWriteBufferSize. + *

+ * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs in each flush. + *

+ * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region, + * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this. + *
+ * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem. + *
+ * Assumption #3: The input list of Puts is big enough to be useful (in the thousands or more). The intent of this + * method is to process larger chunks of data. + *
+ * Assumption #4: htable.setAutoFlush(false) has been set. This is a requirement to use the writeBuffer. + *
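Under those four assumptions, a hedged sketch of calling bucketRsPut (not part of the committed file; imports omitted, and the table name, column names, and row count are illustrative) could look like:

static void bucketedPutSketch(Configuration conf) throws IOException {
  HTable htable = new HTable(conf, "usertable");   // assumed, pre-split table
  htable.setAutoFlush(false);                      // assumption #4: rely on the write buffer
  List<Put> puts = new ArrayList<Put>();
  for (int i = 0; i < 10000; i++) {
    Put p = new Put(Bytes.toBytes("row-" + i));
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
    puts.add(p);
  }
  HTableUtil.bucketRsPut(htable, puts);            // groups puts by region server, then flushes once
  htable.close();
}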

+ * @param htable HTable instance for target HBase table + * @param puts List of Put instances + * @throws IOException if a remote or network exception occurs + * + */ + public static void bucketRsPut(HTable htable, List puts) throws IOException { + + Map> putMap = createRsPutMap(htable, puts); + for (List rsPuts: putMap.values()) { + htable.put( rsPuts ); + } + htable.flushCommits(); + } + + /** + * Processes a List of Rows (Put, Delete) and writes them to an HTable instance in RegionServer buckets via the htable.batch method. + *

+ * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs; this will + * produce one RPC of Puts per RegionServer. + *

+ * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region, + * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this. + *
+ * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem. + *
+ * Assumption #3: The input list of Rows is big enough to be useful (in the thousands or more). The intent of this + * method is to process larger chunks of data. + *

+ * This method accepts a list of Row objects because the underlying .batch method accepts a list of Row objects. + *
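A comparable sketch for bucketRsBatch, mixing a Put and a Delete (again illustrative names, imports omitted, not part of the committed file):

static void bucketedBatchSketch(HTable htable) throws IOException {
  List<Row> rows = new ArrayList<Row>();
  Put put = new Put(Bytes.toBytes("row-1"));
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  rows.add(put);
  rows.add(new Delete(Bytes.toBytes("row-2")));
  HTableUtil.bucketRsBatch(htable, rows);   // one batch call per region server bucket
}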

+ * @param htable HTable instance for target HBase table + * @param rows List of Row instances + * @throws IOException if a remote or network exception occurs + */ + public static void bucketRsBatch(HTable htable, List rows) throws IOException { + + try { + Map> rowMap = createRsRowMap(htable, rows); + for (List rsRows: rowMap.values()) { + htable.batch( rsRows ); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + + } + + private static Map> createRsPutMap(HTable htable, List puts) throws IOException { + + Map> putMap = new HashMap>(); + for (Put put: puts) { + HRegionLocation rl = htable.getRegionLocation( put.getRow() ); + String hostname = rl.getHostname(); + List recs = putMap.get( hostname); + if (recs == null) { + recs = new ArrayList(INITIAL_LIST_SIZE); + putMap.put( hostname, recs); + } + recs.add(put); + } + return putMap; + } + + private static Map> createRsRowMap(HTable htable, List rows) throws IOException { + + Map> rowMap = new HashMap>(); + for (Row row: rows) { + HRegionLocation rl = htable.getRegionLocation( row.getRow() ); + String hostname = rl.getHostname(); + List recs = rowMap.get( hostname); + if (recs == null) { + recs = new ArrayList(INITIAL_LIST_SIZE); + rowMap.put( hostname, recs); + } + recs.add(row); + } + return rowMap; + } + +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,258 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; + +/** + * Used to perform Increment operations on a single row. + *

+ * This operation does not appear atomic to readers. Increments are done + * under a single row lock, so write operations to a row are synchronized, but + * readers do not take row locks so get and scan operations can see this + * operation partially completed. + *

+ * To increment columns of a row, instantiate an Increment object with the row + * to increment. At least one column to increment must be specified using the + * {@link #addColumn(byte[], byte[], long)} method. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class Increment implements Row { + private byte [] row = null; + private boolean writeToWAL = true; + private TimeRange tr = new TimeRange(); + private Map> familyMap = + new TreeMap>(Bytes.BYTES_COMPARATOR); + + /** Constructor for Writable. DO NOT USE */ + public Increment() {} + + /** + * Create a Increment operation for the specified row, using an existing row + * lock. + *

+ * At least one column must be incremented. + * @param row row key + */ + public Increment(byte [] row) { + if (row == null) { + throw new IllegalArgumentException("Cannot increment a null row"); + } + this.row = row; + } + + /** + * Increment the column from the specific family with the specified qualifier + * by the specified amount. + *

+ * Overrides previous calls to addColumn for this family and qualifier. + * @param family family name + * @param qualifier column qualifier + * @param amount amount to increment by + * @return the Increment object + */ + public Increment addColumn(byte [] family, byte [] qualifier, long amount) { + if (family == null) { + throw new IllegalArgumentException("family cannot be null"); + } + if (qualifier == null) { + throw new IllegalArgumentException("qualifier cannot be null"); + } + NavigableMap set = familyMap.get(family); + if(set == null) { + set = new TreeMap(Bytes.BYTES_COMPARATOR); + } + set.put(qualifier, amount); + familyMap.put(family, set); + return this; + } + + /* Accessors */ + + /** + * Method for retrieving the increment's row + * @return row + */ + public byte [] getRow() { + return this.row; + } + + /** + * Method for retrieving whether WAL will be written to or not + * @return true if WAL should be used, false if not + */ + public boolean getWriteToWAL() { + return this.writeToWAL; + } + + /** + * Sets whether this operation should write to the WAL or not. + * @param writeToWAL true if WAL should be used, false if not + * @return this increment operation + */ + public Increment setWriteToWAL(boolean writeToWAL) { + this.writeToWAL = writeToWAL; + return this; + } + + /** + * Gets the TimeRange used for this increment. + * @return TimeRange + */ + public TimeRange getTimeRange() { + return this.tr; + } + + /** + * Sets the TimeRange to be used on the Get for this increment. + *
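A brief illustrative sketch of the addColumn flow (not part of the committed file; the row, family, and qualifier names are assumptions and imports are omitted):

static void incrementSketch(HTableInterface table) throws IOException {
  Increment incr = new Increment(Bytes.toBytes("user-42"));
  incr.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("clicks"), 1L);
  incr.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("views"), 5L);
  Result result = table.increment(incr);   // both columns are incremented under one row lock
  System.out.println(result);
}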

+ * This is useful for when you have counters that only last for specific + * periods of time (ie. counters that are partitioned by time). By setting + * the range of valid times for this increment, you can potentially gain + * some performance with a more optimal Get operation. + *
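For instance, a time-partitioned counter might only consider cells written in the last hour; a hedged sketch (the one-hour window and the names used are assumptions, not part of the committed file):

static Increment hourlyCounterSketch() throws IOException {
  long now = System.currentTimeMillis();
  Increment incr = new Increment(Bytes.toBytes("user-42"));
  incr.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("hourly"), 1L);
  incr.setTimeRange(now - 3600L * 1000L, now);   // [minStamp, maxStamp): only last hour's cells
  return incr;
}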

+ * This range is used as [minStamp, maxStamp). + * @param minStamp minimum timestamp value, inclusive + * @param maxStamp maximum timestamp value, exclusive + * @throws IOException if invalid time range + * @return this + */ + public Increment setTimeRange(long minStamp, long maxStamp) + throws IOException { + tr = new TimeRange(minStamp, maxStamp); + return this; + } + + /** + * Method for retrieving the keys in the familyMap + * @return keys in the current familyMap + */ + public Set familySet() { + return this.familyMap.keySet(); + } + + /** + * Method for retrieving the number of families to increment from + * @return number of families + */ + public int numFamilies() { + return this.familyMap.size(); + } + + /** + * Method for retrieving the number of columns to increment + * @return number of columns across all families + */ + public int numColumns() { + if (!hasFamilies()) return 0; + int num = 0; + for (NavigableMap family : familyMap.values()) { + num += family.size(); + } + return num; + } + + /** + * Method for checking if any families have been inserted into this Increment + * @return true if familyMap is non empty false otherwise + */ + public boolean hasFamilies() { + return !this.familyMap.isEmpty(); + } + + /** + * Method for retrieving the increment's familyMap + * @return familyMap + */ + public Map> getFamilyMap() { + return this.familyMap; + } + + /** + * @return String + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("row="); + sb.append(Bytes.toStringBinary(this.row)); + if(this.familyMap.size() == 0) { + sb.append(", no columns set to be incremented"); + return sb.toString(); + } + sb.append(", families="); + boolean moreThanOne = false; + for(Map.Entry> entry : + this.familyMap.entrySet()) { + if(moreThanOne) { + sb.append("), "); + } else { + moreThanOne = true; + sb.append("{"); + } + sb.append("(family="); + sb.append(Bytes.toString(entry.getKey())); + sb.append(", columns="); + if(entry.getValue() == null) { + sb.append("NONE"); + } else { + sb.append("{"); + boolean moreThanOneB = false; + for(Map.Entry column : entry.getValue().entrySet()) { + if(moreThanOneB) { + sb.append(", "); + } else { + moreThanOneB = true; + } + sb.append(Bytes.toStringBinary(column.getKey()) + "+=" + column.getValue()); + } + sb.append("}"); + } + } + sb.append("}"); + return sb.toString(); + } + + @Override + public int compareTo(Row i) { + return Bytes.compareTo(this.getRow(), i.getRow()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Row other = (Row) obj; + return compareTo(other) == 0; + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/IsolationLevel.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,59 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Specify Isolation levels in Scan operations. + *

+ * There are two isolation levels. A READ_COMMITTED isolation level + * indicates that only data that is committed be returned in a scan. + * An isolation level of READ_UNCOMMITTED indicates that a scan + * should return data that is being modified by transactions that might + * not have been committed yet. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum IsolationLevel { + + READ_COMMITTED(1), + READ_UNCOMMITTED(2); + + IsolationLevel(int value) {} + + public byte [] toBytes() { + return new byte [] { toByte() }; + } + + public byte toByte() { + return (byte)this.ordinal(); + } + + public static IsolationLevel fromBytes(byte [] bytes) { + return IsolationLevel.fromByte(bytes[0]); + } + + public static IsolationLevel fromByte(byte vbyte) { + return IsolationLevel.values()[vbyte]; + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAdminKeepAliveConnection.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,44 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + + +import org.apache.hadoop.hbase.MasterAdminProtocol; + +import java.io.Closeable; + +/** + * A KeepAlive connection is not physically closed immediately after the close, + * but rather kept alive for a few minutes. It makes sense only if it's shared. + * + * This interface is used by a dynamic proxy. It allows to have a #close + * function in a master client. + * + * This class is intended to be used internally by HBase classes that need to + * speak the MasterAdminProtocol; but not by * final user code. Hence it's + * package protected. 
+ */ +interface MasterAdminKeepAliveConnection extends MasterAdminProtocol, Closeable { + + @Override + public void close(); +} + Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterMonitorKeepAliveConnection.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,44 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + + +import org.apache.hadoop.hbase.MasterMonitorProtocol; + +import java.io.Closeable; + +/** + * A KeepAlive connection is not physically closed immediately after the close, + * but rather kept alive for a few minutes. It makes sense only if it's shared. + * + * This interface is used by a dynamic proxy. It allows to have a #close + * function in a master client. + * + * This class is intended to be used internally by HBase classes that need to + * speak the MasterMonitorProtocol; but not by final user code. Hence it's + * package protected. + */ +interface MasterMonitorKeepAliveConnection extends MasterMonitorProtocol, Closeable { + + @Override + public void close(); +} + Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,489 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; +import org.apache.hadoop.hbase.exceptions.TableNotFoundException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.PairOfSameType; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Scanner class that contains the .META. table scanning logic + * and uses a Retryable scanner. Provided visitors will be called + * for each row. + * + * Although it has public visibility, this is not a public-facing API and may evolve in + * minor releases. + * + * <p> Note that during concurrent region splits, the scanner might not see + * META changes across rows (for parent and daughter entries) consistently. + * See HBASE-5986, and {@link BlockingMetaScannerVisitor} for details. </p>
+ */ +@InterfaceAudience.Private +public class MetaScanner { + private static final Log LOG = LogFactory.getLog(MetaScanner.class); + /** + * Scans the meta table and calls a visitor on each RowResult, using an empty + * start row value as the table name. + * + * @param configuration conf + * @param visitor A custom visitor + * @throws IOException e + */ + public static void metaScan(Configuration configuration, + MetaScannerVisitor visitor) + throws IOException { + metaScan(configuration, visitor, null); + } + + /** + * Scans the meta table and calls a visitor on each RowResult. Uses a table + * name to locate meta regions. + * + * @param configuration config + * @param visitor visitor object + * @param userTableName User table name in meta table to start scan at. Pass + * null if not interested in a particular table. + * @throws IOException e + */ + public static void metaScan(Configuration configuration, + MetaScannerVisitor visitor, byte [] userTableName) + throws IOException { + metaScan(configuration, visitor, userTableName, null, Integer.MAX_VALUE); + } + + /** + * Scans the meta table and calls a visitor on each RowResult. Uses a table + * name and a row name to locate meta regions, and scans at most + * rowLimit rows. + * + * @param configuration HBase configuration. + * @param visitor Visitor object. + * @param userTableName User table name in meta table to start scan at. Pass + * null if not interested in a particular table. + * @param row Name of the row at the user table. The scan will start from + * the region row where the row resides. + * @param rowLimit Maximum number of rows to process. If it is less than 0, it + * will be set to the default value Integer.MAX_VALUE. + * @throws IOException e + */ + public static void metaScan(Configuration configuration, + MetaScannerVisitor visitor, byte [] userTableName, byte[] row, + int rowLimit) + throws IOException { + metaScan(configuration, visitor, userTableName, row, rowLimit, + HConstants.META_TABLE_NAME); + } + + /** + * Scans the meta table and calls a visitor on each RowResult. Uses a table + * name and a row name to locate meta regions, and scans at most + * rowLimit rows. + * + * @param configuration HBase configuration. + * @param visitor Visitor object. Closes the visitor before returning. + * @param tableName User table name in meta table to start scan at. Pass + * null if not interested in a particular table. + * @param row Name of the row at the user table. The scan will start from + * the region row where the row resides. + * @param rowLimit Maximum number of rows to process. If it is less than 0, it + * will be set to the default value Integer.MAX_VALUE. + * @param metaTableName Meta table to scan, root or meta. + * @throws IOException e + */ + public static void metaScan(Configuration configuration, + final MetaScannerVisitor visitor, final byte[] tableName, + final byte[] row, final int rowLimit, final byte[] metaTableName) + throws IOException { + try { + HConnectionManager.execute(new HConnectable<Void>(configuration) { + @Override + public Void connect(HConnection connection) throws IOException { + metaScan(conf, connection, visitor, tableName, row, rowLimit, + metaTableName); + return null; + } + }); + } finally { + visitor.close(); + } + } + + private static void metaScan(Configuration configuration, HConnection connection, + MetaScannerVisitor visitor, byte [] tableName, byte[] row, + int rowLimit, final byte [] metaTableName) + throws IOException { + int rowUpperLimit = rowLimit > 0 ?
rowLimit: Integer.MAX_VALUE; + + // if row is not null, we want to use the startKey of the row's region as + // the startRow for the meta scan. + byte[] startRow; + if (row != null) { + // Scan starting at a particular row in a particular table + assert tableName != null; + byte[] searchRow = + HRegionInfo.createRegionName(tableName, row, HConstants.NINES, + false); + HTable metaTable = null; + try { + metaTable = new HTable(configuration, HConstants.META_TABLE_NAME); + Result startRowResult = metaTable.getRowOrBefore(searchRow, + HConstants.CATALOG_FAMILY); + if (startRowResult == null) { + throw new TableNotFoundException("Cannot find row in .META. for table: " + + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); + } + HRegionInfo regionInfo = getHRegionInfo(startRowResult); + if (regionInfo == null) { + throw new IOException("HRegionInfo was null or empty in Meta for " + + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow)); + } + + byte[] rowBefore = regionInfo.getStartKey(); + startRow = HRegionInfo.createRegionName(tableName, rowBefore, + HConstants.ZEROES, false); + } finally { + if (metaTable != null) { + metaTable.close(); + } + } + } else if (tableName == null || tableName.length == 0) { + // Full META scan + startRow = HConstants.EMPTY_START_ROW; + } else { + // Scan META for an entire table + startRow = HRegionInfo.createRegionName( + tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false); + } + + // Scan over each meta region + ScannerCallable callable; + int rows = Math.min(rowLimit, configuration.getInt( + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_META_SCANNER_CACHING)); + do { + final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY); + if (LOG.isDebugEnabled()) { + LOG.debug("Scanning " + Bytes.toString(metaTableName) + + " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" + + rowUpperLimit + " rows using " + connection.toString()); + } + callable = new ScannerCallable(connection, metaTableName, scan, null); + // Open scanner + callable.withRetries(); + + int processedRows = 0; + try { + callable.setCaching(rows); + done: do { + if (processedRows >= rowUpperLimit) { + break; + } + //we have all the rows here + Result [] rrs = callable.withRetries(); + if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) { + break; //exit completely + } + for (Result rr : rrs) { + if (processedRows >= rowUpperLimit) { + break done; + } + if (!visitor.processRow(rr)) + break done; //exit completely + processedRows++; + } + //here, we didn't break anywhere. Check if we have more rows + } while(true); + // Advance the startRow to the end key of the current region + startRow = callable.getHRegionInfo().getEndKey(); + } finally { + // Close scanner + callable.setClose(); + callable.withRetries(); + } + } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0); + } + + /** + * Returns HRegionInfo object from the column + * HConstants.CATALOG_FAMILY:HConstants.REGIONINFO_QUALIFIER of the catalog + * table Result. 
+ * @param data a Result object from the catalog table scan + * @return HRegionInfo or null + */ + public static HRegionInfo getHRegionInfo(Result data) { + byte [] bytes = + data.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + if (bytes == null) return null; + HRegionInfo info = HRegionInfo.parseFromOrNull(bytes); + if (LOG.isDebugEnabled()) { + LOG.debug("Current INFO from scan results = " + info); + } + return info; + } + + /** + * Lists all of the regions currently in META. + * @param conf + * @return List of all user-space regions. + * @throws IOException + */ + public static List<HRegionInfo> listAllRegions(Configuration conf) + throws IOException { + return listAllRegions(conf, true); + } + + /** + * Lists all of the regions currently in META. + * @param conf + * @param offlined True to include offlined regions; false to leave offlined + * regions out of the returned list. + * @return List of all user-space regions. + * @throws IOException + */ + public static List<HRegionInfo> listAllRegions(Configuration conf, final boolean offlined) + throws IOException { + final List<HRegionInfo> regions = new ArrayList<HRegionInfo>(); + MetaScannerVisitor visitor = new BlockingMetaScannerVisitor(conf) { + @Override + public boolean processRowInternal(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return true; + } + + HRegionInfo regionInfo = getHRegionInfo(result); + if (regionInfo == null) { + LOG.warn("Null REGIONINFO_QUALIFIER: " + result); + return true; + } + + // If region offline AND we are not to include offlined regions, return. + if (regionInfo.isOffline() && !offlined) return true; + regions.add(regionInfo); + return true; + } + }; + metaScan(conf, visitor); + return regions; + } + + /** + * Lists all of the table regions currently in META. + * @param conf + * @param offlined True to include offlined regions; false to leave offlined + * regions out of the returned map. + * @return Map of all user-space regions to servers + * @throws IOException + */ + public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf, + final byte [] tablename, final boolean offlined) throws IOException { + final NavigableMap<HRegionInfo, ServerName> regions = + new TreeMap<HRegionInfo, ServerName>(); + MetaScannerVisitor visitor = new TableMetaScannerVisitor(conf, tablename) { + @Override + public boolean processRowInternal(Result rowResult) throws IOException { + HRegionInfo info = getHRegionInfo(rowResult); + ServerName serverName = HRegionInfo.getServerName(rowResult); + + if (!(info.isOffline() || info.isSplit())) { + regions.put(new UnmodifyableHRegionInfo(info), serverName); + } + return true; + } + }; + metaScan(conf, visitor, tablename); + return regions; + } + + /** + * Visitor class called to process each row of the .META. table + */ + public interface MetaScannerVisitor extends Closeable { + /** + * Visitor method that accepts a RowResult from the meta region. + * Implementations can return false to stop the scan early if further rows + * are not needed. + * + * @param rowResult result + * @return true to continue scanning rows, false to stop + * @throws IOException e + */ + public boolean processRow(Result rowResult) throws IOException; + } + + public static abstract class MetaScannerVisitorBase implements MetaScannerVisitor { + @Override + public void close() throws IOException { + } + } + + /** + * A MetaScannerVisitor that provides a consistent view of the table's + * META entries during concurrent splits (see HBASE-5986 for details).
This class + * does not guarantee ordered traversal of meta entries, and can block until the + * META entries for daughters are available during splits. + */ + public static abstract class BlockingMetaScannerVisitor + extends MetaScannerVisitorBase { + + private static final int DEFAULT_BLOCKING_TIMEOUT = 10000; + private Configuration conf; + private TreeSet<byte[]> daughterRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + private int blockingTimeout; + private HTable metaTable; + + public BlockingMetaScannerVisitor(Configuration conf) { + this.conf = conf; + this.blockingTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + DEFAULT_BLOCKING_TIMEOUT); + } + + public abstract boolean processRowInternal(Result rowResult) throws IOException; + + @Override + public void close() throws IOException { + super.close(); + if (metaTable != null) { + metaTable.close(); + metaTable = null; + } + } + + public HTable getMetaTable() throws IOException { + if (metaTable == null) { + metaTable = new HTable(conf, HConstants.META_TABLE_NAME); + } + return metaTable; + } + + @Override + public boolean processRow(Result rowResult) throws IOException { + HRegionInfo info = getHRegionInfo(rowResult); + if (info == null) { + return true; + } + + if (daughterRegions.remove(info.getRegionName())) { + return true; //we have already processed this row + } + + if (info.isSplitParent()) { + /* we have found a parent region which was split. We have to ensure that its daughters are + * seen by this scanner as well, so we block until they are added to the META table. Even + * though we are waiting for META entries, ACID semantics in HBase indicate that this + * scanner might not see the new rows. So we manually query the daughter rows */ + PairOfSameType<HRegionInfo> daughters = HRegionInfo.getDaughterRegions(rowResult); + HRegionInfo splitA = daughters.getFirst(); + HRegionInfo splitB = daughters.getSecond(); + + HTable metaTable = getMetaTable(); + long start = System.currentTimeMillis(); + Result resultA = getRegionResultBlocking(metaTable, blockingTimeout, + splitA.getRegionName()); + if (resultA != null) { + processRow(resultA); + daughterRegions.add(splitA.getRegionName()); + } else { + throw new RegionOfflineException("Split daughter region " + + splitA.getRegionNameAsString() + " cannot be found in META."); + } + long rem = blockingTimeout - (System.currentTimeMillis() - start); + + Result resultB = getRegionResultBlocking(metaTable, rem, + splitB.getRegionName()); + if (resultB != null) { + processRow(resultB); + daughterRegions.add(splitB.getRegionName()); + } else { + throw new RegionOfflineException("Split daughter region " + + splitB.getRegionNameAsString() + " cannot be found in META."); + } + } + + return processRowInternal(rowResult); + } + + private Result getRegionResultBlocking(HTable metaTable, long timeout, byte[] regionName) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("blocking until region is in META: " + Bytes.toStringBinary(regionName)); + } + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timeout) { + Get get = new Get(regionName); + Result result = metaTable.get(get); + HRegionInfo info = getHRegionInfo(result); + if (info != null) { + return result; + } + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + break; + } + } + return null; + } + } + + /** + * A MetaScannerVisitor for a table.
Provides a consistent view of the table's + * META entries during concurrent splits (see HBASE-5986 for details). This class + * does not guarantee ordered traversal of meta entries, and can block until the + * META entries for daughters are available during splits. + */ + public static abstract class TableMetaScannerVisitor extends BlockingMetaScannerVisitor { + private byte[] tableName; + + public TableMetaScannerVisitor(Configuration conf, byte[] tableName) { + super(conf); + this.tableName = tableName; + } + + @Override + public final boolean processRow(Result rowResult) throws IOException { + HRegionInfo info = getHRegionInfo(rowResult); + if (info == null) { + return true; + } + if (!(Bytes.equals(info.getTableName(), tableName))) { + return false; + } + return super.processRow(rowResult); + } + + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Container for Actions (i.e. Get, Delete, or Put), which are grouped by + * regionName. Intended to be used with HConnectionManager.processBatch() + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class MultiAction<R> { + + // map of regions to lists of puts/gets/deletes for that region. + public Map<byte[], List<Action<R>>> actions = new TreeMap<byte[], List<Action<R>>>(Bytes.BYTES_COMPARATOR); + + public MultiAction() { + super(); + } + + /** + * Get the total number of Actions + * + * @return total number of Actions for all groups in this container. + */ + public int size() { + int size = 0; + for (List<Action<R>> l : actions.values()) { + size += l.size(); + } + return size; + } + + /** + * Add an Action to this container based on its regionName. If the regionName + * is wrong, the initial execution will fail, but will be automatically + * retried after looking up the correct region.
+ * + * @param regionName + * @param a + */ + public void add(byte[] regionName, Action<R> a) { + List<Action<R>> rsActions = actions.get(regionName); + if (rsActions == null) { + rsActions = new ArrayList<Action<R>>(); + actions.put(regionName, rsActions); + } + rsActions.add(a); + } + + public Set<byte[]> getRegions() { + return actions.keySet(); + } + + /** + * @return All actions from all regions in this container + */ + public List<Action<R>> allActions() { + List<Action<R>> res = new ArrayList<Action<R>>(); + for (List<Action<R>> lst : actions.values()) { + res.addAll(lst); + } + return res; + } +} Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1449950&view=auto ============================================================================== --- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (added) +++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Mon Feb 25 22:50:17 2013 @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * A container for Result objects, grouped by regionName. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MultiResponse { + + // map of regionName to list of (Results paired to the original index for that + // Result) + private Map<byte[], List<Pair<Integer, Object>>> results = + new TreeMap<byte[], List<Pair<Integer, Object>>>(Bytes.BYTES_COMPARATOR); + + public MultiResponse() { + super(); + } + + /** + * @return Number of pairs in this container + */ + public int size() { + int size = 0; + for (Collection<Pair<Integer, Object>> c : results.values()) { + size += c.size(); + } + return size; + } + + /** + * Add the pair to the container, grouped by the regionName + * + * @param regionName + * @param r + * First item in the pair is the original index of the Action + * (request). Second item is the Result. Result will be empty for + * successful Put and Delete actions. + */ + public void add(byte[] regionName, Pair<Integer, Object> r) { + List<Pair<Integer, Object>> rs = results.get(regionName); + if (rs == null) { + rs = new ArrayList<Pair<Integer, Object>>(); + results.put(regionName, rs); + } + rs.add(r); + } + + public void add(byte[] regionName, int originalIndex, Object resOrEx) { + add(regionName, new Pair<Integer, Object>(originalIndex, resOrEx)); + } + + public Map<byte[], List<Pair<Integer, Object>>> getResults() { + return results; + } +} \ No newline at end of file
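
For orientation, here is a minimal sketch of how client-side code might drive the MetaScanner added above: metaScan() walks .META. rows and hands each Result to the visitor's processRow() until the visitor returns false or the rows run out. This sketch is illustrative only and not part of r1449950; MetaScanner is marked @InterfaceAudience.Private, and the table name "usertable" and an hbase-site.xml on the classpath are assumptions.

// Illustrative sketch only; not part of r1449950. Assumes an hbase-site.xml on the
// classpath and a (hypothetical) user table named "usertable".
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class MetaScanExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();

    // The visitor is invoked once per .META. row; returning false stops the scan.
    MetaScanner.MetaScannerVisitor visitor = new MetaScanner.MetaScannerVisitorBase() {
      @Override
      public boolean processRow(Result rowResult) throws IOException {
        HRegionInfo info = MetaScanner.getHRegionInfo(rowResult);
        if (info != null) {
          System.out.println("region=" + info.getRegionNameAsString());
        }
        return true; // keep scanning
      }
    };

    // Scan only the .META. rows belonging to one user table; the visitor is
    // closed by metaScan() when the scan finishes.
    MetaScanner.metaScan(conf, visitor, Bytes.toBytes("usertable"));

    // Convenience helper from the same class: every region currently in .META.
    List<HRegionInfo> regions = MetaScanner.listAllRegions(conf);
    System.out.println("total regions: " + regions.size());
  }
}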
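
Similarly, MultiAction and MultiResponse are the request and response halves of the batched RPC path: per its javadoc, HConnectionManager.processBatch() groups each Action under the region hosting its row, and the response carries (original index, result-or-exception) pairs keyed by the same region names so callers can match results back to requests. The sketch below fabricates region names for illustration, assumes the companion Action class exposes an (action, originalIndex) constructor and that Result has a public no-arg constructor, and is not part of this commit.

// Illustrative sketch only; not part of r1449950. The region names are made up --
// in real use HConnectionManager looks them up from .META.
import java.io.IOException;

import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiActionExample {
  public static void main(String[] args) {
    byte[] regionA = Bytes.toBytes("usertable,,1361833817000.aaa.");
    byte[] regionB = Bytes.toBytes("usertable,row500,1361833817000.bbb.");

    // Group two Puts under the regions hosting their rows; the original index
    // (0 and 1) is what ties each response entry back to its request.
    MultiAction<Result> request = new MultiAction<Result>();
    request.add(regionA, new Action<Result>(new Put(Bytes.toBytes("row001")), 0));
    request.add(regionB, new Action<Result>(new Put(Bytes.toBytes("row900")), 1));
    System.out.println(request.size() + " actions across "
        + request.getRegions().size() + " regions");

    // The answering side fills a MultiResponse keyed by the same region names.
    // An empty Result stands for a successful Put; an exception reports a failure.
    MultiResponse response = new MultiResponse();
    response.add(regionA, 0, new Result());
    response.add(regionB, 1, new IOException("region moved"));
    System.out.println(response.size() + " entries returned");
  }
}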