helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-345] Speed up the controller pipeline, rb=16407
Date Thu, 30 Jan 2014 00:44:05 GMT
Updated Branches:
  refs/heads/helix-0.6.2-release c92428023 -> 06ca975d1


[HELIX-345] Speed up the controller pipeline, rb=16407


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/06ca975d
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/06ca975d
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/06ca975d

Branch: refs/heads/helix-0.6.2-release
Commit: 06ca975d11a2fbea9c91390e7c3419bcd2d6f704
Parents: c924280
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Jan 29 16:43:07 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Jan 29 16:43:07 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  40 +++++
 .../controller/stages/ClusterDataCache.java     | 171 +++++++++++++++++--
 .../controller/stages/ReadClusterDataStage.java |  12 +-
 .../controller/stages/TaskAssignmentStage.java  |   5 +
 .../src/test/java/org/apache/helix/Mocks.java   |   2 +-
 .../stages/TestRebalancePipeline.java           |  21 +--
 .../TestReelectedPipelineCorrectness.java       | 151 ++++++++++++++++
 7 files changed, 370 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7e28399..6db82fc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -46,6 +46,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventBlockingQueue;
 import org.apache.helix.controller.stages.CompatibilityCheckStage;
@@ -117,6 +118,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   int _timerPeriod = Integer.MAX_VALUE;
 
   /**
+   * A cache maintained across pipelines
+   */
+  private ClusterDataCache _cache;
+
+  /**
    * Default constructor that creates a default pipeline registry. This is sufficient in
    * most cases, but if there is a some thing specific needed use another constructor
    * where in you can pass a pipeline registry
@@ -134,6 +140,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
     @Override
     public void run() {
+      _cache.requireFullRefresh();
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
       ClusterEvent event = new ClusterEvent("periodicalRebalance");
@@ -237,6 +244,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
     _eventQueue = new ClusterEventBlockingQueue();
     _eventThread = new ClusterEventProcessor();
     _eventThread.start();
+    _cache = new ClusterDataCache();
   }
 
   /**
@@ -282,6 +290,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       }
     }
 
+    // add the cache
+    event.addAttribute("ClusterDataCache", _cache);
+
     List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
     if (pipelines == null || pipelines.size() == 0) {
       logger.info("No pipeline to run for event:" + event.getName());
@@ -324,6 +335,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
     ClusterEvent event = new ClusterEvent("currentStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("instanceName", instanceName);
@@ -347,6 +361,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     ClusterEvent event = new ClusterEvent("messageChange");
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -366,10 +383,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
     logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
 
     if (liveInstances == null) {
       liveInstances = Collections.emptyList();
     }
+    _cache.setLiveInstances(liveInstances);
+
     // Go though the live instance list and make sure that we are observing them
     // accordingly. The action is done regardless of the paused flag.
     if (changeContext.getType() == NotificationContext.Type.INIT
@@ -409,6 +431,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onIdealStateChange(List<IdealState> idealStates, NotificationContext
changeContext) {
     logger.info("START: Generic GenericClusterController.onIdealStateChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (idealStates == null) {
+      idealStates = Collections.emptyList();
+    }
+    _cache.setIdealStates(idealStates);
     ClusterEvent event = new ClusterEvent("idealStateChange");
     event.addAttribute("helixmanager", changeContext.getManager());
     event.addAttribute("changeContext", changeContext);
@@ -425,6 +455,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
{
     logger.info("START: GenericClusterController.onConfigChange()");
+    if (changeContext == null || changeContext.getType() != Type.CALLBACK) {
+      _cache.requireFullRefresh();
+    }
+
+    if (configs == null) {
+      configs = Collections.emptyList();
+    }
+    _cache.setInstanceConfigs(configs);
+
     ClusterEvent event = new ClusterEvent("configChange");
     event.addAttribute("changeContext", changeContext);
     event.addAttribute("helixmanager", changeContext.getManager());
@@ -436,6 +475,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange()");
+    _cache.requireFullRefresh();
     if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
       logger.info("GenericClusterController.onControllerChange() FINALIZE");
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 5c0a94a..5d38151 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -22,12 +22,15 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixConstants.StateModelToken;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -39,7 +42,9 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Reads the data from the cluster using data accessor. This output ClusterData which
@@ -58,6 +63,11 @@ public class ClusterDataCache {
   Map<String, Map<String, Message>> _messageMap;
   Map<String, Map<String, String>> _idealStateRuleMap;
 
+  // maintain a cache of participant messages across pipeline runs
+  Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+
+  boolean _init = true;
+
   // Map<String, Map<String, HealthStat>> _healthStatMap;
   // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
   // private PersistentStats _persistentStats;
@@ -72,38 +82,110 @@ public class ClusterDataCache {
    * @param accessor
    * @return
    */
-  public boolean refresh(HelixDataAccessor accessor) {
+  public synchronized boolean refresh(HelixDataAccessor accessor) {
+    LOG.info("START: ClusterDataCache.refresh()");
+    long startTime = System.currentTimeMillis();
+
     Builder keyBuilder = accessor.keyBuilder();
-    _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
 
-    for (LiveInstance instance : _liveInstanceMap.values()) {
-      LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+    if (_init) {
+      _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+      _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+      _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+    }
+
+    if (LOG.isTraceEnabled()) {
+      for (LiveInstance instance : _liveInstanceMap.values()) {
+        LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
+      }
     }
 
     _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs());
-    _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
 
     Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String,
Message>>();
+    List<PropertyKey> newMessageKeys = Lists.newLinkedList();
+    long purgeSum = 0;
     for (String instanceName : _liveInstanceMap.keySet()) {
-      Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName));
-      msgMap.put(instanceName, map);
+      // get the cache
+      Map<String, Message> cachedMap = _messageCache.get(instanceName);
+      if (cachedMap == null) {
+        cachedMap = Maps.newHashMap();
+        _messageCache.put(instanceName, cachedMap);
+      }
+      msgMap.put(instanceName, cachedMap);
+
+      // get the current names
+      Set<String> messageNames =
+          Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName)));
+
+      long purgeStart = System.currentTimeMillis();
+      // clear stale names
+      Iterator<String> cachedNamesIter = cachedMap.keySet().iterator();
+      while (cachedNamesIter.hasNext()) {
+        String messageName = cachedNamesIter.next();
+        if (!messageNames.contains(messageName)) {
+          cachedNamesIter.remove();
+        }
+      }
+      long purgeEnd = System.currentTimeMillis();
+      purgeSum += purgeEnd - purgeStart;
+
+      // get the keys for the new messages
+      for (String messageName : messageNames) {
+        if (!cachedMap.containsKey(messageName)) {
+          newMessageKeys.add(keyBuilder.message(instanceName, messageName));
+        }
+      }
+    }
+
+    // get the new messages
+    if (newMessageKeys.size() > 0) {
+      List<Message> newMessages = accessor.getProperty(newMessageKeys);
+      for (Message message : newMessages) {
+        if (message != null) {
+          Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
+          cachedMap.put(message.getId(), message);
+        }
+      }
     }
     _messageMap = Collections.unmodifiableMap(msgMap);
+    LOG.debug("Purge took: " + purgeSum);
 
+    List<PropertyKey> currentStateKeys = Lists.newLinkedList();
     Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap
=
         new HashMap<String, Map<String, Map<String, CurrentState>>>();
     for (String instanceName : _liveInstanceMap.keySet()) {
       LiveInstance liveInstance = _liveInstanceMap.get(instanceName);
       String sessionId = liveInstance.getSessionId();
-      if (!allCurStateMap.containsKey(instanceName)) {
-        allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>());
+      List<String> currentStateNames =
+          accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
+      for (String currentStateName : currentStateNames) {
+        currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
+      }
+
+      // ensure an empty current state map for all live instances and sessions
+      Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName);
+      if (instanceCurStateMap == null) {
+        instanceCurStateMap = Maps.newHashMap();
+        allCurStateMap.put(instanceName, instanceCurStateMap);
+      }
+      Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
+      if (sessionCurStateMap == null) {
+        sessionCurStateMap = Maps.newHashMap();
+        instanceCurStateMap.put(sessionId, sessionCurStateMap);
+      }
+    }
+    List<CurrentState> currentStates = accessor.getProperty(currentStateKeys);
+    Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator();
+    for (CurrentState currentState : currentStates) {
+      PropertyKey key = csKeyIter.next();
+      String[] params = key.getParams();
+      if (currentState != null && params.length >= 4) {
+        Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]);
+        Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]);
+        sessionCurStateMap.put(params[3], currentState);
       }
-      Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName);
-      Map<String, CurrentState> map =
-          accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId));
-      curStateMap.put(sessionId, map);
     }
 
     for (String instance : allCurStateMap.keySet()) {
@@ -130,6 +212,18 @@ public class ClusterDataCache {
       }
     }
 
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      int numPaths =
+          _liveInstanceMap.size() + _idealStateMap.size() + _stateModelDefMap.size()
+              + _instanceConfigMap.size() + _constraintMap.size() + newMessageKeys.size()
+              + currentStateKeys.size();
+      LOG.debug("Paths read: " + numPaths);
+    }
+
+    _init = false;
     return true;
   }
 
@@ -141,6 +235,14 @@ public class ClusterDataCache {
     return _idealStateMap;
   }
 
+  public synchronized void setIdealStates(List<IdealState> idealStates) {
+    Map<String, IdealState> idealStateMap = Maps.newHashMap();
+    for (IdealState idealState : idealStates) {
+      idealStateMap.put(idealState.getId(), idealState);
+    }
+    _idealStateMap = idealStateMap;
+  }
+
   public Map<String, Map<String, String>> getIdealStateRules() {
     return _idealStateRuleMap;
   }
@@ -153,6 +255,14 @@ public class ClusterDataCache {
     return _liveInstanceMap;
   }
 
+  public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
+    Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap();
+    for (LiveInstance liveInstance : liveInstances) {
+      liveInstanceMap.put(liveInstance.getId(), liveInstance);
+    }
+    _liveInstanceMap = liveInstanceMap;
+  }
+
   /**
    * Provides the current state of the node for a given session id,
    * the sessionid can be got from LiveInstance
@@ -161,6 +271,10 @@ public class ClusterDataCache {
    * @return
    */
   public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId)
{
+    if (!_currentStateMap.containsKey(instanceName)
+        || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) {
+      return Collections.emptyMap();
+    }
     return _currentStateMap.get(instanceName).get(clientSessionId);
   }
 
@@ -178,6 +292,20 @@ public class ClusterDataCache {
     }
   }
 
+  // Record messages just sent so refresh() can skip re-reading them (assumes id == znode name).
+  // synchronized for consistency with refresh(), which mutates the same _messageCache.
+  public synchronized void cacheMessages(List<Message> messages) {
+    for (Message message : messages) {
+      String instanceName = message.getTgtName();
+      Map<String, Message> instMsgMap = _messageCache.get(instanceName);
+      if (instMsgMap == null) {
+        instMsgMap = Maps.newHashMap();
+        _messageCache.put(instanceName, instMsgMap);
+      }
+      instMsgMap.put(message.getId(), message);
+    }
+  }
+
   // public HealthStat getGlobalStats()
   // {
   // return _globalStats;
@@ -236,6 +364,14 @@ public class ClusterDataCache {
     return _instanceConfigMap;
   }
 
+  public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs)
{
+    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceConfigMap.put(instanceConfig.getId(), instanceConfig);
+    }
+    _instanceConfigMap = instanceConfigMap;
+  }
+
   /**
    * Some partitions might be disabled on specific nodes.
    * This method allows one to fetch the set of nodes where a given partition is disabled
@@ -295,6 +431,13 @@ public class ClusterDataCache {
   }
 
   /**
+   * Indicate that a full read should be done on the next refresh
+   */
+  public synchronized void requireFullRefresh() {
+    _init = true;
+  }
+
+  /**
    * toString method to print the entire cluster state
    */
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index f077d29..0a7414a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -29,11 +29,8 @@ import org.apache.log4j.Logger;
 
 public class ReadClusterDataStage extends AbstractBaseStage {
   private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName());
-  ClusterDataCache _cache;
 
-  public ReadClusterDataStage() {
-    _cache = new ClusterDataCache();
-  }
+  private ClusterDataCache _cache = null;
 
   @Override
   public void process(ClusterEvent event) throws Exception {
@@ -44,6 +41,15 @@ public class ReadClusterDataStage extends AbstractBaseStage {
     if (manager == null) {
       throw new StageException("HelixManager attribute value is null");
     }
+
+    // Use the cache the controller shares across pipeline runs; if the event lacks one, keep
+    // reusing this stage's own cache (created on first use) so _cache is never reset to null.
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+    if (cache == null) {
+      cache = (_cache != null) ? _cache : new ClusterDataCache();
+    }
+    _cache = cache;
+
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     _cache.refresh(dataAccessor);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 192a645..5772385 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -74,6 +74,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
 
+    long cacheStart = System.currentTimeMillis();
+    cache.cacheMessages(outputMessages);
+    long cacheEnd = System.currentTimeMillis();
+    logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms");
+
     long endTime = System.currentTimeMillis();
     logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
 

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index df893bb..9e2452b 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -603,7 +603,7 @@ public class Mocks {
           String[] keySplit = key.split("\\/");
           String[] pathSplit = path.split("\\/");
           if (keySplit.length > pathSplit.length) {
-            child.add(keySplit[pathSplit.length + 1]);
+            child.add(keySplit[pathSplit.length]);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 1a11be3..452a683 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -25,30 +25,20 @@ import java.util.List;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageSelectionStageOutput;
-import org.apache.helix.controller.stages.MessageThrottleStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.Partition;
 import org.apache.helix.model.Message.Attributes;
+import org.apache.helix.model.Partition;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -217,6 +207,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
+    ClusterDataCache cache = new ClusterDataCache();
+    event.addAttribute("ClusterDataCache", cache);
+
     final String resourceName = "testResource_pending";
     String[] resourceGroups = new String[] {
       resourceName
@@ -267,6 +260,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     // message, make sure controller should not send O->DROPPEDN until O->S is done
     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
     admin.dropResource(clusterName, resourceName);
+    List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates());
+    cache.setIdealStates(idealStates);
 
     runPipeline(event, dataRefresh);
     runPipeline(event, rebalancePipeline);

http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
new file mode 100644
index 0000000..78927f9
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java
@@ -0,0 +1,151 @@
+package org.apache.helix.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterDistributedController;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * The controller pipeline only updates ideal states, live instances, and instance configs when
+ * they change. However, if a controller loses leadership and subsequently regains it, we need to
+ * ensure that the controller can verify its cache. That's what this test is for.
+ */
+public class TestReelectedPipelineCorrectness extends ZkUnitTestBase {
+  @Test
+  public void testReelection() throws Exception {
+    final int NUM_CONTROLLERS = 2;
+    final int NUM_PARTICIPANTS = 4;
+    final int NUM_PARTITIONS = 8;
+    final int NUM_REPLICAS = 2;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+
+    // Set up the managed cluster with one FULL_AUTO MasterSlave resource (TestDB0)
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance
+
+    // Configure a separate controller cluster and activate it for the managed cluster
+    String controllerCluster = clusterName + "_controllers";
+    setupTool.addCluster(controllerCluster, true);
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      setupTool.addInstanceToCluster(controllerCluster, "controller_" + i);
+    }
+    setupTool.activateCluster(clusterName, controllerCluster, true);
+
+    // Start participants localhost_12918 .. localhost_(12918 + NUM_PARTICIPANTS - 1)
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+      final String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // Start the distributed controllers; one becomes leader, the other stands by
+    ClusterDistributedController[] controllers = new ClusterDistributedController[NUM_CONTROLLERS];
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      controllers[i] =
+          new ClusterDistributedController(ZK_ADDR, controllerCluster, "controller_" + i);
+      controllers[i].syncStart();
+    }
+    Thread.sleep(1000);
+
+    // Ensure a balanced cluster
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Disable the current leader (read from controllerLeader()) to force a leader election
+    HelixDataAccessor accessor = participants[0].getHelixDataAccessor();
+    LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
+    String leaderId = leader.getId();
+    String standbyId = (leaderId.equals("controller_0")) ? "controller_1" : "controller_0";
+    HelixAdmin admin = setupTool.getClusterManagementTool();
+    admin.enableInstance(controllerCluster, leaderId, false);
+
+    // Stop a participant; the verification below only passes if the new leader rebalanced
+    Thread.sleep(500);
+    participants[0].syncStop();
+    Thread.sleep(500);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // Disable the original standby too (leaving 0 active controllers) and stop another participant
+    admin.enableInstance(controllerCluster, standbyId, false);
+    Thread.sleep(500);
+    participants[1].syncStop();
+
+    // Also change the ideal state while no controller is active
+    IdealState idealState = admin.getResourceIdealState(clusterName, "TestDB0");
+    idealState.setMaxPartitionsPerInstance(1);
+    admin.setResourceIdealState(clusterName, "TestDB0", idealState);
+    Thread.sleep(500);
+
+    // Also disable an instance in the main cluster (localhost_12920 is participants[2])
+    admin.enableInstance(clusterName, "localhost_12920", false);
+
+    // Re-enable the original leader; it must now pick up every change made while it was disabled
+    admin.enableInstance(controllerCluster, leaderId, true);
+
+    // Now check that both the ideal state and the live instances are adhered to by the rebalance
+    Thread.sleep(500);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            clusterName));
+    Assert.assertTrue(result);
+
+    // cleanup (participants 0 and 1 were already stopped above)
+    for (int i = 0; i < NUM_CONTROLLERS; i++) {
+      controllers[i].syncStop();
+    }
+    for (int i = 2; i < NUM_PARTICIPANTS; i++) {
+      participants[i].syncStop();
+    }
+
+    System.out.println("STOP " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message