From: tedyu@apache.org
To: commits@hbase.apache.org
Subject: svn commit: r1350105 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/rest/client/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apach...
Date: Thu, 14 Jun 2012 06:54:19 -0000

Author: tedyu
Date: Thu Jun 14 06:54:18 2012
New Revision: 1350105

URL: http://svn.apache.org/viewvc?rev=1350105&view=rev
Log:
HBASE-5924 In the client code, don't wait for all the requests to be executed before resubmitting a request in error (N Keywal)

Submitted by: N Keywal
Reviewed by: Stack, Ted

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
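The gist of the change, before the diff: previously the client submitted one multi-action per region server and then waited on every Future before retrying anything; with this patch, the new Process class below reacts to whichever server finishes first and resubmits its failed actions right away. A minimal standalone sketch of that scheduling pattern, using plain java.util.concurrent: a CompletionService stands in here for the patch's own finishedTasks wait/notify list, and all names are invented for illustration, not HBase API.

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class EagerResubmitSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
        int inFlight = 0;
        for (int i = 0; i < 4; i++) {        // initial batch, one task per "server"
          final int id = i;
          cs.submit(new Callable<String>() {
            public String call() {
              if (id == 2) throw new RuntimeException("transient failure on " + id);
              return "ok " + id;
            }
          });
          inFlight++;
        }
        while (inFlight > 0) {
          Future<String> f = cs.take();      // blocks for the FIRST finisher only
          inFlight--;
          try {
            System.out.println(f.get());
          } catch (ExecutionException e) {
            // a failed task is resubmitted immediately, without waiting
            // for the other in-flight tasks to complete
            cs.submit(new Callable<String>() {
              public String call() { return "ok after retry"; }
            });
            inFlight++;
          }
        }
        pool.shutdown();
      }
    }

The real patch additionally bounds the retries (numRetries) and applies a backoff (ConnectionUtils.getPauseTime) before resubmitting, as the diff below shows.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1350105&r1=1350104&r2=1350105&view=diff
==============================================================================
--- 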
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Thu Jun 14 06:54:18 2012 @@ -290,7 +290,9 @@ public interface HConnection extends Abo * processed successfully. * @throws IOException if there are problems talking to META. Per-item * exceptions are stored in the results array. + * @deprecated since 0.96 - Use {@link HTableInterface#batch} instead */ + @Deprecated public void processBatch(List actions, final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException; @@ -298,7 +300,9 @@ public interface HConnection extends Abo /** * Parameterized batch processing, allowing varying return types for different * {@link Row} implementations. + * @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead */ + @Deprecated public void processBatchCallback(List list, byte[] tableName, ExecutorService pool, Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Jun 14 06:54:18 2012 @@ -27,16 +27,8 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -67,8 +59,6 @@ import org.apache.hadoop.hbase.ServerNam import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.AdminProtocol; -import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; @@ -82,13 +72,8 @@ import org.apache.hadoop.hbase.protobuf. 
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.SoftValueSortedMap; -import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; @@ -1709,23 +1694,25 @@ public class HConnectionManager { // way around. final HConnection connection = this; return new Callable() { - public MultiResponse call() throws IOException { - ServerCallable callable = - new ServerCallable(connection, tableName, null) { - public MultiResponse call() throws IOException { - return ProtobufUtil.multi(server, multi); - } - @Override - public void connect(boolean reload) throws IOException { - server = connection.getClient( - loc.getHostname(), loc.getPort()); - } - }; - return callable.withoutRetries(); - } - }; + public MultiResponse call() throws IOException { + ServerCallable callable = + new ServerCallable(connection, tableName, null) { + public MultiResponse call() throws IOException { + return ProtobufUtil.multi(server, multi); + } + + @Override + public void connect(boolean reload) throws IOException { + server = connection.getClient( + loc.getHostname(), loc.getPort()); + } + }; + return callable.withoutRetries(); + } + }; } + void updateCachedLocation(HRegionLocation hrl, String hostname, int port) { HRegionLocation newHrl = new HRegionLocation(hrl.getRegionInfo(), hostname, port); synchronized (this.cachedRegionLocations) { @@ -1741,30 +1728,20 @@ public class HConnectionManager { } } - private void updateCachedLocations( - UpdateHistory updateHistory, HRegionLocation hrl, Object t) { - updateCachedLocations(updateHistory, hrl, null, null, t); - } - - private void updateCachedLocations(UpdateHistory updateHistory, byte[] tableName, Row row, - Object t) { - updateCachedLocations(updateHistory, null, tableName, row, t); + private void updateCachedLocations(byte[] tableName, Row row, Object t) { + updateCachedLocations(null, tableName, row, t); } /** * Update the location with the new value (if the exception is a RegionMovedException) or delete * it from the cache. - * We need to keep an history of the modifications, because we can have first an update then a - * delete. The delete would remove the update. - * @param updateHistory - The set used for the history * @param hrl - can be null. If it's the case, tableName and row should not be null * @param tableName - can be null if hrl is not null. * @param row - can be null if hrl is not null. 
* @param exception - An object (to simplify user code) on which we will try to find a nested * or wrapped or both RegionMovedException */ - private void updateCachedLocations( - UpdateHistory updateHistory, final HRegionLocation hrl, final byte[] tableName, + private void updateCachedLocations(final HRegionLocation hrl, final byte[] tableName, Row row, final Object exception) { if ((row == null || tableName == null) && hrl == null){ @@ -1781,15 +1758,6 @@ public class HConnectionManager { return; } - final String regionName = myLoc.getRegionInfo().getRegionNameAsString(); - - if (updateHistory.contains(regionName)) { - // Already updated/deleted => nothing to do - return; - } - - updateHistory.add(regionName); - final RegionMovedException rme = RegionMovedException.find(exception); if (rme != null) { LOG.info("Region " + myLoc.getRegionInfo().getRegionNameAsString() + " moved from " + @@ -1814,11 +1782,332 @@ public class HConnectionManager { throw new IllegalArgumentException( "argument results must be the same size as argument list"); } - processBatchCallback(list, tableName, pool, results, null); } /** + * Send the queries in parallel on the different region servers. Retries on failures. + * If the method returns normally, there was no error and the 'results' array will + * contain no exceptions. On error, an exception is thrown, and the 'results' array will + * contain results and exceptions. + * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead + */ + @Override + @Deprecated + public void processBatchCallback( + List list, + byte[] tableName, + ExecutorService pool, + Object[] results, + Batch.Callback callback) + throws IOException, InterruptedException { + + Process p = new Process(this, list, tableName, pool, results, callback); + p.processBatchCallback(); + } + + + /** + * Methods and attributes to manage a batch process are grouped into this single class. + * Creating one Process per batch keeps the processing multithread safe. + * + * This code should be moved to HTable once processBatchCallback is not supported anymore in + * the HConnection interface. + */ + private static class Process { + // Info on the queries and their context + private final HConnectionImplementation hci; + private final List rows; + private final byte[] tableName; + private final ExecutorService pool; + private final Object[] results; + private final Batch.Callback callback; + + // Error management: these lists are filled by the errors on the final try. Indexes + are consistent, i.e. 
exceptions[i] matches failedActions[i] and failedAddresses[i] + private final List exceptions; + private final List failedActions; + private final List failedAddresses; + + // Used during the batch process + private final List> toReplay; + private final LinkedList, HRegionLocation, Future>> + inProgress; + private int curNumRetries; + + // Notified when a task is done + private final List> finishedTasks = new ArrayList>(); + + private Process(HConnectionImplementation hci, List list, + byte[] tableName, ExecutorService pool, Object[] results, + Batch.Callback callback){ + this.hci = hci; + this.rows = list; + this.tableName = tableName; + this.pool = pool; + this.results = results; + this.callback = callback; + this.toReplay = new ArrayList>(); + this.inProgress = + new LinkedList, HRegionLocation, Future>>(); + this.exceptions = new ArrayList(); + this.failedActions = new ArrayList(); + this.failedAddresses = new ArrayList(); + this.curNumRetries = 0; + } + + + /** + * Group a list of actions per region server, and send them. The created MultiActions are + * added to the inProgress list. + * @param actionsList + * @param sleepTime - sleep time before actually executing the actions. Can be zero. + * @throws IOException - if we can't locate a region after multiple retries. + */ + private void submit(List> actionsList, final long sleepTime) throws IOException { + // group per location => region server + final Map> actionsByServer = + new HashMap>(); + for (Action aAction : actionsList) { + final Row row = aAction.getAction(); + + if (row != null) { + final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow(), true); + if (loc == null) { + throw new IOException("No location found, aborting submit."); + } + + final byte[] regionName = loc.getRegionInfo().getRegionName(); + MultiAction actions = actionsByServer.get(loc); + if (actions == null) { + actions = new MultiAction(); + actionsByServer.put(loc, actions); + } + actions.add(regionName, aAction); + } + } + + // Send the queries and add them to the inProgress list + for (Entry> e : actionsByServer.entrySet()) { + Callable callable = + createDelayedCallable(sleepTime, e.getKey(), e.getValue()); + Triple, HRegionLocation, Future> p = + new Triple, HRegionLocation, Future>( + e.getValue(), e.getKey(), this.pool.submit(callable)); + this.inProgress.addLast(p); + } + } + + + private void addToErrorsLists(Exception ex, Row row, Triple, + HRegionLocation, Future> obj) { + this.exceptions.add(ex); + this.failedActions.add(row); + this.failedAddresses.add(obj.getSecond().getHostnamePort()); + } + + /** + * Resubmit the actions which have failed, after a sleep time. + * @throws IOException + */ + private void doRetry() throws IOException{ + final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries); + submit(this.toReplay, sleepTime); + this.toReplay.clear(); + } + + /** + * Parameterized batch processing, allowing varying return types for + * different {@link Row} implementations. + * Throws an exception on error. If there are no exceptions, it means that the 'results' + * array is clean. + */ + private void processBatchCallback() throws IOException, InterruptedException { + if (this.results.length != this.rows.size()) { + throw new IllegalArgumentException( + "argument results (size="+results.length+") must be the same size as " + + "argument list (size="+this.rows.size()+")"); + } + if (this.rows.isEmpty()) { + return; + } + + // We keep the number of retries per action. 
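+ // nbRetries[i] counts the resubmissions of the action at index i; curNumRetries
+ // (the maximum of these counts across all actions) drives the backoff in doRetry().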
+ int[] nbRetries = new int[this.results.length]; + + // Build the action list. This list won't change after being created, hence the + // indexes will remain constant, allowing a direct lookup. + final List> listActions = new ArrayList>(this.rows.size()); + for (int i = 0; i < this.rows.size(); i++) { + Action action = new Action(this.rows.get(i), i); + listActions.add(action); + } + + // Execute the actions. We will analyze and resubmit the actions in a 'while' loop. + submit(listActions, 0); + + // lastRetry is true if either: + // - we had a 'DoNotRetry' exception, or + // - we had more than numRetries tries for some action. + // In this case, we will finish the current retries but we won't start new ones. + boolean lastRetry = false; + // despite its name, numRetries means the number of tries. So if numRetries == 1 it means + // we won't retry. And we compare vs. 2 in case someone set it to zero. + boolean noRetry = (hci.numRetries < 2); + + // Analyze and resubmit until all actions are done successfully or failed after numRetries + while (!this.inProgress.isEmpty()) { + + // We need the original multi action to find out what actions to replay if + // we have a 'total' failure of the Future + // We need the HRegionLocation as we give it back if we go out of retries + Triple, HRegionLocation, Future> currentTask = + removeFirstDone(); + + // Get the answer, keep the exception if any as we will use it for the analysis + MultiResponse responses = null; + ExecutionException exception = null; + try { + responses = currentTask.getThird().get(); + } catch (ExecutionException e) { + exception = e; + } + + // Error case: no result at all for this multi action. We need to redo all actions + if (responses == null) { + for (List> actions : currentTask.getFirst().actions.values()) { + for (Action action : actions) { + Row row = action.getAction(); + hci.updateCachedLocations(this.tableName, row, exception); + if (noRetry) { + addToErrorsLists(exception, row, currentTask); + } else { + lastRetry = addToReplay(nbRetries, action); + } + } + } + } else { // Success or partial success + // Analyze detailed results. We can still have individual failures to be redone. + // Two specific exceptions are managed: + // - DoNotRetryIOException: we continue to retry for other actions + // - RegionMovedException: we update the cache with the new region location + for (Entry>> resultsForRS : + responses.getResults().entrySet()) { + for (Pair regionResult : resultsForRS.getValue()) { + Action correspondingAction = listActions.get(regionResult.getFirst()); + Object result = regionResult.getSecond(); + this.results[correspondingAction.getOriginalIndex()] = result; + + // Failure: retry if it makes sense, else update the error lists + if (result == null || result instanceof Throwable) { + Row row = correspondingAction.getAction(); + hci.updateCachedLocations(this.tableName, row, result); + if (result instanceof DoNotRetryIOException || noRetry) { + addToErrorsLists((Exception)result, row, currentTask); + } else { + lastRetry = addToReplay(nbRetries, correspondingAction); + } + } else // success + if (callback != null) { + this.callback.update(resultsForRS.getKey(), + this.rows.get(regionResult.getFirst()).getRow(), + (R) result); + } + } + } + } + + // Retry all actions in toReplay, then clear it. 
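+ // Once a pass has been flagged as the last allowed one (lastRetry), stop
+ // scheduling new retries; later failures then go to the error lists instead.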
+ if (!noRetry && !toReplay.isEmpty()) { + doRetry(); + if (lastRetry) { + noRetry = true; + } + } + } + + if (!exceptions.isEmpty()) { + throw new RetriesExhaustedWithDetailsException(this.exceptions, + this.failedActions, + this.failedAddresses); + } + } + + /** + * Put the action that has to be retried into the replay list. + * @return true if we're out of numRetries and it's the last retry. + */ + private boolean addToReplay(int[] nbRetries, Action action) { + this.toReplay.add(action); + nbRetries[action.getOriginalIndex()]++; + if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) { + this.curNumRetries = nbRetries[action.getOriginalIndex()]; + } + // numRetries means the number of tries, while curNumRetries means the current number of + // retries. So we need to add 1 to make them comparable. And as we look for the last try + // we compare with '>=' and not '>'. And we need curNumRetries to mean what it says as we + // don't want to initialize it to 1. + return ( (this.curNumRetries +1) >= hci.numRetries); + } + + /** + * Wait for one of the tasks to be done, and remove it from the list. + * @return the task that is done. + */ + private Triple, HRegionLocation, Future> + removeFirstDone() throws InterruptedException { + while (true) { + synchronized (finishedTasks) { + if (!finishedTasks.isEmpty()) { + MultiAction done = finishedTasks.remove(finishedTasks.size() - 1); + + // We now need to remove it from the inProgress part. + Iterator, HRegionLocation, Future>> it = + inProgress.iterator(); + while (it.hasNext()) { + Triple, HRegionLocation, Future> task = it.next(); + if (task.getFirst() == done) { // We have the exact object. No java equals here. + it.remove(); + return task; + } + } + LOG.error("Development error: We didn't see a task in the list. " + + done.getRegions()); + } + finishedTasks.wait(10); + } + } + } + + private Callable createDelayedCallable( + final long delay, final HRegionLocation loc, final MultiAction multi) { + + final Callable delegate = hci.createCallable(loc, multi, tableName); + + return new Callable() { + private final long creationTime = System.currentTimeMillis(); + + @Override + public MultiResponse call() throws Exception { + try { + final long waitingTime = delay + creationTime - System.currentTimeMillis(); + if (waitingTime > 0) { + Thread.sleep(waitingTime); + } + return delegate.call(); + } finally { + synchronized (finishedTasks) { + finishedTasks.add(multi); + finishedTasks.notifyAll(); + } + } + } + }; + } + } + + + /** * Executes the given * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call} * callable for each row in the @@ -1884,182 +2173,6 @@ public class HConnectionManager { } } - private static class UpdateHistory{ - private final Set updateHistory = new HashSet(100); // size: if we're doing a - // rolling restart we may have 100 regions with a wrong location if we're really unlucky - - public boolean contains(String regionName) { - return updateHistory.contains(regionName); - } - - public void add(String regionName) { - updateHistory.add(regionName); - } - } - - /** - * Parameterized batch processing, allowing varying return types for - * different {@link Row} implementations. - */ - @Override - public void processBatchCallback( - List list, - byte[] tableName, - ExecutorService pool, - Object[] results, - Batch.Callback callback) - throws IOException, InterruptedException { - // This belongs in HTable!!! Not in here. 
St.Ack - - // results must be the same size as list - if (results.length != list.size()) { - throw new IllegalArgumentException( - "argument results must be the same size as argument list"); - } - if (list.isEmpty()) { - return; - } - - // Keep track of the most recent servers for any given item for better - // exceptional reporting. We keep HRegionLocation to save on parsing. - // Later below when we use lastServers, we'll pull what we need from - // lastServers. - HRegionLocation [] lastServers = new HRegionLocation[results.length]; - List workingList = new ArrayList(list); - boolean retry = true; - // count that helps presize actions array - int actionCount = 0; - - for (int tries = 0; tries < numRetries && retry; ++tries) { - UpdateHistory updateHistory = new UpdateHistory(); - - // sleep first, if this is a retry - if (tries >= 1) { - long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries); - LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!"); - Thread.sleep(sleepTime); - } - // step 1: break up into regionserver-sized chunks and build the data structs - Map> actionsByServer = - new HashMap>(); - for (int i = 0; i < workingList.size(); i++) { - Row row = workingList.get(i); - if (row != null) { - HRegionLocation loc = locateRegion(tableName, row.getRow(), true); - byte[] regionName = loc.getRegionInfo().getRegionName(); - - MultiAction actions = actionsByServer.get(loc); - if (actions == null) { - actions = new MultiAction(); - actionsByServer.put(loc, actions); - } - - Action action = new Action(row, i); - lastServers[i] = loc; - actions.add(regionName, action); - } - } - - // step 2: make the requests - - Map> futures = - new HashMap>( - actionsByServer.size()); - - for (Entry> e: actionsByServer.entrySet()) { - futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); - } - - // step 3: collect the failures and successes and prepare for retry - - for (Entry> responsePerServer - : futures.entrySet()) { - HRegionLocation loc = responsePerServer.getKey(); - - try { - Future future = responsePerServer.getValue(); - MultiResponse resp = future.get(); - - if (resp == null) { - // Entire server failed - LOG.debug("Failed all for server: " + loc.getHostnamePort() + - ", removing from cache"); - continue; - } - - for (Entry>> e : resp.getResults().entrySet()) { - byte[] regionName = e.getKey(); - List> regionResults = e.getValue(); - for (Pair regionResult : regionResults) { - if (regionResult == null) { - // if the first/only record is 'null' the entire region failed. - LOG.debug("Failures for region: " + - Bytes.toStringBinary(regionName) + - ", removing from cache"); - } else { - // Result might be an Exception, including DNRIOE - results[regionResult.getFirst()] = regionResult.getSecond(); - if (callback != null && !(regionResult.getSecond() instanceof Throwable)) { - callback.update(e.getKey(), - list.get(regionResult.getFirst()).getRow(), - (R)regionResult.getSecond()); - } - } - } - } - } catch (ExecutionException e) { - LOG.debug("Failed all from " + loc, e); - updateCachedLocations(updateHistory, loc, e); - } - } - - // step 4: identify failures and prep for a retry (if applicable). - - // Find failures (i.e. null Result), and add them to the workingList (in - // order), so they can be retried. - retry = false; - workingList.clear(); - actionCount = 0; - for (int i = 0; i < results.length; i++) { - // if null (fail) or instanceof Throwable && not instanceof DNRIOE - // then retry that row. else dont. 
- if (results[i] == null || - (results[i] instanceof Throwable && - !(results[i] instanceof DoNotRetryIOException))) { - - retry = true; - actionCount++; - Row row = list.get(i); - workingList.add(row); - updateCachedLocations(updateHistory, tableName, row, results[i]); - } else { - if (results[i] != null && results[i] instanceof Throwable) { - actionCount++; - } - // add null to workingList, so the order remains consistent with the original list argument. - workingList.add(null); - } - } - } - - List exceptions = new ArrayList(actionCount); - List actions = new ArrayList(actionCount); - List addresses = new ArrayList(actionCount); - - for (int i = 0 ; i < results.length; i++) { - if (results[i] == null || results[i] instanceof Throwable) { - exceptions.add((Throwable)results[i]); - actions.add(list.get(i)); - addresses.add(lastServers[i].getHostnamePort()); - } - } - - if (!exceptions.isEmpty()) { - throw new RetriesExhaustedWithDetailsException(exceptions, - actions, - addresses); - } - } /* * Return the number of cached region for a table. It will only be called @@ -2078,6 +2191,8 @@ public class HConnectionManager { } } + + /** * Check the region cache to see whether a region is cached yet or not. * Called by unit tests. Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Jun 14 06:54:18 2012 @@ -23,6 +23,7 @@ import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; @@ -707,22 +708,33 @@ public class HTable implements HTableInt } } - /** - * {@inheritDoc} - */ @Override - public synchronized void batch(final List actions, final Object[] results) + public void batch(final List actions, final Object[] results) throws InterruptedException, IOException { - connection.processBatch(actions, tableName, pool, results); + connection.processBatchCallback(actions, tableName, pool, results, null); + } + + @Override + public Object[] batch(final List actions) + throws InterruptedException, IOException { + Object[] results = new Object[actions.size()]; + connection.processBatchCallback(actions, tableName, pool, results, null); + return results; + } + + @Override + public void batchCallback( + final List actions, final Object[] results, final Batch.Callback callback) + throws IOException, InterruptedException { + connection.processBatchCallback(actions, tableName, pool, results, callback); } - /** - * {@inheritDoc} - */ @Override - public synchronized Object[] batch(final List actions) throws InterruptedException, IOException { + public Object[] batchCallback( + final List actions, final Batch.Callback callback) throws IOException, + InterruptedException { Object[] results = new Object[actions.size()]; - connection.processBatch(actions, tableName, pool, results); + connection.processBatchCallback(actions, tableName, pool, results, callback); return results; } @@ -985,41 +997,63 @@ public class HTable implements HTableInt */ @Override public void flushCommits() throws 
IOException { + Object[] results = new Object[writeBuffer.size()]; + boolean success = false; try { - Object[] results = new Object[writeBuffer.size()]; - try { - this.connection.processBatch(writeBuffer, tableName, pool, results); - } catch (InterruptedException e) { - throw new IOException(e); - } finally { - // mutate list so that it is empty for complete success, or contains - // only failed records results are returned in the same order as the - // requests in list walk the list backwards, so we can remove from list - // without impacting the indexes of earlier members - for (int i = results.length - 1; i>=0; i--) { - if (results[i] instanceof Result) { - // successful Puts are removed from the list here. - writeBuffer.remove(i); - } - } - } + this.connection.processBatch(writeBuffer, tableName, pool, results); + success = true; + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); } finally { - if (clearBufferOnFail) { + // mutate list so that it is empty for complete success, or contains + // only failed records. Results are returned in the same order as the + // requests in list. Walk the list backwards, so we can remove from list + // without impacting the indexes of earlier members + currentWriteBufferSize = 0; + if (success || clearBufferOnFail) { writeBuffer.clear(); - currentWriteBufferSize = 0; } else { - // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; - for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); + for (int i = results.length - 1; i >= 0; i--) { + if (results[i] instanceof Result) { + writeBuffer.remove(i); + } else { + currentWriteBufferSize += writeBuffer.get(i).heapSize(); + } } } } } /** - * {@inheritDoc} + * Process a mixed batch of Get, Put and Delete actions. All actions for a + * RegionServer are forwarded in one RPC call. Queries are executed in parallel. + * + * + * @param actions The collection of actions. + * @param results An empty array, same size as list. If an exception is thrown, + * you can test here for partial results, and to determine which actions + * processed successfully. + * @throws IOException if there are problems talking to META. Per-item + * exceptions are stored in the results array. */ + public void processBatchCallback( + final List list, final Object[] results, final Batch.Callback callback) + throws IOException, InterruptedException { + connection.processBatchCallback(list, tableName, pool, results, callback); + } + + + /** + * Parameterized batch processing, allowing varying return types for different + * {@link Row} implementations. 
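+ * This no-callback variant simply delegates to processBatchCallback with a null callback.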
+ */ + public void processBatch(final List list, final Object[] results) + throws IOException, InterruptedException { + + this.processBatchCallback(list, results, null); + } + + @Override public void close() throws IOException { if (this.closed) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Jun 14 06:54:18 2012 @@ -107,6 +107,23 @@ public interface HTableInterface extends Object[] batch(final List actions) throws IOException, InterruptedException; /** + * Same as {@link #batch(List, Object[])}, but with a callback. + * @since 0.96.0 + */ + public void batchCallback( + final List actions, final Object[] results, final Batch.Callback callback) + throws IOException, InterruptedException; + + + /** + * Same as {@link #batch(List)}, but with a callback. + * @since 0.96.0 + */ + public Object[] batchCallback( + List actions, Batch.Callback callback) throws IOException, + InterruptedException; + + /** * Extracts certain cells from a given row. * @param get The object that specifies what data to fetch and from which row. * @return The data coming from the specified row, if it exists. If the row Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Thu Jun 14 06:54:18 2012 @@ -501,6 +501,18 @@ public abstract class CoprocessorHost void batchCallback(List actions, Object[] results, + Batch.Callback callback) throws IOException, InterruptedException { + table.batchCallback(actions, results, callback); + } + + @Override + public Object[] batchCallback(List actions, + Batch.Callback callback) throws IOException, InterruptedException { + return table.batchCallback(actions, callback); + } + + @Override public Result[] get(List gets) throws IOException { return table.get(gets); } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Thu Jun 14 06:54:18 2012 @@ -694,6 +694,18 @@ public class RemoteHTable implements HTa } @Override + public void batchCallback(List actions, Object[] results, + Batch.Callback callback) throws IOException, InterruptedException { + throw new IOException("batchCallback not 
supported"); + } + + @Override + public Object[] batchCallback(List actions, Batch.Callback callback) + throws IOException, InterruptedException { + throw new IOException("batchCallback not supported"); + } + + @Override public Result[] get(List gets) throws IOException { throw new IOException("get(List) not supported"); } Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java?rev=1350105&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Triple.java Thu Jun 14 06:54:18 2012 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +/** + * Utility class to manage a triple. + */ +public class Triple { + private A first; + private B second; + private C third; + + public Triple(A first, B second, C third) { + this.first = first; + this.second = second; + this.third = third; + } + + public int hashCode() { + int hashFirst = (first != null ? first.hashCode() : 0); + int hashSecond = (second != null ? second.hashCode() : 0); + int hashThird = (third != null ? 
third.hashCode() : 0); + + return (hashFirst >> 1) ^ hashSecond ^ (hashThird << 1); + } + + public boolean equals(Object obj) { + if (!(obj instanceof Triple)) { + return false; + } + + Triple otherTriple = (Triple) obj; + + if (first != otherTriple.first && (first != null && !(first.equals(otherTriple.first)))) + return false; + if (second != otherTriple.second && (second != null && !(second.equals(otherTriple.second)))) + return false; + if (third != otherTriple.third && (third != null && !(third.equals(otherTriple.third)))) + return false; + + return true; + } + + public String toString() { + return "(" + first + ", " + second + "," + third + " )"; + } + + public A getFirst() { + return first; + } + + public void setFirst(A first) { + this.first = first; + } + + public B getSecond() { + return second; + } + + public void setSecond(B second) { + this.second = second; + } + + public C getThird() { + return third; + } + + public void setThird(C third) { + this.third = third; + } +} + + + Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Thu Jun 14 06:54:18 2012 @@ -27,6 +27,7 @@ import static org.junit.Assert.assertFal import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.SynchronousQueue; @@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.rest.protobuf.generated.ScannerMessage; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Assert; @@ -222,7 +222,12 @@ public class TestHCM { table.put(put3); Assert.assertFalse("Unreachable point", true); }catch (Throwable e){ - LOG.info("Put done, expected exception caught: "+e.getClass()); + LOG.info("Put done, exception caught: "+e.getClass()); + // Now check that we have the exception we wanted + Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException); + RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException)e; + Assert.assertTrue(re.getNumExceptions() == 1); + Assert.assertTrue(Arrays.equals(re.getRow(0).getRow(), ROW)); } Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW)); Assert.assertEquals( @@ -452,6 +457,7 @@ public class TestHCM { conn.close(); } + @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterCoprocessorExceptionWithRemove.java Thu Jun 14 06:54:18 2012 @@ -204,6 +204,8 @@ public class TestMasterCoprocessorExcept String loadedCoprocessors = master.getLoadedCoprocessors(); assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]")); + + // Verify that BuggyMasterObserver has been removed due to its misbehavior // by creating another table: should not have a problem this time. HTableDescriptor htd2 = new HTableDescriptor(TEST_TABLE2); Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1350105&r1=1350104&r2=1350105&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Thu Jun 14 06:54:18 2012 @@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import java.io.InterruptedIOException; +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,47 +51,16 @@ import static org.junit.Assert.*; */ @Category(MediumTests.class) public class TestRegionServerCoprocessorExceptionWithAbort { - static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class); - - private class zkwAbortable implements Abortable { - @Override - public void abort(String why, Throwable e) { - throw new RuntimeException("Fatal ZK rs tracker error, why=", e); - } - @Override - public boolean isAborted() { - return false; - } - }; - - private class RSTracker extends ZooKeeperNodeTracker { - public boolean regionZKNodeWasDeleted = false; - public String rsNode; - private Thread mainThread; - - public RSTracker(ZooKeeperWatcher zkw, String rsNode, Thread mainThread) { - super(zkw, rsNode, new zkwAbortable()); - this.rsNode = rsNode; - this.mainThread = mainThread; - } - - @Override - public synchronized void nodeDeleted(String path) { - if (path.equals(rsNode)) { - regionZKNodeWasDeleted = true; - mainThread.interrupt(); - } - } - } - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - static final int timeout = 30000; + static final Log LOG = LogFactory.getLog(TestRegionServerCoprocessorExceptionWithAbort.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final String TABLE_NAME = "observed_table"; @BeforeClass public static void setupBeforeClass() throws Exception { // set configure to indicate which cp should be loaded Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - BuggyRegionObserver.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast. 
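+ // BuggyRegionObserver (bottom of this file) throws a NullPointerException from
+ // prePut on the test table; with hbase.coprocessor.abortonerror=true (set below),
+ // that makes the hosting region server abort.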
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName()); conf.set("hbase.coprocessor.abortonerror", "true"); TEST_UTIL.startMiniCluster(2); } @@ -101,59 +72,31 @@ public class TestRegionServerCoprocessor @Test public void testExceptionFromCoprocessorDuringPut() - throws IOException { + throws IOException { // When we try to write to TEST_TABLE, the buggy coprocessor will // cause a NullPointerException, which will cause the regionserver (which // hosts the region we attempted to write to) to abort. - byte[] TEST_TABLE = Bytes.toBytes("observed_table"); + byte[] TEST_TABLE = Bytes.toBytes(TABLE_NAME); byte[] TEST_FAMILY = Bytes.toBytes("aaa"); HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY); - TEST_UTIL.waitUntilAllRegionsAssigned( - TEST_UTIL.createMultiRegions(table, TEST_FAMILY)); + TEST_UTIL.waitUntilAllRegionsAssigned(TEST_UTIL.createMultiRegions(table, TEST_FAMILY)); // Note which regionServer will abort (after put is attempted). - final HRegionServer regionServer = - TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE); + final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE); - // add watch so we can know when this regionserver aborted. - ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), - "unittest", new zkwAbortable()); - - RSTracker rsTracker = new RSTracker(zkw, - "/hbase/rs/"+regionServer.getServerName(), Thread.currentThread()); - rsTracker.start(); - zkw.registerListener(rsTracker); + final byte[] ROW = Bytes.toBytes("aaa"); + Put put = new Put(ROW); + put.add(TEST_FAMILY, ROW, ROW); - boolean caughtInterruption = false; + Assert.assertFalse("The region server should be available", regionServer.isAborted()); try { - final byte[] ROW = Bytes.toBytes("aaa"); - Put put = new Put(ROW); - put.add(TEST_FAMILY, ROW, ROW); table.put(put); - } catch (IOException e) { - // Depending on exact timing of the threads involved, zkw's interruption - // might be caught here ... - if (e.getCause().getClass().equals(InterruptedException.class)) { - LOG.debug("caught interruption here (during put())."); - caughtInterruption = true; - } else { - fail("put() failed: " + e); - } - } - if (caughtInterruption == false) { - try { - Thread.sleep(timeout); - fail("RegionServer did not abort within 30 seconds."); - } catch (InterruptedException e) { - // .. or it might be caught here. - LOG.debug("caught interruption here (during sleep())."); - caughtInterruption = true; - } + fail("The put should have failed, as the coprocessor is buggy"); + } catch (IOException ignored) { + // Expected. 
} - assertTrue("Main thread caught interruption.",caughtInterruption); - assertTrue("RegionServer aborted on coprocessor exception, as expected.", - rsTracker.regionZKNodeWasDeleted); + Assert.assertTrue("The region server should have aborted", regionServer.isAborted()); table.close(); } @@ -162,11 +105,9 @@ public class TestRegionServerCoprocessor public void prePut(final ObserverContext c, final Put put, final WALEdit edit, final boolean writeToWAL) { - String tableName = - c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString(); - if (tableName.equals("observed_table")) { - Integer i = null; - i = i + 1; + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString(); + if (TABLE_NAME.equals(tableName)) { + throw new NullPointerException("Buggy coprocessor"); } } } @@ -175,4 +116,3 @@ public class TestRegionServerCoprocessor public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); } -
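For context on what callers gain from this commit, a hypothetical, self-contained sketch of driving the new batchCallback API: the table name "t1", row keys, family and qualifier are invented, and a reachable cluster with that table is assumed.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchCallbackSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");        // assumes table "t1" exists
        try {
          List<Row> actions = new ArrayList<Row>();
          Put put = new Put(Bytes.toBytes("row1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          actions.add(put);
          actions.add(new Get(Bytes.toBytes("row2")));
          Object[] results = new Object[actions.size()];
          try {
            table.batchCallback(actions, results, new Batch.Callback<Object>() {
              public void update(byte[] region, byte[] row, Object result) {
                // invoked per action as soon as its result arrives
                System.out.println("done: " + Bytes.toStringBinary(row));
              }
            });
          } catch (RetriesExhaustedWithDetailsException e) {
            // per-action failures also remain in results[i]
            System.err.println(e.getNumExceptions() + " action(s) failed");
          }
        } finally {
          table.close();
        }
      }
    }

With the Process loop above, the callback fires as each region server's response is processed, rather than after the whole batch completes.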