helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [07/10] [HELIX-279] Apply gc handling fixes to main ZKHelixManager class
Date Fri, 25 Oct 2013 01:21:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
deleted file mode 100644
index 4f549e4..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ /dev/null
@@ -1,693 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.MessageListener;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper.States;
-
-public abstract class AbstractManager implements HelixManager, IZkStateListener {
-  private static Logger LOG = Logger.getLogger(AbstractManager.class);
-
-  final String _zkAddress;
-  final String _clusterName;
-  final String _instanceName;
-  final InstanceType _instanceType;
-  final int _sessionTimeout;
-  final List<PreConnectCallback> _preConnectCallbacks;
-  protected final List<CallbackHandler> _handlers;
-  final HelixManagerProperties _properties;
-
-  /**
-   * helix version#
-   */
-  final String _version;
-
-  protected ZkClient _zkclient = null;
-  final DefaultMessagingService _messagingService;
-
-  BaseDataAccessor<ZNRecord> _baseDataAccessor;
-  ZKHelixDataAccessor _dataAccessor;
-  final Builder _keyBuilder;
-  ConfigAccessor _configAccessor;
-  ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
-  final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
-
-  volatile String _sessionId;
-
-  /**
-   * Keep track of timestamps that zk State has become Disconnected
-   * If in a _timeWindowLengthMs window zk State has become Disconnected
-   * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
-   */
-  final List<Long> _disconnectTimeHistory = new LinkedList<Long>();
-
-  final int _flappingTimeWindowMs;
-  final int _maxDisconnectThreshold;
-
-  /**
-   * Record the identity of this manager and derive its tunables. This constructor does
-   * not open a zookeeper connection: {@code _zkclient} is left null here and is only
-   * assigned in {@code createClient()}.
-   * @param zkAddress zookeeper address, later passed to the ZkClient constructor
-   * @param clusterName name of the cluster, also used to create the key builder
-   * @param instanceName name of this instance
-   * @param instanceType type of this manager instance
-   */
-  public AbstractManager(String zkAddress, String clusterName, String instanceName,
-      InstanceType instanceType) {
-    LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
-        + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
-
-    // identity of this manager
-    this._zkAddress = zkAddress;
-    this._clusterName = clusterName;
-    this._instanceName = instanceName;
-    this._instanceType = instanceType;
-
-    // bookkeeping containers and version information
-    this._preConnectCallbacks = new LinkedList<PreConnectCallback>();
-    this._handlers = new ArrayList<CallbackHandler>();
-    this._properties = new HelixManagerProperties("cluster-manager-version.properties");
-    this._version = this._properties.getVersion();
-
-    this._keyBuilder = new Builder(clusterName);
-    this._messagingService = new DefaultMessagingService(this);
-
-    // tunables: a positive system property wins, otherwise the compiled-in default is used
-    this._flappingTimeWindowMs =
-        getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
-            ZKHelixManager.FLAPPING_TIME_WINDIOW);
-    this._maxDisconnectThreshold =
-        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
-            ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
-    this._sessionTimeout =
-        getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
-  }
-
-  /**
-   * Read a strictly positive integer from a JVM system property.
-   * Surrounding whitespace in the property value is ignored. A missing, unparsable or
-   * non-positive value yields the default; the latter two cases are logged so that a
-   * misconfiguration does not go unnoticed.
-   * @param propertyKey name of the system property
-   * @param propertyDefaultValue value to use when the property is absent or unusable
-   * @return the positive property value, or {@code propertyDefaultValue}
-   */
-  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
-    String valueString = System.getProperty(propertyKey);
-    if (valueString == null) {
-      // property not set at all: nothing to parse, nothing to warn about
-      return propertyDefaultValue;
-    }
-
-    try {
-      int value = Integer.parseInt(valueString.trim());
-      if (value > 0) {
-        return value;
-      }
-      LOG.warn("Ignoring non-positive value for property: " + propertyKey + ", string: "
-          + valueString + ", using default value: " + propertyDefaultValue);
-    } catch (NumberFormatException e) {
-      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
-          + ", using default value: " + propertyDefaultValue);
-    }
-
-    return propertyDefaultValue;
-  }
-
-  /**
-   * different types of helix manager should impl its own handle new session logic
-   */
-  // public abstract void handleNewSession();
-
-  @Override
-  public void connect() throws Exception {
-    LOG.info("ClusterManager.connect()");
-    if (isConnected()) {
-      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
-          + " already connected. skip connect");
-      return;
-    }
-
-    try {
-      createClient();
-      _messagingService.onConnected();
-    } catch (Exception e) {
-      LOG.error("fail to connect " + _instanceName, e);
-      disconnect();
-      throw e;
-    }
-  }
-
-  /**
-   * @return true only when a zk client exists, it exposes a connection, and that
-   *         connection reports {@code States.CONNECTED}
-   */
-  @Override
-  public boolean isConnected() {
-    ZkConnection connection =
-        (_zkclient == null) ? null : (ZkConnection) _zkclient.getConnection();
-    return connection != null && connection.getZookeeperState() == States.CONNECTED;
-  }
-
-  /**
-   * specific disconnect logic for each helix-manager type
-   */
-  abstract void doDisconnect();
-
-  /**
-   * Disconnect from the cluster and release the zookeeper client.
-   * This function can be called when the connection is in a bad state (e.g. flapping),
-   * in which isConnected() could be false and we still want to disconnect from cluster.
-   * It is also invoked by connect() after a failed connect attempt, when the zk client
-   * and/or the data accessor may not have been created yet; those are therefore
-   * null-checked so that cleanup does not throw a NullPointerException that would mask
-   * the original connect failure.
-   */
-  @Override
-  public void disconnect() {
-    LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
-    try {
-      /**
-       * stop all timer tasks
-       */
-      stopTimerTasks();
-
-      /**
-       * shutdown thread pool first to avoid reset() being invoked in the middle of state
-       * transition
-       */
-      _messagingService.getExecutor().shutdown();
-
-      // TODO reset user defined handlers only
-      resetHandlers();
-
-      // createClient() assigns the data accessor only after the zk client; it may be missing
-      if (_dataAccessor != null) {
-        _dataAccessor.shutdown();
-      }
-
-      doDisconnect();
-
-      if (_zkclient != null) {
-        _zkclient.unsubscribeAll();
-      }
-    } finally {
-      // always release the client, even if one of the cleanup steps above failed
-      if (_zkclient != null) {
-        _zkclient.close();
-      }
-      LOG.info("Cluster manager: " + _instanceName + " disconnected");
-    }
-  }
-
-  @Override
-  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
-        new EventType[] {
-            EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
-        new EventType[] {
-            EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
-            EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
-        new EventType[] {
-          EventType.NodeChildrenChanged
-        });
-  }
-
-  @Override
-  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
-      throws Exception {
-    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
-        new EventType[] {
-          EventType.NodeChildrenChanged
-        });
-  }
-
-  @Override
-  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
-      throws Exception {
-    Builder keyBuilder = new Builder(_clusterName);
-
-    PropertyKey propertyKey = null;
-    switch (scope) {
-    case CLUSTER:
-      propertyKey = keyBuilder.clusterConfigs();
-      break;
-    case PARTICIPANT:
-      propertyKey = keyBuilder.instanceConfigs();
-      break;
-    case RESOURCE:
-      propertyKey = keyBuilder.resourceConfigs();
-      break;
-    default:
-      break;
-    }
-
-    if (propertyKey != null) {
-      addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] {
-        EventType.NodeChildrenChanged
-      });
-    } else {
-      LOG.error("Can't add listener to config scope: " + scope);
-    }
-  }
-
-  @Override
-  public void addMessageListener(MessageListener listener, String instanceName) {
-    addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
-      String instanceName, String sessionId) throws Exception {
-    addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
-        ChangeType.CURRENT_STATE, new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
-      throws Exception {
-    addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addControllerListener(ControllerChangeListener listener) {
-    addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  /**
-   * Register a listener on the key returned by {@code Builder#controllerMessages()}.
-   * @param listener listener to register
-   */
-  void addControllerMessageListener(MessageListener listener) {
-    PropertyKey controllerMessagesKey = new Builder(_clusterName).controllerMessages();
-    EventType[] watchedEvents = {
-        EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-    };
-    addListener(listener, controllerMessagesKey, ChangeType.MESSAGES_CONTROLLER, watchedEvents);
-  }
-
-  /**
-   * Remove and reset every registered handler whose path equals the key's path and
-   * whose listener equals the given listener.
-   * @param key property key the listener was registered on
-   * @param listener listener to remove
-   * @return always true, whether or not a matching handler was found
-   */
-  @Override
-  public boolean removeListener(PropertyKey key, Object listener) {
-    final String path = key.getPath();
-    LOG.info("Removing listener: " + listener + " on path: " + path + " from cluster: "
-        + _clusterName + " by instance: " + _instanceName);
-
-    synchronized (this) {
-      List<CallbackHandler> matches = new ArrayList<CallbackHandler>();
-      for (CallbackHandler candidate : _handlers) {
-        // a handler matches on both property-key path and listener
-        if (!candidate.getPath().equals(path)) {
-          continue;
-        }
-        if (candidate.getListener().equals(listener)) {
-          matches.add(candidate);
-        }
-      }
-
-      _handlers.removeAll(matches);
-
-      // reset() may touch the handler list, so it runs only after iteration is finished
-      for (CallbackHandler removed : matches) {
-        removed.reset();
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * @return the data accessor of this manager
-   * @throws HelixException (via checkConnected) if the manager is not connected
-   */
-  @Override
-  public HelixDataAccessor getHelixDataAccessor() {
-    checkConnected();
-    return _dataAccessor;
-  }
-
-  /**
-   * @return the config accessor of this manager
-   * @throws HelixException (via checkConnected) if the manager is not connected
-   */
-  @Override
-  public ConfigAccessor getConfigAccessor() {
-    checkConnected();
-    return _configAccessor;
-  }
-
-  /**
-   * @return the cluster name given at construction; no connection is required
-   */
-  @Override
-  public String getClusterName() {
-    return _clusterName;
-  }
-
-  /**
-   * @return the instance name given at construction; no connection is required
-   */
-  @Override
-  public String getInstanceName() {
-    return _instanceName;
-  }
-
-  /**
-   * @return the most recently recorded session id
-   * @throws HelixException (via checkConnected) if the manager is not connected
-   */
-  @Override
-  public String getSessionId() {
-    checkConnected();
-    return _sessionId;
-  }
-
-  /**
-   * Not implemented in this class.
-   * @return always 0
-   */
-  @Override
-  public long getLastNotificationTime() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public HelixAdmin getClusterManagmentTool() {
-    checkConnected();
-    if (_zkclient != null) {
-      return new ZKHelixAdmin(_zkclient);
-    }
-
-    LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
-    return null;
-  }
-
-  /**
-   * Return the property store, creating it on first use. The store is rooted at the
-   * PROPERTYSTORE path of this cluster and is cached for subsequent calls.
-   * @return the (possibly newly created) property store
-   * @throws HelixException (via checkConnected) if the manager is not connected
-   */
-  @Override
-  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
-    checkConnected();
-
-    if (_helixPropertyStore != null) {
-      return _helixPropertyStore;
-    }
-
-    String storeRoot = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-    ZkBaseDataAccessor<ZNRecord> storeAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
-    _helixPropertyStore = new ZkHelixPropertyStore<ZNRecord>(storeAccessor, storeRoot, null);
-
-    return _helixPropertyStore;
-  }
-
-  /**
-   * @return the messaging service created in the constructor
-   */
-  @Override
-  public ClusterMessagingService getMessagingService() {
-    // Deliberately no checkConnected(): callers may register message handler factories
-    // on the messaging service before the helix manager is connected.
-    return _messagingService;
-  }
-
-  /**
-   * @return null in this base class
-   */
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    // helix-participant will override this
-    return null;
-  }
-
-  /**
-   * @return the instance type given at construction
-   */
-  @Override
-  public InstanceType getInstanceType() {
-    return _instanceType;
-  }
-
-  /**
-   * @return the helix version read from the manager properties at construction
-   */
-  @Override
-  public String getVersion() {
-    return _version;
-  }
-
-  /**
-   * @return the manager properties loaded at construction
-   */
-  @Override
-  public HelixManagerProperties getProperties() {
-    return _properties;
-  }
-
-  /**
-   * @return null in this base class
-   */
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    // helix-participant will override this
-    return null;
-  }
-
-  @Override
-  public abstract boolean isLeader();
-
-  /**
-   * Call {@code start()} on every registered timer task, in registration order.
-   */
-  @Override
-  public void startTimerTasks() {
-    for (HelixTimerTask timerTask : _timerTasks) {
-      timerTask.start();
-    }
-  }
-
-  /**
-   * Call {@code stop()} on every registered timer task, in registration order.
-   */
-  @Override
-  public void stopTimerTasks() {
-    for (HelixTimerTask timerTask : _timerTasks) {
-      timerTask.stop();
-    }
-  }
-
-  /**
-   * Append a callback to the list of pre-connect callbacks.
-   * @param callback callback to remember
-   */
-  @Override
-  public void addPreConnectCallback(PreConnectCallback callback) {
-    LOG.info("Adding preconnect callback: " + callback);
-    _preConnectCallbacks.add(callback);
-  }
-
-  /**
-   * Replace the live-instance info provider held by this manager.
-   * @param liveInstanceInfoProvider new provider; stored as-is
-   */
-  @Override
-  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
-    this._liveInstanceInfoProvider = liveInstanceInfoProvider;
-  }
-
-  /**
-   * Block until the zk client is connected and we have read a non-zero session-id.
-   * Note that we might lose the zk connection right after we read the session-id, but
-   * it's ok to get a stale session-id: we will have another handle-new-session callback
-   * to correct this.
-   * <p>
-   * The loop condition also checks the connected flag. A {@code continue} inside a
-   * do-while jumps straight to the condition, so testing only the session-id would let
-   * the loop exit after a connection timeout whenever {@code _sessionId} still held a
-   * previous (or null) value.
-   */
-  protected void waitUntilConnected() {
-    boolean isConnected;
-    do {
-      isConnected =
-          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-      if (!isConnected) {
-        LOG.error("fail to connect zkserver: " + _zkAddress + " in "
-            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
-            + ", clusterName: " + _clusterName);
-        continue;
-      }
-
-      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
-      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
-
-      /**
-       * at the time we read session-id, zkconnection might be lost again
-       * wait until we get a non-zero session-id
-       */
-    } while (!isConnected || "0".equals(_sessionId));
-
-    LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
-        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
-        + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
-  }
-
-  /**
-   * Guard used by operations that need a live connection.
-   * @throws HelixException if {@code isConnected()} is false
-   */
-  protected void checkConnected() {
-    if (isConnected()) {
-      return;
-    }
-    throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
-  }
-
-  protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
-      EventType[] eventType) {
-    checkConnected();
-
-    PropertyType type = propertyKey.getType();
-
-    synchronized (this) {
-      for (CallbackHandler handler : _handlers) {
-        // compare property-key path and listener reference
-        if (handler.getPath().equals(propertyKey.getPath())
-            && handler.getListener().equals(listener)) {
-          LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
-              + " already exists. skip add");
-
-          return;
-        }
-      }
-
-      CallbackHandler newHandler =
-          new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
-
-      _handlers.add(newHandler);
-      LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
-          + newHandler.getPath());
-    }
-  }
-
-  /**
-   * Call {@code init()} on each handler in the list while holding this manager's monitor.
-   * @param handlers handlers to initialize; null is accepted and means nothing to do
-   */
-  protected void initHandlers(List<CallbackHandler> handlers) {
-    synchronized (this) {
-      if (handlers == null) {
-        return;
-      }
-
-      for (CallbackHandler each : handlers) {
-        each.init();
-        LOG.info("init handler: " + each.getPath() + ", " + each.getListener());
-      }
-    }
-  }
-
-  /**
-   * Call {@code reset()} on every registered handler while holding this manager's
-   * monitor. Iteration runs over a snapshot, in case reset() modifies the handler list.
-   */
-  protected void resetHandlers() {
-    synchronized (this) {
-      if (_handlers == null) {
-        return;
-      }
-
-      // snapshot first: reset() may change _handlers while we iterate
-      List<CallbackHandler> snapshot = new ArrayList<CallbackHandler>(_handlers);
-      for (CallbackHandler each : snapshot) {
-        each.reset();
-        LOG.info("reset handler: " + each.getPath() + ", " + each.getListener());
-      }
-    }
-  }
-
-  /**
-   * Hook for choosing the base data accessor. This default returns the given accessor
-   * unchanged; a subclass may override it, e.g. to return a cache-enabled accessor.
-   * @param baseDataAccessor plain zk-backed accessor created in createClient()
-   * @return the accessor this manager should use
-   */
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    return baseDataAccessor;
-  }
-
-  void createClient() throws Exception {
-    PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
-
-    _zkclient =
-        new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
-
-    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
-
-    _baseDataAccessor = createBaseDataAccessor(baseDataAccessor);
-
-    _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
-    _configAccessor = new ConfigAccessor(_zkclient);
-
-    int retryCount = 0;
-
-    _zkclient.subscribeStateChanges(this);
-    while (retryCount < 3) {
-      try {
-        _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
-        handleStateChanged(KeeperState.SyncConnected);
-        handleNewSession();
-        break;
-      } catch (HelixException e) {
-        LOG.error("fail to createClient.", e);
-        throw e;
-      } catch (Exception e) {
-        retryCount++;
-
-        LOG.error("fail to createClient. retry " + retryCount, e);
-        if (retryCount == 3) {
-          throw e;
-        }
-      }
-    }
-  }
-
-  // TODO separate out flapping detection code
-  /**
-   * Zookeeper state-change callback. Logs SyncConnected and Expired. On Disconnected it
-   * additionally records the current time and disconnects this manager if
-   * {@code isFlapping()} reports flapping. Other states are ignored.
-   * @param state new zookeeper state
-   * @throws Exception declared by the listener interface
-   */
-  @Override
-  public void handleStateChanged(KeeperState state) throws Exception {
-    switch (state) {
-    case SyncConnected: {
-      ZkConnection connection = (ZkConnection) _zkclient.getConnection();
-      LOG.info("KeeperState: " + state + ", zookeeper:" + connection.getZookeeper());
-      break;
-    }
-    case Disconnected: {
-      LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
-          + _instanceName + ", type: " + _instanceType);
-
-      // remember when this disconnect happened, then decide from the history whether
-      // the helix-manager should be disconnected
-      _disconnectTimeHistory.add(System.currentTimeMillis());
-      if (!isFlapping()) {
-        break;
-      }
-
-      LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
-          + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
-          + _flappingTimeWindowMs + "ms.");
-      disconnect();
-      break;
-    }
-    case Expired: {
-      LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
-          + _instanceName + ", type: " + _instanceType);
-      break;
-    }
-    default:
-      break;
-    }
-  }
-
-  /**
-   * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
-   * _timeWindowLengthMs Ms
-   * time window, we think that there are something wrong going on and disconnect the zkHelixManager
-   * from zk.
-   */
-  private boolean isFlapping() {
-    if (_disconnectTimeHistory.size() == 0) {
-      return false;
-    }
-    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
-
-    // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
-    while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
-      _disconnectTimeHistory.remove(0);
-    }
-    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
-  }
-
-  /**
-   * Hook for controller-type managers: they should override this to return the timers
-   * that need to start/stop when leadership changes.
-   * @return null in this base class
-   */
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
deleted file mode 100644
index 1ed6dea..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Timer;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class ControllerManager extends AbstractManager {
-  private static Logger LOG = Logger.getLogger(ControllerManager.class);
-
-  final GenericHelixController _controller = new GenericHelixController();
-
-  // TODO merge into GenericHelixController
-  private CallbackHandler _leaderElectionHandler = null;
-
-  /**
-   * status dump timer-task
-   */
-  static class StatusDumpTask extends HelixTimerTask {
-    Timer _timer = null;
-    final ZkClient zkclient;
-    final AbstractManager helixController;
-
-    public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) {
-      this.zkclient = zkclient;
-      this.helixController = helixController;
-    }
-
-    @Override
-    public void start() {
-      long initialDelay = 30 * 60 * 1000;
-      long period = 120 * 60 * 1000;
-      int timeThresholdNoChange = 180 * 60 * 1000;
-
-      if (_timer == null) {
-        LOG.info("Start StatusDumpTask");
-        _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
-            timeThresholdNoChange), initialDelay, period);
-      }
-
-    }
-
-    @Override
-    public void stop() {
-      if (_timer != null) {
-        LOG.info("Stop StatusDumpTask");
-        _timer.cancel();
-        _timer = null;
-      }
-    }
-  }
-
-  /**
-   * Create a manager of type CONTROLLER and register its two timer tasks (health-stats
-   * aggregation and status dump). The tasks are only registered here, not started.
-   * @param zkAddress zookeeper address
-   * @param clusterName name of the cluster
-   * @param instanceName name of this controller instance
-   */
-  public ControllerManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
-
-    HealthStatsAggregator aggregator = new HealthStatsAggregator(this);
-    _timerTasks.add(new HealthStatsAggregationTask(aggregator));
-
-    // NOTE(review): _zkclient is still null here; AbstractManager assigns it in
-    // createClient(), which the super constructor does not call
-    StatusDumpTask statusDump = new StatusDumpTask(_zkclient, this);
-    _timerTasks.add(statusDump);
-  }
-
-  /**
-   * @return the timer-task list of this manager (the live list, not a copy)
-   */
-  @Override
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return _timerTasks;
-  }
-
-  @Override
-  public void handleNewSession() throws Exception {
-    waitUntilConnected();
-
-    /**
-     * reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-    // TODO reset user defined handlers only
-    resetHandlers();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.init();
-    } else {
-      _leaderElectionHandler =
-          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
-              new DistributedLeaderElection(this, _controller), new EventType[] {
-                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-              }, ChangeType.CONTROLLER);
-    }
-
-    /**
-     * init handlers
-     * ok to init message handler and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-  }
-
-  /**
-   * Controller-specific disconnect step: reset the leader-election handler if one exists.
-   */
-  @Override
-  void doDisconnect() {
-    if (_leaderElectionHandler == null) {
-      return;
-    }
-    _leaderElectionHandler.reset();
-  }
-
-  /**
-   * Check whether this controller is the current leader: the leader record must exist
-   * and carry both this instance's name and its current session id.
-   * @return true if this instance and session own the leader record; false if not
-   *         connected, if there is no matching leader record, or if reading it failed
-   *         (the failure is logged rather than silently dropped)
-   */
-  @Override
-  public boolean isLeader() {
-    if (!isConnected()) {
-      return false;
-    }
-
-    try {
-      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader == null) {
-        return false;
-      }
-
-      String leaderName = leader.getInstanceName();
-      String sessionId = leader.getSessionId();
-      return leaderName != null && leaderName.equals(_instanceName) && sessionId != null
-          && sessionId.equals(_sessionId);
-    } catch (Exception e) {
-      // best effort: callers get "not leader", but the cause is kept in the log
-      LOG.warn("fail to check leadership for instance: " + _instanceName + ", cluster: "
-          + _clusterName, e);
-    }
-    return false;
-  }
-
-  /**
-   * helix-controller uses a write-through cache for external-view: the plain accessor
-   * is wrapped in a ZkCacheBaseDataAccessor configured with the cluster's EXTERNALVIEW
-   * path.
-   * @param baseDataAccessor plain zk-backed accessor to wrap
-   * @return the wrapping cache accessor
-   */
-  @Override
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    String externalViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
-    List<String> cachedPaths = Arrays.asList(externalViewPath);
-    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, cachedPaths);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index ff3a264..d2b520b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -36,14 +36,14 @@ import org.apache.log4j.Logger;
 public class ControllerManagerHelper {
   private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
 
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final DefaultMessagingService _messagingService;
   final List<HelixTimerTask> _controllerTimerTasks;
 
-  public ControllerManagerHelper(AbstractManager manager) {
+  public ControllerManagerHelper(HelixManager manager, List<HelixTimerTask> controllerTimerTasks) {
     _manager = manager;
     _messagingService = (DefaultMessagingService) manager.getMessagingService();
-    _controllerTimerTasks = manager.getControllerHelixTimerTasks();
+    _controllerTimerTasks = controllerTimerTasks;
   }
 
   public void addListenersToController(GenericHelixController controller) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
deleted file mode 100644
index c9ad0f3..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class DistributedControllerManager extends AbstractManager {
-  private static Logger LOG = Logger.getLogger(DistributedControllerManager.class);
-
-  final StateMachineEngine _stateMachineEngine;
-  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
-  CallbackHandler _leaderElectionHandler = null;
-  final GenericHelixController _controller = new GenericHelixController();
-
-  /**
-   * hold timer tasks for controller only
-   * we need to add/remove controller timer tasks during handle new session
-   */
-  final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-
-  public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT);
-
-    _stateMachineEngine = new HelixStateMachineEngine(this);
-    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
-    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-
-    _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
-    _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
-
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    checkConnected();
-    return _participantHealthInfoCollector;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    return _stateMachineEngine;
-  }
-
-  @Override
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return _controllerTimerTasks;
-  }
-
-  @Override
-  public void handleNewSession() throws Exception {
-    waitUntilConnected();
-
-    ParticipantManagerHelper participantHelper =
-        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-
-    /**
-     * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    stopTimerTasks();
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-    resetHandlers();
-
-    /**
-     * clean up write-through cache
-     */
-    _baseDataAccessor.reset();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
-      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
-    }
-
-    /**
-     * auto-join
-     */
-    participantHelper.joinCluster();
-
-    /**
-     * Invoke PreConnectCallbacks
-     */
-    for (PreConnectCallback callback : _preConnectCallbacks) {
-      callback.onPreConnect();
-    }
-
-    participantHelper.createLiveInstance();
-
-    participantHelper.carryOverPreviousCurrentState();
-
-    participantHelper.setupMsgHandler();
-
-    /**
-     * leader election
-     */
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.init();
-    } else {
-      _leaderElectionHandler =
-          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
-              new DistributedLeaderElection(this, _controller), new EventType[] {
-                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-              }, ChangeType.CONTROLLER);
-    }
-
-    /**
-     * start health-check timer task
-     */
-    participantHelper.createHealthCheckPath();
-    startTimerTasks();
-
-    /**
-     * init handlers
-     * ok to init message handler, data-accessor, and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-
-  }
-
-  @Override
-  void doDisconnect() {
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-  }
-
-  @Override
-  public boolean isLeader() {
-    if (!isConnected()) {
-      return false;
-    }
-
-    try {
-      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader != null) {
-        String leaderName = leader.getInstanceName();
-        String sessionId = leader.getSessionId();
-        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
-            && sessionId.equals(_sessionId)) {
-          return true;
-        }
-      }
-    } catch (Exception e) {
-      // log
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index 0ab8342..6a6d296 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -20,10 +20,12 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.List;
 
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
@@ -40,12 +42,15 @@ import org.apache.log4j.Logger;
 public class DistributedLeaderElection implements ControllerChangeListener {
   private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
 
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final GenericHelixController _controller;
+  final List<HelixTimerTask> _controllerTimerTasks;
 
-  public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) {
+  public DistributedLeaderElection(HelixManager manager, GenericHelixController controller,
+      List<HelixTimerTask> controllerTimerTasks) {
     _manager = manager;
     _controller = controller;
+    _controllerTimerTasks = controllerTimerTasks;
   }
 
   /**
@@ -68,7 +73,8 @@ public class DistributedLeaderElection implements ControllerChangeListener {
       return;
     }
 
-    ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager);
+    ControllerManagerHelper controllerHelper =
+        new ControllerManagerHelper(_manager, _controllerTimerTasks);
     try {
       if (changeContext.getType().equals(NotificationContext.Type.INIT)
           || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
@@ -84,7 +90,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
                 + _manager.getClusterName());
 
             updateHistory(manager);
-            _manager._baseDataAccessor.reset();
+            _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
             controllerHelper.addListenersToController(_controller);
             controllerHelper.startControllerTimerTasks();
           }
@@ -98,7 +104,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
         /**
          * clear write-through cache
          */
-        _manager._baseDataAccessor.reset();
+        _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
       }
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
deleted file mode 100644
index 0af7e77..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ /dev/null
@@ -1,155 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ParticipantManager extends AbstractManager {
-
-  private static Logger LOG = Logger.getLogger(ParticipantManager.class);
-
-  /**
-   * state-transition message handler factory for helix-participant
-   */
-  final StateMachineEngine _stateMachineEngine;
-
-  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
-  public ParticipantManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT);
-
-    _stateMachineEngine = new HelixStateMachineEngine(this);
-    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
-    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    checkConnected();
-    return _participantHealthInfoCollector;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    return _stateMachineEngine;
-  }
-
-  @Override
-  public void handleNewSession() {
-    waitUntilConnected();
-
-    /**
-     * stop timer tasks, reset all handlers, make sure cleanup completed for previous session
-     * disconnect if cleanup fails
-     */
-    stopTimerTasks();
-    resetHandlers();
-
-    /**
-     * clear write-through cache
-     */
-    _baseDataAccessor.reset();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
-      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
-    }
-
-    /**
-     * auto-join
-     */
-    ParticipantManagerHelper participantHelper =
-        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-    participantHelper.joinCluster();
-
-    /**
-     * Invoke PreConnectCallbacks
-     */
-    for (PreConnectCallback callback : _preConnectCallbacks) {
-      callback.onPreConnect();
-    }
-
-    participantHelper.createLiveInstance();
-
-    participantHelper.carryOverPreviousCurrentState();
-
-    /**
-     * setup message listener
-     */
-    participantHelper.setupMsgHandler();
-
-    /**
-     * start health check timer task
-     */
-    participantHelper.createHealthCheckPath();
-    startTimerTasks();
-
-    /**
-     * init handlers
-     * ok to init message handler and data-accessor twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-
-  }
-
-  /**
-   * helix-participant uses a write-through cache for current-state
-   */
-  @Override
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    String curStatePath =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
-    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
-
-  }
-
-  @Override
-  public boolean isLeader() {
-    return false;
-  }
-
-  /**
-   * disconnect logic for helix-participant
-   */
-  @Override
-  void doDisconnect() {
-    // nothing for participant
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/579baa5b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index 70dd592..e7f9efb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.messaging.DefaultMessagingService;
@@ -55,7 +56,7 @@ public class ParticipantManagerHelper {
   private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
 
   final ZkClient _zkclient;
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;
   final String _instanceName;
@@ -67,8 +68,10 @@ public class ParticipantManagerHelper {
   final ZKHelixDataAccessor _dataAccessor;
   final DefaultMessagingService _messagingService;
   final StateMachineEngine _stateMachineEngine;
+  final LiveInstanceInfoProvider _liveInstanceInfoProvider;
 
-  public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) {
+  public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+      LiveInstanceInfoProvider liveInstanceInfoProvider) {
     _zkclient = zkclient;
     _manager = manager;
     _clusterName = manager.getClusterName();
@@ -82,6 +85,7 @@ public class ParticipantManagerHelper {
     _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
     _messagingService = (DefaultMessagingService) manager.getMessagingService();
     _stateMachineEngine = manager.getStateMachineEngine();
+    _liveInstanceInfoProvider = liveInstanceInfoProvider;
   }
 
   public void joinCluster() {
@@ -92,8 +96,8 @@ public class ParticipantManagerHelper {
           new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
               _manager.getClusterName()).build();
       autoJoin =
-          Boolean
-              .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+          Boolean.parseBoolean(_configAccessor.get(scope,
+              ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
       LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
     } catch (Exception e) {
       // autoJoin is false
@@ -128,6 +132,19 @@ public class ParticipantManagerHelper {
     liveInstance.setHelixVersion(_manager.getVersion());
     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
 
+    // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
+    if (_liveInstanceInfoProvider != null) {
+      LOG.info("invoke liveInstanceInfoProvider");
+      ZNRecord additionalLiveInstanceInfo =
+          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+      if (additionalLiveInstanceInfo != null) {
+        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
+        liveInstance = new LiveInstance(mergedLiveInstance);
+        LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
+      }
+    }
+
     boolean retry;
     do {
       retry = false;
@@ -250,7 +267,7 @@ public class ParticipantManagerHelper {
     }
   }
 
-  public void setupMsgHandler() {
+  public void setupMsgHandler() throws Exception {
     _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);


Mime
View raw message