Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0422710B5A for ; Wed, 16 Jul 2014 19:58:14 +0000 (UTC) Received: (qmail 40737 invoked by uid 500); 16 Jul 2014 19:58:14 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 40657 invoked by uid 500); 16 Jul 2014 19:58:13 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 40643 invoked by uid 99); 16 Jul 2014 19:58:13 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Jul 2014 19:58:13 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A33B2996F5F; Wed, 16 Jul 2014 19:58:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Date: Wed, 16 Jul 2014 19:58:14 -0000 Message-Id: In-Reply-To: <2b1296e4b6cf4494a871d7aa4726e8df@git.apache.org> References: <2b1296e4b6cf4494a871d7aa4726e8df@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: [HELIX-472] Errors should be cleaned up less frequently [HELIX-472] Errors should be cleaned up less frequently Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f952cb76 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f952cb76 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f952cb76 Branch: refs/heads/master Commit: f952cb7606f6de91dbba4f5e4597e45259ec5bf6 Parents: 8f90279 Author: Kanak Biscuitwala Authored: Tue Jul 15 11:33:43 2014 -0700 Committer: Kanak Biscuitwala Committed: Wed Jul 16 12:55:31 2014 -0700 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 9 +- .../helix/monitoring/ZKPathDataDumpTask.java | 29 ++++-- .../helix/integration/TestSchedulerMessage.java | 2 +- .../monitoring/TestZKPathDataDumpTask.java | 99 ++++++++++++++++++-- 4 files changed, 118 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f952cb76/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 8d28dbd..f95f6ee 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -139,13 +139,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { public void start() { long initialDelay = 0; long period = 15 * 60 * 1000; - int timeThresholdNoChange = 15 * 60 * 1000; + long timeThresholdNoChangeForStatusUpdates = 15 * 60 * 1000; // 15 minutes + long timeThresholdNoChangeForErrors = 24 * 60 * 60 * 1000; // 1 day + int maximumNumberOfLeafNodesAllowed = 10000; if (_timer == null) { LOG.info("Start StatusDumpTask"); _timer = new Timer("StatusDumpTimerTask", true); - _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange), - initialDelay, period); + _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, + timeThresholdNoChangeForStatusUpdates, timeThresholdNoChangeForErrors, + maximumNumberOfLeafNodesAllowed), initialDelay, period); } } http://git-wip-us.apache.org/repos/asf/helix/blob/f952cb76/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java index 2f5f773..0a91256 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java @@ -38,17 +38,24 @@ import com.google.common.collect.Lists; public class ZKPathDataDumpTask extends TimerTask { static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class); - private final int _thresholdNoChangeInMs; + private final long _thresholdNoChangeMsForStatusUpdates; + private final long _thresholdNoChangeMsForErrors; + private final int _maxLeafCount; private final HelixManager _manager; private final ZNRecordSerializer _jsonSerializer; - public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) { + public ZKPathDataDumpTask(HelixManager manager, long thresholdNoChangeMsForStatusUpdates, + long thresholdNoChangeMsForErrors, int maxLeafCount) { LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName() - + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs); + + ", thresholdNoChangeMsForStatusUpdates: " + thresholdNoChangeMsForStatusUpdates + + ", thresholdNoChangeMsForErrors: " + thresholdNoChangeMsForErrors + ", maxLeafCount: " + + maxLeafCount); _manager = manager; _jsonSerializer = new ZNRecordSerializer(); - _thresholdNoChangeInMs = thresholdNoChangeInMs; + _thresholdNoChangeMsForStatusUpdates = thresholdNoChangeMsForStatusUpdates; + _thresholdNoChangeMsForErrors = thresholdNoChangeMsForErrors; + _maxLeafCount = maxLeafCount; } @Override @@ -70,25 +77,26 @@ public class ZKPathDataDumpTask extends TimerTask { String statusUpdatePath = HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance, PropertyType.STATUSUPDATES); - dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs); + dump(baseAccessor, statusUpdatePath, _thresholdNoChangeMsForStatusUpdates, _maxLeafCount); // dump participant errors String errorPath = HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance, PropertyType.ERRORS); - dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 96); + dump(baseAccessor, errorPath, _thresholdNoChangeMsForErrors, _maxLeafCount); } // dump controller status updates String controllerStatusUpdatePath = HelixUtil.getControllerPropertyPath(_manager.getClusterName(), PropertyType.STATUSUPDATES_CONTROLLER); - dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs); + dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeMsForStatusUpdates, + _maxLeafCount); // dump controller errors String controllerErrorPath = HelixUtil.getControllerPropertyPath(_manager.getClusterName(), PropertyType.ERRORS_CONTROLLER); - dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs); + dump(baseAccessor, controllerErrorPath, _thresholdNoChangeMsForErrors, _maxLeafCount); } /** @@ -122,7 +130,8 @@ public class ZKPathDataDumpTask extends TimerTask { return leafPaths; } - void dump(BaseDataAccessor accessor, String ancestorPath, int threshold) { + void dump(BaseDataAccessor accessor, String ancestorPath, long threshold, + int maxLeafCount) { List leafPaths = scanPath(accessor, ancestorPath); if (leafPaths.isEmpty()) { return; @@ -133,7 +142,7 @@ public class ZKPathDataDumpTask extends TimerTask { long now = System.currentTimeMillis(); for (int i = 0; i < stats.length; i++) { Stat stat = stats[i]; - if ((now - stat.getMtime()) > threshold) { + if ((stats.length > maxLeafCount) || ((now - stat.getMtime()) > threshold)) { dumpPaths.add(leafPaths.get(i)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/f952cb76/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 623db80..80797bb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -397,7 +397,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase { } } Thread.sleep(3000); - ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0); + ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE); dumpTask.run(); subPaths = _gZkClient.getChildren(controllerStatusPath); http://git-wip-us.apache.org/repos/asf/helix/blob/f952cb76/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java index a3d8ae3..d073dd2 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java @@ -19,6 +19,9 @@ package org.apache.helix.monitoring; * under the License. */ +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Date; import org.apache.helix.BaseDataAccessor; @@ -30,14 +33,11 @@ import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.model.StatusUpdate; import org.apache.helix.model.Error; +import org.apache.helix.model.StatusUpdate; import org.testng.Assert; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class TestZKPathDataDumpTask extends ZkUnitTestBase { @Test @@ -67,8 +67,9 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase { when(manager.getHelixDataAccessor()).thenReturn(accessor); when(manager.getClusterName()).thenReturn(clusterName); - // run dump task without statusUpdates and errors, should not remove any existing statusUpdate/error paths - ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0); + // run dump task without statusUpdates and errors, should not remove any existing + // statusUpdate/error paths + ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE); task.run(); PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses(); Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0)); @@ -88,7 +89,8 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase { // add controller status updates and errors controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB"); - accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord("controllerStatusUpdate"))); + accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord( + "controllerStatusUpdate"))); controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error"); accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError"))); @@ -110,4 +112,87 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase { System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } + + @Test + public void testCapacityReached() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 1; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 2, // partitions per resource + n, // number of nodes + 1, // replicas + "MasterSlave", true); // do rebalance + + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + BaseDataAccessor baseAccessor = accessor.getBaseDataAccessor(); + + HelixManager manager = mock(HelixManager.class); + when(manager.getHelixDataAccessor()).thenReturn(accessor); + when(manager.getClusterName()).thenReturn(clusterName); + + // run dump task without statusUpdates and errors, should not remove any existing + // statusUpdate/error paths + ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, Long.MAX_VALUE, Long.MAX_VALUE, 1); + task.run(); + PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses(); + Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0)); + PropertyKey controllerErrorKey = keyBuilder.controllerTaskErrors(); + Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0)); + PropertyKey statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918"); + Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0)); + PropertyKey errorKey = keyBuilder.stateTransitionErrors("localhost_12918"); + Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0)); + + // add participant status updates and errors + statusUpdateKey = + keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_0"); + accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate"))); + errorKey = + keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_0"); + accessor.setProperty(errorKey, new Error(new ZNRecord("error"))); + + // add controller status updates and errors (one of each, should not trigger anything) + controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB"); + accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord( + "controllerStatusUpdate"))); + controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error"); + accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError"))); + + // run dump task, should not remove anything because the threshold is not exceeded + task.run(); + Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0)); + Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0)); + Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0)); + Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0)); + + // add a second set of all status updates and errors + statusUpdateKey = + keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_1"); + accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate"))); + errorKey = + keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_1"); + accessor.setProperty(errorKey, new Error(new ZNRecord("error"))); + controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB1"); + accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord( + "controllerStatusUpdate"))); + controllerErrorKey = keyBuilder.controllerTaskError("TestDB1_error"); + accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError"))); + + // run dump task, should remove everything since capacities are exceeded + task.run(); + Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0)); + Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0)); + Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0)); + Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0)); + } }