helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-445] NPE in ZkPathDataDumpTask, rb=21504
Date Thu, 22 May 2014 01:26:57 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 8d5c27c13 -> 590f65cf9


[HELIX-445] NPE in ZkPathDataDumpTask, rb=21504


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/590f65cf
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/590f65cf
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/590f65cf

Branch: refs/heads/helix-0.6.x
Commit: 590f65cf9538435c89dfd71922479b281a159e60
Parents: 8d5c27c
Author: zzhang <zzhang@apache.org>
Authored: Wed May 21 18:26:40 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Wed May 21 18:26:40 2014 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |   5 +
 .../main/java/org/apache/helix/PropertyKey.java |  27 +++
 .../apache/helix/manager/zk/ZKHelixManager.java |  12 +-
 .../helix/monitoring/ZKPathDataDumpTask.java    | 173 ++++++++++---------
 .../integration/TestHelixCustomCodeRunner.java  |  41 ++---
 .../helix/integration/TestSchedulerMessage.java |   2 +-
 .../manager/MockParticipantManager.java         |   1 -
 .../helix/mock/participant/MockJobIntf.java     |  28 ---
 .../monitoring/TestParticipantMonitor.java      |   6 +-
 .../helix/monitoring/TestStatCollector.java     |   6 +-
 .../monitoring/TestZKPathDataDumpTask.java      | 113 ++++++++++++
 pom.xml                                         |   5 +
 12 files changed, 264 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index 09e5a22..67b2897 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -125,6 +125,11 @@ under the License.
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>
       <version>2.1</version>

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 84ff9f2..f4e47a0 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -409,6 +409,15 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link StatusUpdate} of an instance
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey stateTransitionStatus(String instanceName) {
+      return new PropertyKey(STATUSUPDATES, StatusUpdate.class, _clusterName, instanceName);
+    }
+
+    /**
      * Used to get status update for a NON STATE TRANSITION type
      * @param instanceName
      * @param sessionId
@@ -452,6 +461,16 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link Error} of an instance, session, and
+     * resource
+     * @param instanceName
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey stateTransitionErrors(String instanceName) {
+      return new PropertyKey(ERRORS, Error.class, _clusterName, instanceName);
+    }
+
+    /**
      * Used to get status update for a NON STATE TRANSITION type
      * @param instanceName
      * @param sessionId
@@ -526,6 +545,14 @@ public class PropertyKey {
     }
 
     /**
+     * Get a property key associated with {@link StatusUpdate} of controller status updates
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey controllerTaskStatuses() {
+      return new PropertyKey(STATUSUPDATES_CONTROLLER, StatusUpdate.class, _clusterName);
+    }
+
+    /**
      * Get a property key associated with all {@link Message}s for the controller
      * @return {@link PropertyKey}
      */

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index a24ec32..01bbaf3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -131,11 +131,9 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
    */
   static class StatusDumpTask extends HelixTimerTask {
     Timer _timer = null;
-    final ZkClient zkclient;
     final HelixManager helixController;
 
-    public StatusDumpTask(ZkClient zkclient, HelixManager helixController) {
-      this.zkclient = zkclient;
+    public StatusDumpTask(HelixManager helixController) {
       this.helixController = helixController;
     }
 
@@ -148,8 +146,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       if (_timer == null) {
         LOG.info("Start StatusDumpTask");
         _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
-            timeThresholdNoChange), initialDelay, period);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, timeThresholdNoChange),
+            initialDelay, period);
       }
     }
 
@@ -216,12 +214,12 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       break;
     case CONTROLLER:
       _stateMachineEngine = null;
-      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+      _controllerTimerTasks.add(new StatusDumpTask(this));
 
       break;
     case CONTROLLER_PARTICIPANT:
       _stateMachineEngine = new HelixStateMachineEngine(this);
-      _controllerTimerTasks.add(new StatusDumpTask(_zkclient, this));
+      _controllerTimerTasks.add(new StatusDumpTask(this));
       break;
     case ADMINISTRATOR:
     case SPECTATOR:

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
index c62da62..a0190d2 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java
@@ -19,36 +19,35 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.StringWriter;
-import java.util.Date;
 import java.util.List;
 import java.util.TimerTask;
 
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
+
+import com.google.common.collect.Lists;
 
 public class ZKPathDataDumpTask extends TimerTask {
-  static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
+  static Logger LOG = Logger.getLogger(ZKPathDataDumpTask.class);
 
   private final int _thresholdNoChangeInMs;
   private final HelixManager _manager;
-  private final ZkClient _zkClient;
+  private final ZNRecordSerializer _jsonSerializer;
+
+  public ZKPathDataDumpTask(HelixManager manager, int thresholdNoChangeInMs) {
+    LOG.info("Init ZKPathDataDumpTask for cluster: " + manager.getClusterName()
+        + ", thresholdNoChangeInMs: " + thresholdNoChangeInMs);
 
-  public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs) {
     _manager = manager;
-    _zkClient = zkClient;
-    logger.info("Scanning cluster statusUpdate " + manager.getClusterName()
-        + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
+    _jsonSerializer = new ZNRecordSerializer();
     _thresholdNoChangeInMs = thresholdNoChangeInMs;
   }
 
@@ -59,88 +58,96 @@ public class ZKPathDataDumpTask extends TimerTask {
     // We need to think if we should create per-instance log files that contains
     // per-instance statusUpdates
     // and errors
-    logger.info("Scanning status updates ...");
-    try {
-      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
-      for (String instanceName : instances) {
-        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
-            PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
-        scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
-            PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
-      }
-      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
-          PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
+    LOG.info("Scan statusUpdates and errors for cluster: " + _manager.getClusterName()
+        + ", by controller: " + _manager);
+    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+    BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+    List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+    for (String instance : instances) {
+      // dump participant status updates
+      String statusUpdatePath =
+          HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+              PropertyType.STATUSUPDATES);
+      dump(baseAccessor, statusUpdatePath, _thresholdNoChangeInMs);
 
-      scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
-          PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
-    } catch (Exception e) {
-      logger.error(e);
+      // dump participant errors
+      String errorPath =
+          HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instance,
+              PropertyType.ERRORS);
+      dump(baseAccessor, errorPath, _thresholdNoChangeInMs * 3);
     }
+    // dump controller status updates
+    String controllerStatusUpdatePath =
+        HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+            PropertyType.STATUSUPDATES_CONTROLLER);
+    dump(baseAccessor, controllerStatusUpdatePath, _thresholdNoChangeInMs);
+
+    // dump controller errors
+    String controllerErrorPath =
+        HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
+            PropertyType.ERRORS_CONTROLLER);
+    dump(baseAccessor, controllerErrorPath, _thresholdNoChangeInMs);
   }
 
-  void scanPath(String path, int thresholdNoChangeInMs) {
-    logger.info("Scanning path " + path);
-    List<String> subPaths = _zkClient.getChildren(path);
-    for (String subPath : subPaths) {
-      try {
-        String nextPath = path + "/" + subPath;
-        List<String> subSubPaths = _zkClient.getChildren(nextPath);
-        for (String subsubPath : subSubPaths) {
-          try {
-            checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
-          } catch (Exception e) {
-            logger.error(e);
-          }
-        }
-      } catch (Exception e) {
-        logger.error(e);
+  /**
+   * Find paths of all leaf nodes under an ancestor path (exclusive)
+   * @param accessor
+   * @param ancestorPath
+   * @return a list of paths
+   */
+  static List<String> scanPath(BaseDataAccessor<ZNRecord> accessor, String ancestorPath) {
+    List<String> queue = Lists.newLinkedList();
+    queue.add(ancestorPath);
+
+    // BFS
+    List<String> leafPaths = Lists.newArrayList();
+    while (!queue.isEmpty()) {
+      String path = queue.remove(0);
+      List<String> childNames = accessor.getChildNames(path, 0);
+      if (childNames == null) {
+        // path doesn't exist
+        continue;
+      }
+      if (childNames.isEmpty() && !path.equals(ancestorPath)) {
+        // leaf node, excluding ancestorPath
+        leafPaths.add(path);
+      }
+      for (String childName : childNames) {
+        String subPath = String.format("%s/%s", path, childName);
+        queue.add(subPath);
       }
     }
+    return leafPaths;
   }
 
-  void checkAndDump(String path, int thresholdNoChangeInMs) {
-    List<String> subPaths = _zkClient.getChildren(path);
-    if (subPaths.size() == 0) {
-      subPaths.add("");
+  void dump(BaseDataAccessor<ZNRecord> accessor, String ancestorPath, int threshold) {
+    List<String> leafPaths = scanPath(accessor, ancestorPath);
+    if (leafPaths.isEmpty()) {
+      return;
+    }
+
+    Stat[] stats = accessor.getStats(leafPaths, 0);
+    List<String> dumpPaths = Lists.newArrayList();
+    long now = System.currentTimeMillis();
+    for (int i = 0; i < stats.length; i++) {
+      Stat stat = stats[i];
+      if ((now - stat.getMtime()) > threshold) {
+        dumpPaths.add(leafPaths.get(i));
+      }
     }
-    for (String subPath : subPaths) {
-      String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
-      Stat pathStat = _zkClient.getStat(fullPath);
-
-      long lastModifiedTimeInMs = pathStat.getMtime();
-      long nowInMs = new Date().getTime();
-      // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
-
-      // Check the last modified time
-      if (nowInMs > lastModifiedTimeInMs) {
-        long timeDiff = nowInMs - lastModifiedTimeInMs;
-        if (timeDiff > thresholdNoChangeInMs) {
-          logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
-          _zkClient.setZkSerializer(new ZNRecordSerializer());
-          ZNRecord record = _zkClient.readData(fullPath);
-
-          // dump the node content into log file
-          ObjectMapper mapper = new ObjectMapper();
-          SerializationConfig serializationConfig = mapper.getSerializationConfig();
-          serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
-
-          StringWriter sw = new StringWriter();
-          try {
-            mapper.writeValue(sw, record);
-            logger.info(sw.toString());
-          } catch (Exception e) {
-            logger
-                .warn(
-                    "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
-                    e);
-          }
-          // Delete the leaf data
-          _zkClient.deleteRecursive(fullPath);
-        }
+
+    // dump
+    LOG.info("Dump statusUpdates and errors records for pahts: " + dumpPaths);
+    List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0);
+    for (ZNRecord record : dumpRecords) {
+      if (record != null) {
+        LOG.info(new String(_jsonSerializer.serialize(record)));
       }
     }
+
+    // clean up
+    accessor.remove(dumpPaths, 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
index f6a7098..93ceedb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestHelixCustomCodeRunner.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
@@ -32,7 +33,6 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.mock.participant.MockJobIntf;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.participant.CustomCodeCallbackHandler;
 import org.apache.helix.participant.HelixCustomCodeRunner;
@@ -62,30 +62,20 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
 
   }
 
-  class MockJob implements MockJobIntf {
-    @Override
-    public void doPreConnectJob(HelixManager manager) {
-      try {
-        // delay the start of the 1st participant
-        // so there will be a leadership transfer from localhost_12919 to 12918
-        if (manager.getInstanceName().equals("localhost_12918")) {
-          Thread.sleep(2000);
-        }
-
-        HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
-        customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
-            .usingLeaderStandbyModel("TestParticLeader").start();
-      } catch (Exception e) {
-        LOG.error("Exception do pre-connect job", e);
+  private void registerCustomCodeRunner(HelixManager manager) {
+    try {
+      // delay the start of the 1st participant
+      // so there will be a leadership transfer from localhost_12919 to 12918
+      if (manager.getInstanceName().equals("localhost_12918")) {
+        Thread.sleep(2000);
       }
-    }
-
-    @Override
-    public void doPostConnectJob(HelixManager manager) {
-      // TODO Auto-generated method stub
 
+      HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
+      customCodeRunner.invoke(_callback).on(ChangeType.LIVE_INSTANCE)
+          .usingLeaderStandbyModel("TestParticLeader").start();
+    } catch (Exception e) {
+      LOG.error("Exception do pre-connect job", e);
     }
-
   }
 
   @Test
@@ -109,10 +99,9 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
     for (int i = 0; i < _nodeNb; i++) {
       String instanceName = "localhost_" + (_startPort + i);
 
-      MockJob job = new MockJob();
       participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
 
-      job.doPreConnectJob(participants[i]);
+      registerCustomCodeRunner(participants[i]);
       participants[i].syncStart();
     }
     boolean result =
@@ -125,9 +114,7 @@ public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase {
     _callback._isCallbackInvoked = false;
 
     // add a new live instance
-    // ZkClient zkClient = new ZkClient(ZK_ADDR);
-    // zkClient.setZkSerializer(new ZNRecordSerializer());
-    ZKHelixDataAccessor accessor =
+    HelixDataAccessor accessor =
         new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
index 7cd64d7..cb6e186 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java
@@ -393,7 +393,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase {
       }
     }
     Thread.sleep(3000);
-    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _gZkClient, 0);
+    ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, 0);
     dumpTask.run();
 
     subPaths = _gZkClient.getChildren(controllerStatusPath);

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 34efe34..917be17 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -28,7 +28,6 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
-import org.apache.helix.mock.participant.MockJobIntf;
 import org.apache.helix.mock.participant.MockMSModelFactory;
 import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
deleted file mode 100644
index 4b637a6..0000000
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockJobIntf.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.mock.participant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixManager;
-
-public interface MockJobIntf {
-  public void doPreConnectJob(HelixManager manager);
-
-  public void doPostConnectJob(HelixManager manager);
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
index 74b1a89..5f44b36 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestParticipantMonitor.java
@@ -96,10 +96,8 @@ public class TestParticipantMonitor {
     }
   }
 
-  @Test(groups = {
-    "unitTest"
-  })
-  public void TestReportData() throws InstanceNotFoundException, MalformedObjectNameException,
+  @Test()
+  public void testReportData() throws InstanceNotFoundException, MalformedObjectNameException,
       NullPointerException, IOException, InterruptedException {
     System.out.println("START TestParticipantMonitor");
     ParticipantMonitor monitor = new ParticipantMonitor();

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
index c35c961..7a4a941 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestStatCollector.java
@@ -23,10 +23,8 @@ import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestStatCollector {
-  @Test(groups = {
-    "unitTest"
-  })
-  public void TestCollectData() {
+  @Test()
+  public void testCollectData() {
     StatCollector collector = new StatCollector();
 
     int nPoints = 100;

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
new file mode 100644
index 0000000..a3d8ae3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestZKPathDataDumpTask.java
@@ -0,0 +1,113 @@
+package org.apache.helix.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.StatusUpdate;
+import org.apache.helix.model.Error;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZKPathDataDumpTask extends ZkUnitTestBase {
+
+  @Test
+  public void test() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        n, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();
+
+    HelixManager manager = mock(HelixManager.class);
+    when(manager.getHelixDataAccessor()).thenReturn(accessor);
+    when(manager.getClusterName()).thenReturn(clusterName);
+
+    // run dump task without statusUpdates and errors, should not remove any existing statusUpdate/error paths
+    ZKPathDataDumpTask task = new ZKPathDataDumpTask(manager, 0);
+    task.run();
+    PropertyKey controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+    Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    PropertyKey controllerErrorKey = keyBuilder.controllerTaskErrors();
+    Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    PropertyKey statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+    Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    PropertyKey errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+
+    // add participant status updates and errors
+    statusUpdateKey =
+        keyBuilder.stateTransitionStatus("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(statusUpdateKey, new StatusUpdate(new ZNRecord("statusUpdate")));
+    errorKey =
+        keyBuilder.stateTransitionError("localhost_12918", "session_0", "TestDB0", "TestDB0_0");
+    accessor.setProperty(errorKey, new Error(new ZNRecord("error")));
+
+    // add controller status updates and errors
+    controllerStatusUpdateKey = keyBuilder.controllerTaskStatus("session_0", "TestDB");
+    accessor.setProperty(controllerStatusUpdateKey, new StatusUpdate(new ZNRecord("controllerStatusUpdate")));
+    controllerErrorKey = keyBuilder.controllerTaskError("TestDB_error");
+    accessor.setProperty(controllerErrorKey, new Error(new ZNRecord("controllerError")));
+
+    // run dump task, should remove existing statusUpdate/error paths
+    task.run();
+    Assert.assertFalse(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    Assert.assertFalse(baseAccessor.exists(errorKey.getPath(), 0));
+
+    controllerStatusUpdateKey = keyBuilder.controllerTaskStatuses();
+    Assert.assertTrue(baseAccessor.exists(controllerStatusUpdateKey.getPath(), 0));
+    controllerErrorKey = keyBuilder.controllerTaskErrors();
+    Assert.assertTrue(baseAccessor.exists(controllerErrorKey.getPath(), 0));
+    statusUpdateKey = keyBuilder.stateTransitionStatus("localhost_12918");
+    Assert.assertTrue(baseAccessor.exists(statusUpdateKey.getPath(), 0));
+    errorKey = keyBuilder.stateTransitionErrors("localhost_12918");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/590f65cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e737957..1f6369d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,6 +284,11 @@ under the License.
         <artifactId>testng</artifactId>
         <version>6.0.1</version>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>1.9.5</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 


Mime
View raw message