Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6E3E8F12B for ; Wed, 24 Apr 2013 18:18:45 +0000 (UTC) Received: (qmail 24220 invoked by uid 500); 24 Apr 2013 18:18:45 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 24146 invoked by uid 500); 24 Apr 2013 18:18:45 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 24130 invoked by uid 99); 24 Apr 2013 18:18:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Apr 2013 18:18:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Apr 2013 18:18:39 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id AFC06238899C; Wed, 24 Apr 2013 18:18:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1471580 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver: HRegionServer.java RegionScanner.java Date: Wed, 24 Apr 2013 18:18:19 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130424181819.AFC06238899C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Wed Apr 24 18:18:18 2013 New Revision: 1471580 URL: http://svn.apache.org/r1471580 Log: [HBASE-8114][89-fb] Interrupt the RS RPC call when the client connection has been closed Author: adela Summary: my changes were overwritten by a rebase that I did and I figured out that the logic is not in trunk, fixing it Test Plan: mr unit tests finished successfully. Since this is hard to test with a unit test I was testing with the following scenario: - Instead of testing whether the client has closed the connection I put a very small timeout ~100ms. - I start a scan over a table. - The scan times out because 100ms expired. Reviewers: manukranthk, liyintang Reviewed By: manukranthk CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D783731 Task ID: 2044405 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1471580&r1=1471579&r2=1471580&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 24 18:18:18 2013 @@ -627,7 +627,11 @@ public class HRegionServer implements HR * @return */ public static boolean isCurrentConnectionClosed() { - return callContext.get().getConnection().getSocket().isClosed(); + // this is checked just because of the unit tests + if (callContext.get() != null) { + return callContext.get().getConnection().getSocket().isClosed(); + } + return false; } /** Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1471580&r1=1471579&r2=1471580&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Wed Apr 24 18:18:18 2013 @@ -30,6 +30,9 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; @@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.util.Byte */ public class RegionScanner implements InternalScanner { //Package local for testability + public static final Log LOG = LogFactory.getLog(RegionScanner.class); KeyValueHeap storeHeap = null; private final byte [] stopRow; private Filter filter; @@ -197,7 +201,7 @@ public class RegionScanner implements In getOriginalScan().setCurrentPartialResponseSize(0); int maxResponseSize = getOriginalScan().getMaxResponseSize(); do { - moreRows = nextInternal(tmpList, limit, metric, null); + moreRows = nextInternal(tmpList, limit, metric, null, true); if (!tmpList.isEmpty()) { currentNbRows++; if (outResults != null) { @@ -301,10 +305,10 @@ public class RegionScanner implements In if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. - returnResult = nextInternal(outResults, limit, metric, kvContext); + returnResult = nextInternal(outResults, limit, metric, kvContext, false); } else { List tmpList = new ArrayList(); - returnResult = nextInternal(tmpList, limit, metric, kvContext); + returnResult = nextInternal(tmpList, limit, metric, kvContext, false); outResults.addAll(tmpList); } rowReadCnt.incrementAndGet(); @@ -340,7 +344,7 @@ public class RegionScanner implements In * @param results empty list in which results will be stored */ private boolean nextInternal(List results, int limit, String metric, - KeyValueContext kvContext) + KeyValueContext kvContext, boolean prefetch) throws IOException { if (!results.isEmpty()) { @@ -350,6 +354,11 @@ public class RegionScanner implements In boolean partialRow = getOriginalScan().isPartialRow(); long maxResponseSize = getOriginalScan().getMaxResponseSize(); while (true) { + if (!prefetch && HRegionServer.isCurrentConnectionClosed()) { + HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY, 1); + LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG); + throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG); + } byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { if (filter != null && filter.hasFilterRow()) { @@ -366,6 +375,12 @@ public class RegionScanner implements In } else { byte [] nextRow; do { + if (!prefetch && HRegionServer.isCurrentConnectionClosed()) { + HRegion.incrNumericMetric(HConstants.SERVER_INTERRUPTED_CALLS_KEY, + 1); + LOG.error(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG); + throw new IOException(HConstants.CLIENT_SOCKED_CLOSED_EXC_MSG); + } this.storeHeap.next(results, limit - results.size(), metric, kvContext); if (limit > 0 && results.size() == limit) { if (this.filter != null && filter.hasFilterRow())