Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E02F2104AB for ; Wed, 6 Nov 2013 19:18:45 +0000 (UTC) Received: (qmail 51664 invoked by uid 500); 6 Nov 2013 19:18:45 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 51638 invoked by uid 500); 6 Nov 2013 19:18:45 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 51631 invoked by uid 99); 6 Nov 2013 19:18:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Nov 2013 19:18:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Nov 2013 19:18:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B70DC2388994; Wed, 6 Nov 2013 19:18:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1539433 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/client/ Date: Wed, 06 Nov 2013 19:18:23 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131106191823.B70DC2388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Wed Nov 6 19:18:23 2013 New Revision: 1539433 URL: http://svn.apache.org/r1539433 Log: [master] Solve skipping data in HTable scans Author: maxim Summary: The HTable 
client cannot retry a scan operation in the getRegionServerWithRetries code path. This will result in the client missing data. This can be worked around by setting hbase.client.retries.number to 1. The whole problem is that Callable knows nothing about retries and the protocol it dances to as well doesn't support retries. This fix will keep Callable protocol (ugly thing worth merciless refactoring) intact but will change ScannerCallable to anticipate retries. What we want is to make failed operations to be identities for outside world: N1 * N2 * F3 * N3 * F4 * F4 * N4 ... = N1 * N2 * N3 * N4 ... where Nk are successful operations and Fk are failed operations. Test Plan: 1/ Ensure that new tests in TestScanRetries are working Reviewers: manukranthk, liyintang, aaiyer Reviewed By: manukranthk CC: sfodor, aaiyer, jingweil, junfang, liyintang, peiyue, rishid, rshroff, san Differential Revision: https://phabricator.fb.com/D1027350 Task ID: 2147851 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1539433&r1=1539432&r2=1539433&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Nov 6 19:18:23 2013 @@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.RemoteExcep import org.mortbay.log.Log; import java.io.IOException; 
+import java.util.Arrays; /** @@ -40,6 +41,9 @@ public class ScannerCallable extends Ser private boolean closed = false; private Scan scan; private int caching = 1; + private boolean skipFirstRow = false; + private boolean forceReopen = false; + private byte[] lastRowSeen = null; /** * @param connection which connection @@ -58,33 +62,78 @@ public class ScannerCallable extends Ser public Result [] call() throws IOException { if (scannerId != -1L && closed) { close(); - } else if (scannerId == -1L && !closed) { + } else if (scannerId == -1L && !closed && !forceReopen) { this.scannerId = openScanner(); } else { + ensureScannerIsOpened(); Result [] rrs = null; try { - rrs = server.next(scannerId, caching); + rrs = next(); } catch (IOException e) { - IOException ioe = null; - if (e instanceof RemoteException) { - ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); - } - if (ioe == null) throw new IOException(e); - if (ioe instanceof NotServingRegionException) { - // Throw a DNRE so that we break out of cycle of calling NSRE - // when what we need is to open scanner against new location. - // Attach NSRE to signal client that it needs to resetup scanner. - throw new DoNotRetryIOException("Reset scanner", ioe); - } else { - // The outer layers will retry - throw ioe; - } + fixStateOrCancelRetryThanRethrow(e); } return rrs; } return null; } + private void fixStateOrCancelRetryThanRethrow(IOException e) throws IOException { + IOException ioe = null; + if (e instanceof RemoteException) { + ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + if (ioe == null) { + ioe = new IOException(e); + } + if (ioe instanceof NotServingRegionException) { + // Throw a DNRE so that we break out of cycle of calling NSRE + // when what we need is to open scanner against new location. + // Attach NSRE to signal client that it needs to resetup scanner. 
+ throw new DoNotRetryIOException("Reset scanner", ioe); + } else { + // The outer layers will retry + fixStateForFutureRetries(); + throw ioe; + } + } + + private Result[] next() throws IOException { + Result[] results = server.next(scannerId, caching); + if (results != null && results.length > 0) { + byte[] lastRow = results[results.length - 1].getRow(); + if (lastRow.length > 0) { + lastRowSeen = lastRow; + } + if (skipFirstRow) { + skipFirstRow = false; + // We can't return empty results as it will prematurely signal end of region + if (results.length > 1) { + results = Arrays.copyOfRange(results, 1, results.length); + } else { + return next(); + } + } + } + return results; + } + + private void fixStateForFutureRetries() { + Log.info("Fixing scan state for future retries"); + close(); + if (lastRowSeen != null && lastRowSeen.length > 0) { + scan.setStartRow(lastRowSeen); + skipFirstRow = true; + } + forceReopen = true; + } + + private void ensureScannerIsOpened() throws IOException { + if (forceReopen) { + scannerId = openScanner(); + forceReopen = false; + } + } + private void close() { if (this.scannerId == -1L) { return; Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1539433&r1=1539432&r2=1539433&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Wed Nov 6 19:18:23 2013 @@ -363,6 +363,11 @@ public class HBaseTestingUtility { return startMiniCluster(1, numSlaves); } + public MiniHBaseCluster startMiniCluster(final int numMasters, + final int numSlaves) throws IOException, InterruptedException { + return startMiniCluster(numMasters, numSlaves, 
MiniHBaseCluster.MiniHBaseClusterRegionServer.class); + } + /** * Start up a minicluster of hbase, optionally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random @@ -380,7 +385,7 @@ public class HBaseTestingUtility { * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, - final int numSlaves) throws IOException, InterruptedException { + final int numSlaves, final Class regionServerClass) throws IOException, InterruptedException { LOG.info("Starting up minicluster"); // If we already put up a cluster, fail. if (miniClusterRunning) { @@ -413,7 +418,7 @@ public class HBaseTestingUtility { this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString()); fs.mkdirs(hbaseRootdir); FSUtils.setVersion(fs, hbaseRootdir); - startMiniHBaseCluster(numMasters, numSlaves); + startMiniHBaseCluster(numMasters, numSlaves, regionServerClass); // Don't leave here till we've done a successful scan of the .META. 
HTable t = null; @@ -439,9 +444,16 @@ public class HBaseTestingUtility { return this.hbaseCluster; } + public void startMiniHBaseCluster(final int numMasters, final int numSlaves) - throws IOException, InterruptedException { - this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves); + throws IOException, InterruptedException { + startMiniHBaseCluster(numMasters, numSlaves, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); + } + + public void startMiniHBaseCluster(final int numMasters, final int numSlaves, + Class regionServerClass) + throws IOException, InterruptedException { + this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves, regionServerClass); } /** Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1539433&r1=1539432&r2=1539433&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Wed Nov 6 19:18:23 2013 @@ -30,18 +30,18 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DistributedFileSystem; import 
org.apache.hadoop.io.MapWritable; import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.hbase.util.HasThread; /** * This class creates a single process HBase cluster. @@ -87,6 +87,11 @@ public class MiniHBaseCluster { public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers) throws IOException, InterruptedException { + this(conf, numMasters, numRegionServers, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); + } + + public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, Class regionServerClass) + throws IOException, InterruptedException { this.conf = conf; MiniHBaseCluster.numClusters ++; conf.set(HConstants.MASTER_PORT, "0"); @@ -94,7 +99,7 @@ public class MiniHBaseCluster { PREFERRED_ASSIGNMENT); conf.setLong("hbase.master.holdRegionForBestLocality.period", PREFERRED_ASSIGNMENT / 5); - init(numMasters, numRegionServers); + init(numMasters, numRegionServers, regionServerClass); } /** @@ -252,13 +257,13 @@ public class MiniHBaseCluster { } } - private void init(final int nMasterNodes, final int nRegionNodes) + private void init(final int nMasterNodes, final int nRegionNodes, Class regionServerClass) throws IOException { try { // start up a LocalHBaseCluster hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes, MiniHBaseCluster.MiniHBaseClusterMaster.class, - MiniHBaseCluster.MiniHBaseClusterRegionServer.class); + regionServerClass); hbaseCluster.startup(); } catch(IOException e) { shutdown(); Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java?rev=1539433&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java (added) 
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java Wed Nov 6 19:18:23 2013 @@ -0,0 +1,149 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ParamFormat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test various scanner timeout issues. 
+ */ +public class TestScanRetries { + + private final static HBaseTestingUtility + TEST_UTIL = new HBaseTestingUtility(); + + final Log LOG = LogFactory.getLog(getClass()); + private final static byte[] SOME_BYTES = Bytes.toBytes("f"); + private final static byte[] TABLE_NAME = Bytes.toBytes("t"); + private final static int NB_ROWS = 6; + private final static int SCANNER_TIMEOUT = 100000; + private static HTable table; + private static boolean enableFailure = false; + + public static class TestRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + private long nextCallCount = 0l; + + public TestRegionServer(Configuration conf) + throws IOException { + super(conf); + } + + @ParamFormat(clazz = ScanParamsFormatter.class) + @Override + public Result[] next(final long scannerId, int nbRows) throws IOException { + ++nextCallCount; + LOG.info("nextCallCount: " + String.valueOf(nextCallCount)); + if (enableFailure && nextCallCount % 5 == 0) { + super.next(scannerId, nbRows); + LOG.info("Something bad happened on the way from server to client. 
Should force retry!"); + throw new IOException("Forcing retry"); + } + return super.next(scannerId, nbRows); + } + } + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt("hbase.regionserver.lease.period", SCANNER_TIMEOUT); + conf.setInt("hbase.client.retries.number", 5); + TEST_UTIL.startMiniCluster(1, 2, TestRegionServer.class); + table = TEST_UTIL.createTable(Bytes.toBytes("t"), SOME_BYTES); + for (int i = 0; i < NB_ROWS; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(SOME_BYTES, SOME_BYTES, SOME_BYTES); + table.put(put); + } + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + TEST_UTIL.ensureSomeRegionServersAvailable(2); + } + + /** + * Tests that we have right number of rows in scan + * @throws Exception + */ + @Test + public void testNumberOfRowsInScanWithoutRetries() throws Exception { + enableFailure = false; + doTestNumberOfRowsInScan(); + } + + /** + * Tests that we have right number of rows in scan even with retries + * @throws Exception + */ + @Test + public void testNumberOfRowsInScanWithRetries() throws Exception { + enableFailure = true; + doTestNumberOfRowsInScan(); + } + + public void doTestNumberOfRowsInScan() throws Exception { + Scan scan = new Scan(); + ResultScanner r = table.getScanner(scan); + int count = 0; + try { + Result res = r.next(); + while (res != null) { + count++; + res = r.next(); + } + } catch (Throwable e) { + LOG.error("Got exception " + e.getMessage(), e); + fail("Exception while counting rows!"); + } + r.close(); + LOG.info("Number of rows read: " + String.valueOf(count)); + assertEquals(NB_ROWS, count); + } +} +