Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 89149119AE for ; Fri, 19 Sep 2014 18:08:24 +0000 (UTC) Received: (qmail 28802 invoked by uid 500); 19 Sep 2014 18:08:24 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 28756 invoked by uid 500); 19 Sep 2014 18:08:24 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 28747 invoked by uid 99); 19 Sep 2014 18:08:24 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Sep 2014 18:08:24 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 254799CC905; Fri, 19 Sep 2014 18:08:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: larsh@apache.org To: commits@hbase.apache.org Message-Id: <993906bdcfbf4097b37f55f183618f65@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: HBASE-11957 Backport to 0.94 HBASE-5974 Scanner retry behavior with RPC timeout on next() seems incorrect. (Liu Shaohui original patch by Anoop Sam John) Date: Fri, 19 Sep 2014 18:08:24 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/0.94 0058e7aac -> 8f9faabf5 HBASE-11957 Backport to 0.94 HBASE-5974 Scanner retry behavior with RPC timeout on next() seems incorrect. (Liu Shaohui original patch by Anoop Sam John) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f9faabf Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f9faabf Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f9faabf Branch: refs/heads/0.94 Commit: 8f9faabf579c02476acb791c145f34baf49ac8f5 Parents: 0058e7a Author: Lars Hofhansl Authored: Fri Sep 19 11:05:02 2014 -0700 Committer: Lars Hofhansl Committed: Fri Sep 19 11:06:19 2014 -0700 ---------------------------------------------------------------------- .../hbase/CallSequenceOutOfOrderException.java | 35 ++++++ .../hadoop/hbase/client/ClientScanner.java | 6 +- .../hadoop/hbase/client/ScannerCallable.java | 32 +++++- .../hadoop/hbase/ipc/HRegionInterface.java | 12 ++ .../hbase/regionserver/HRegionServer.java | 53 ++++++--- .../hbase/regionserver/RegionScannerHolder.java | 44 +++++++ .../hadoop/hbase/util/JVMClusterUtil.java | 5 +- .../client/TestClientScannerRPCTimeout.java | 115 +++++++++++++++++++ 8 files changed, 281 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java b/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java new file mode 100644 index 0000000..d3a77be --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/CallSequenceOutOfOrderException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +/** + * Thrown by a region server while doing scan related next() calls. Both client and server maintain a + * callSequence and if they do not match, RS will throw this exception. + */ +public class CallSequenceOutOfOrderException extends DoNotRetryIOException { + + private static final long serialVersionUID = 1565946556907760065L; + + public CallSequenceOutOfOrderException() { + super(); + } + + public CallSequenceOutOfOrderException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 7e72d57..1301ffb 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -296,8 +297,9 @@ public class ClientScanner extends AbstractClientScanner { } } else { Throwable cause = e.getCause(); - if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + if ((cause == null || (!(cause instanceof NotServingRegionException) + && !(cause instanceof RegionServerStoppedException))) + && !(e instanceof CallSequenceOutOfOrderException)) { throw e; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0c4677f..8662db0 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.net.UnknownHostException; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -63,6 +65,8 @@ public class ScannerCallable extends ServerCallable { // indicate if it is a remote server call private boolean isRegionServerRemote = true; + private long callSeq = 0; + private boolean useCallSeq = true; /** * @param connection which connection @@ -129,7 +133,33 @@ public class ScannerCallable extends ServerCallable { try { incRPCcallsMetrics(); long timestamp = System.currentTimeMillis(); - rrs = server.next(scannerId, caching); + if (useCallSeq) { + try { + rrs = server.next(scannerId, caching, callSeq); + // increment the callSeq which will be getting used for the next time next() call to + // the RS.In case of a timeout this increment should not happen so that the next + // trial also will be done with the same callSeq. + callSeq++; + } catch (IOException ioe) { + // TODO This is an ugly way of checking. Any other ways? + if (ioe instanceof RemoteException + && ExceptionUtils.getStackTrace(ioe).contains("java.lang.NoSuchMethodException")) { + // This will happen when we use a latest version of the client but still running with + // old region server. At server side there is no implementation for the seq number + // based scanning. Set the useCallSeq to false. + LOG.warn("Seq number based scan API not present at RS side! Trying with API: " + + "next(scannerId, caching). Consider upgrading version at RS " + + location.getHostnamePort()); + useCallSeq = false; + rrs = server.next(scannerId, caching); + } else { + // Throw it back so that will get handled by the below original catch blocks; + throw ioe; + } + } + } else { + rrs = server.next(scannerId, caching); + } if (logScannerActivity) { long now = System.currentTimeMillis(); if (now - timestamp > logCutOffLatency) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 014ee9f..8eae0ec 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -328,6 +328,18 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl public Result [] next(long scannerId, int numberOfRows) throws IOException; /** + * Get the next set of values + * @param scannerId clientId passed to openScanner + * @param numberOfRows the number of rows to fetch + * @param callSeq the number which represents the sequence used by client scanner + * @return Array of Results (map of values); array is empty if done with this + * region and null if we are NOT to go to the next region (happens when a + * filter rules that the scan is done). + * @throws IOException e + */ + public Result[] next(long scannerId, int caching, long callSeq) throws IOException; + + /** * Close a scanner * * @param scannerId the scanner id returned by openScanner http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ed278c7..eadc9e8 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -59,6 +59,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CallSequenceOutOfOrderException; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -312,8 +313,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; - final Map scanners = - new ConcurrentHashMap(); + final Map scanners = + new ConcurrentHashMap(); // zookeeper connection and watcher private ZooKeeperWatcher zooKeeper; @@ -569,8 +570,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return HConstants.NORMAL_QOS; } String scannerIdString = Long.toString(scannerId); - RegionScanner scanner = scanners.get(scannerIdString); - if (scanner != null && scanner.getRegionInfo().isMetaTable()) { + RegionScannerHolder holder = scanners.get(scannerIdString); + if (holder != null && holder.getScanner().getRegionInfo().isMetaRegion()) { // LOG.debug("High priority scanner request: " + scannerId); return HConstants.HIGH_QOS; } @@ -1063,9 +1064,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().getScanner().close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2606,7 +2607,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, long scannerId = -1L; scannerId = rand.nextLong(); String scannerName = String.valueOf(scannerId); - scanners.put(scannerName, s); + scanners.put(scannerName, new RegionScannerHolder(s)); this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } @@ -2619,14 +2620,28 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return res[0]; } - public Result[] next(final long scannerId, int nbRows) throws IOException { + public Result[] next(final long scannerId, int nbRows) throws IOException { + return next(scannerId, nbRows, -1); + } + + public Result[] next(final long scannerId, int nbRows, long callSeq) throws IOException { String scannerName = String.valueOf(scannerId); - RegionScanner s = this.scanners.get(scannerName); - if (s == null) { + RegionScannerHolder holder = this.scanners.get(scannerName); + if (holder == null) { LOG.info("Client tried to access missing scanner " + scannerName); throw new UnknownScannerException("Name: " + scannerName); } - return internalNext(s, nbRows, scannerName); + // if callSeq does not match throw Exception straight away. This needs to be performed even + // before checking of Lease. + // Old next() APIs which do not take callSeq will pass it as -1 and for that no + // need to match the callSeq from client and the one in server. + if (callSeq != -1 && callSeq != holder.getCallSeq()) { + throw new CallSequenceOutOfOrderException("Expected seq: " + holder.getCallSeq() + + " But the seq got from client: " + callSeq); + } + // Increment the callSeq value which is the next expected from client. + holder.incrCallSeq(); + return internalNext(holder.getScanner(), nbRows, scannerName); } private Result[] internalNext(final RegionScanner s, int nbRows, @@ -2739,8 +2754,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, public void close(final long scannerId) throws IOException { String scannerName = String.valueOf(scannerId); - RegionScanner s = scanners.get(scannerName); - internalCloseScanner(s, scannerName); + RegionScannerHolder holder = this.scanners.get(scannerName); + if (holder == null) throw new UnknownScannerException("Name: " + scannerName); + internalCloseScanner(holder.getScanner(), scannerName); } private void internalCloseScanner(final RegionScanner s, String scannerName) @@ -2748,7 +2764,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, try { checkOpen(); requestCount.incrementAndGet(); - HRegion region = null; if (s != null) { // call coprocessor. @@ -2761,7 +2776,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } RegionScanner toCloseScanner = s; if (scannerName != null) { - toCloseScanner = scanners.remove(scannerName); + RegionScannerHolder holder = scanners.remove(scannerName); + if (holder!= null) { + toCloseScanner = holder.getScanner(); + } } if (toCloseScanner != null) { toCloseScanner.close(); @@ -2802,8 +2820,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerHolder holder = scanners.remove(this.scannerName); + if (holder != null) { + RegionScanner s = holder.getScanner(); LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java new file mode 100644 index 0000000..50c690d --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +/** + * Holder class which holds the RegionScanner and callSequence together. + */ +public class RegionScannerHolder { + private RegionScanner s; + private long callSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + + public RegionScanner getScanner() { + return s; + } + + public long getCallSeq() { + return callSeq; + } + + public void incrCallSeq() { + callSeq++; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index b079f2e..c990178 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; @@ -81,7 +82,9 @@ public class JVMClusterUtil { throws IOException { HRegionServer server; try { - server = hrsc.getConstructor(Configuration.class).newInstance(c); + Constructor ctor = hrsc.getConstructor(Configuration.class); + ctor.setAccessible(true); + server = ctor.newInstance(c); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of RegionServer: " + http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9faabf/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java new file mode 100644 index 0000000..38b558c --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the scenario where a next() call, while scanning, timeout at client side and getting retried. + * This scenario should not result in some data being skipped at RS side. + */ +@Category(MediumTests.class) +public class TestClientScannerRPCTimeout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int SLAVES = 1; + private static final int rpcTimeout = 5 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + byte[] TABLE = Bytes.toBytes("testScannerNextRPCTimesout"); + HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); + putToTable(ht, "row-1"); + putToTable(ht, "row-2"); + RegionServerWithScanTimeout.seqNoToSleepOn = 1; + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertNotNull("Expected not null result", result); + result = scanner.next(); + assertNotNull("Expected not null result", result); + scanner.close(); + } + + private void putToTable(HTable ht, String rowkey) throws IOException { + Put put = new Put(rowkey.getBytes()); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long seqNoToSleepOn = -1; + + public RegionServerWithScanTimeout(Configuration conf) throws IOException, + InterruptedException { + super(conf); + } + + @Override + public long openScanner(byte[] regionName, Scan scan) throws IOException { + long scannerId = super.openScanner(regionName, scan); + if (!getRegionInfo(regionName).isMetaTable()) { + tableScannerId = scannerId; + } + return scannerId; + } + + @Override + public Result[] next(long scannerId, int nbRows, long callSeq) throws IOException { + if (!slept && this.tableScannerId == scannerId && seqNoToSleepOn == callSeq) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.next(scannerId, nbRows, callSeq); + } + } +}