helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-472] Errors should be cleaned up less frequently
Date Wed, 16 Jul 2014 18:30:07 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 521893217 -> dbdfd52a0


[HELIX-472] Errors should be cleaned up less frequently


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dbdfd52a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dbdfd52a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dbdfd52a

Branch: refs/heads/helix-0.6.2-release
Commit: dbdfd52a0d41bbb355df10db77fad47571aa85a5
Parents: 5218932
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Jul 15 11:33:43 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Jul 15 14:08:06 2014 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |  9 +-
 .../helix/monitoring/ZKPathDataDumpTask.java    | 31 +++---
 .../helix/integration/TestSchedulerMessage.java |  2 +-
 .../monitoring/TestZKPathDataDumpTask.java      | 99 ++++++++++++++++++--
 4 files changed, 119 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dbdfd52a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d92cc7d..f0b5693 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -146,13 +146,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     public void start() {
       long initialDelay = 0;
       long period = 15 * 60 * 1000;
-      int timeThresholdNoChange = 15 * 60 * 1000;
+      long timeThresholdNoChangeForStatusUpdates = 15 * 60 * 1000; // 15 minutes
+      long timeThresholdNoChangeForErrors = 24 * 60 * 60 * 1000; // 1 day
+      int maximumNumberOfLeafNodesAllowed = 10000;
 
       if (_timer == null) {
         LOG.info("Start StatusDumpTask");
         _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange),
-            initialDelay, period);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
+            timeThresholdNoChangeForStatusUpdates, timeThresholdNoChangeForErrors,
+            maximumNumberOfLeafNodesAllowed), initialDelay, period);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/dbdfd52a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index a0190d2..0a91256 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -25,9 +25,9 @@ import java.util.TimerTask;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
@@ -38,17 +38,24 @@ import com.google.common.collect.Lists;
 public class ZKPathDataDumpTask extends TimerTask {
   static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class);
 
-  private final int _thresholdNoChangeInMs;
+  private final long _thresholdNoChangeMsForStatusUpdates;
+  private final long _thresholdNoChangeMsForErrors;
+  private final int _maxLeafCount;
   private final HelixManager _manager;
   private final ZNRecordSerializer _jsonSerializer;
 
-  public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) {
+  public ZKPathDataDumpTask(HelixManager manager, long thresholdNoChangeMsForStatusUpdates,
+      long thresholdNoChangeMsForErrors, int maxLeafCount) {
     LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName()
-        + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+        + ", thresholdNoChangeMsForStatusUpdates: " + thresholdNoChangeMsForStatusUpdates
+        + ", thresholdNoChangeMsForErrors: " + thresholdNoChangeMsForErrors + ", maxLeafCount: "
+        + maxLeafCount);
 
     _manager = manager;
     _jsonSerializer = new ZNRecordSerializer();
-    _thresholdNoChangeInMs = thresholdNoChangeInMs;
+    _thresholdNoChangeMsForStatusUpdates = thresholdNoChangeMsForStatusUpdates;
+    _thresholdNoChangeMsForErrors = thresholdNoChangeMsForErrors;
+    _maxLeafCount = maxLeafCount;
   }
 
   @Override
@@ -70,25 +77,26 @@ public class ZKPathDataDumpTask extends TimerTask {
       String statusUpdatePath =
           HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
               PropertyType.STATUSUPDATES);
-      dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs);
+      dump(baseAccessor, statusUpdatePath, _thresholdNoChangeMsForStatusUpdates, _maxLeafCount);
 
       // dump participant errors
       String errorPath =
           HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
               PropertyType.ERRORS);
-      dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
+      dump(baseAccessor, errorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
     }
     // dump controller status updates
     String controllerStatusUpdatePath =
         HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
             PropertyType.STATUSUPDATES_CONTROLLER);
-    dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs);
+    dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeMsForStatusUpdates,
+        _maxLeafCount);
 
     // dump controller errors
     String controllerErrorPath =
         HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
             PropertyType.ERRORS_CONTROLLER);
-    dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs);
+    dump(baseAccessor, controllerErrorPath, _thresholdNoChangeMsForErrors, _maxLeafCount);
   }
 
   /**
@@ -122,7 +130,8 @@ public class ZKPathDataDumpTask extends TimerTask {
     return leafPaths;
   }
 
-  void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, int threshold) {
+  void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, long threshold,
+      int maxLeafCount) {
     List<String> leafPaths = scanPath(accessor, ancestorPath);
     if (leafPaths.isEmpty()) {
       return;
@@ -133,7 +142,7 @@ public class ZKPathDataDumpTask extends TimerTask {
     long now = System.currentTimeMillis();
     for (int i = 0; i < stats.length; i++) {
       Stat stat = stats[i];
-      if ((now - stat.getMtime()) > threshold) {
+      if ((stats.length > maxLeafCount) || ((now - stat.getMtime()) > threshold)) {
         dumpPaths.add(leafPaths.get(i));
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/dbdfd52a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 281b306..202ae64 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -393,7 +393,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServ
       }
     }
     Thread.sleep(3000);
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0);
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE);
     dumpTask.run();
 
     subPaths = _gZkClient.getChildren(controllerStatusPath);

http://git-wip-us.apache.org/repos/asf/helix/blob/dbdfd52a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
index a3d8ae3..d073dd2 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.util.Date;
 
 import org.apache.helix.BaseDataAccessor;
@@ -30,14 +33,11 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.model.Error;
+import org.apache.helix.model.StatusUpdate;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestZKPathDataDumpTask extends ZkUnitTestBase {
 
   @Test
@@ -67,8 +67,9 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
     when(manager.getHelixDataAccessor()).thenReturn(accessor);
     when(manager.getClusterName()).thenReturn(clusterName);
 
-    // run dump task without statusUpdates and errors, should not remove any existing statusUpdate/error paths
-    ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0);
+    // run dump task without statusUpdates and errors, should not remove any existing
+    // statusUpdate/error paths
+    ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0L, 0L, Integer.MAX_VALUE);
     task.run();
     PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
     Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
@@ -88,7 +89,8 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
 
     // add controller status updates and errors
     controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
-    accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord("controllerStatusUpdate")));
+    accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+        "controllerStatusUpdate")));
     controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
     accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
 
@@ -110,4 +112,87 @@ public class TestZKPathDataDumpTask extends ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
   }
+
+  @Test
+  public void testCapacityReached() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+    HelixManager manager = mock(HelixManager.class);
+    when(manager.getHelixDataAccessor()).thenReturn(accessor);
+    when(manager.getClusterName()).thenReturn(clusterName);
+
+    // run dump task without statusUpdates and errors, should not remove any existing
+    // statusUpdate/error paths
+    ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, Long.MAX_VALUE, Long.MAX_VALUE, 1);
+    task.run();
+    PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+    Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    PropertyKey controllerErrorKey = keyBuilder.controllerTaskErrors();
+    Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    PropertyKey statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+    Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    PropertyKey errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+    Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0));
+
+    // add participant status updates and errors
+    statusUpdateKey =
+        keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+    errorKey =
+        keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+
+    // add controller status updates and errors (one of each, should not trigger anything)
+    controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
+    accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+        "controllerStatusUpdate")));
+    controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
+    accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+    // run dump task, should not remove anything because the threshold is not exceeded
+    task.run();
+    Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    Assert.assertTrue(baseAccessor.exists(errorKey.getPath(), 0));
+
+    // add a second set of all status updates and errors
+    statusUpdateKey =
+        keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_1");
+    accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+    errorKey =
+        keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_1");
+    accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+    controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB1");
+    accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord(
+        "controllerStatusUpdate")));
+    controllerErrorKey = keyBuilder.controllerTaskError("TestDB1_error");
+    accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+    // run dump task, should remove everything since capacities are exceeded
+    task.run();
+    Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0));
+  }
 }


Mime
View raw message