Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F3F5B200B8D for ; Fri, 9 Sep 2016 00:35:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F28E0160AD0; Thu, 8 Sep 2016 22:35:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA736160AAD for ; Fri, 9 Sep 2016 00:35:56 +0200 (CEST) Received: (qmail 10397 invoked by uid 500); 8 Sep 2016 22:35:51 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 10230 invoked by uid 99); 8 Sep 2016 22:35:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 22:35:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5DD4E2F11; Thu, 8 Sep 2016 22:35:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@phoenix.apache.org Date: Thu, 08 Sep 2016 22:35:54 -0000 Message-Id: In-Reply-To: <0a588a7e17ec4ddab1b85ac432631530@git.apache.org> References: <0a588a7e17ec4ddab1b85ac432631530@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] phoenix git commit: PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner archived-at: Thu, 08 Sep 2016 22:35:58 -0000 PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b8540e34 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b8540e34 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b8540e34 Branch: refs/heads/4.8-HBase-1.1 Commit: b8540e34403f2fed71dbe1795dbcea10751b2a8a Parents: 25a5550 Author: Josh Elser Authored: Mon Jul 18 16:24:22 2016 -0400 Committer: Josh Elser Committed: Thu Sep 8 18:04:40 2016 -0400 ---------------------------------------------------------------------- .../phoenix/schema/stats/StatisticsScanner.java | 72 ++++++++-- .../schema/stats/StatisticsScannerTest.java | 144 +++++++++++++++++++ 2 files changed, 202 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8540e34/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 082e833..736efc6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -49,12 +50,14 @@ public class StatisticsScanner implements InternalScanner { private StatisticsCollector tracker; private ImmutableBytesPtr family; private final Configuration config; + private final RegionServerServices regionServerServices; public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family) { this.tracker = tracker; this.statsWriter = stats; this.delegate = delegate; + this.regionServerServices = env.getRegionServerServices(); this.region = env.getRegion(); this.family = family; this.config = env.getConfiguration(); @@ -89,9 +92,13 @@ public class StatisticsScanner implements InternalScanner { @Override public void close() throws IOException { - boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); - StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); - StatisticsScannerCallable callable = new StatisticsScannerCallable(); + boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + StatisticsScannerCallable callable = createCallable(); + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Not updating table statistics because the server is stopping/stopped"); + return; + } if (!async) { callable.call(); } else { @@ -99,12 +106,45 @@ public class StatisticsScanner implements InternalScanner { } } - private class StatisticsScannerCallable implements Callable { + // VisibleForTesting + StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) { + return StatisticsCollectionRunTracker.getInstance(c); + } + + Configuration getConfig() { + return config; + } + + StatisticsWriter getStatisticsWriter() { + return statsWriter; + } + + RegionServerServices getRegionServerServices() { + return regionServerServices; + } + + Region getRegion() { + return region; + } + + StatisticsScannerCallable createCallable() { + return new StatisticsScannerCallable(); + } + + StatisticsCollector getTracker() { + return tracker; + } + + InternalScanner getDelegate() { + return delegate; + } + + class StatisticsScannerCallable implements Callable { @Override public Void call() throws IOException { IOException toThrow = null; - StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); - final HRegionInfo regionInfo = region.getRegionInfo(); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + final HRegionInfo regionInfo = getRegion().getRegionInfo(); try { // update the statistics table // Just verify if this if fine @@ -114,32 +154,36 @@ public class StatisticsScanner implements InternalScanner { LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.deleteStats(region, tracker, family, mutations); + getStatisticsWriter().deleteStats(region, tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.addStats(tracker, family, mutations); + getStatisticsWriter().addStats(tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.commitStats(mutations, tracker); + getStatisticsWriter().commitStats(mutations, tracker); } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); - toThrow = e; + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Ignoring error updating statistics because region is closing/closed"); + } else { + LOG.error("Failed to update statistics table!", e); + toThrow = e; + } } finally { try { collectionTracker.removeCompactingRegion(regionInfo); - statsWriter.close();// close the writer - tracker.close();// close the tracker + getStatisticsWriter().close();// close the writer + getTracker().close();// close the tracker } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the stats table", e); } finally { // close the delegate scanner try { - delegate.close(); + getDelegate().close(); } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the scanner", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8540e34/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java new file mode 100644 index 0000000..888f09a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable; +import org.junit.Before; +import org.junit.Test; + +/** + * Test to verify that we don't try to update stats when a RS is stopping. + */ +public class StatisticsScannerTest { + + private Region region; + private RegionServerServices rsServices; + private StatisticsWriter statsWriter; + private StatisticsScannerCallable callable; + private StatisticsCollectionRunTracker runTracker; + private StatisticsScanner mockScanner; + private StatisticsCollector tracker; + private InternalScanner delegate; + private HRegionInfo regionInfo; + + private Configuration config; + + @Before + public void setupMocks() throws Exception { + this.config = new Configuration(false); + + // Create all of the mocks + this.region = mock(Region.class); + this.rsServices = mock(RegionServerServices.class); + this.statsWriter = mock(StatisticsWriter.class); + this.callable = mock(StatisticsScannerCallable.class); + this.runTracker = mock(StatisticsCollectionRunTracker.class); + this.mockScanner = mock(StatisticsScanner.class); + this.tracker = mock(StatisticsCollector.class); + this.delegate = mock(InternalScanner.class); + this.regionInfo = mock(HRegionInfo.class); + + // Wire up the mocks to the mock StatisticsScanner + when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter); + when(mockScanner.getRegionServerServices()).thenReturn(rsServices); + when(mockScanner.createCallable()).thenReturn(callable); + when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker); + when(mockScanner.getRegion()).thenReturn(region); + when(mockScanner.getConfig()).thenReturn(config); + when(mockScanner.getTracker()).thenReturn(tracker); + when(mockScanner.getDelegate()).thenReturn(delegate); + + // Wire up the HRegionInfo mock to the Region mock + when(region.getRegionInfo()).thenReturn(regionInfo); + + // Always call close() on the mock StatisticsScanner + doCallRealMethod().when(mockScanner).close(); + } + + @Test + public void testCheckRegionServerStoppingOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @Test + public void testCheckRegionServerStoppedOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppingOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppedOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + } +}