helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [6/7] [HELIX-395] Remove old Helix alert/stat modules
Date Tue, 04 Mar 2014 19:04:53 GMT
http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java b/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
deleted file mode 100644
index 0e9c8f1..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/GreaterAlertComparator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class GreaterAlertComparator extends AlertComparator {
-
-  @Override
-  /*
-   * Returns true if any element left tuple exceeds any element in right tuple
-   */
-  public boolean evaluate(Tuple<String> leftTup, Tuple<String> rightTup) {
-    Iterator<String> leftIter = leftTup.iterator();
-    while (leftIter.hasNext()) {
-      double leftVal = Double.parseDouble(leftIter.next());
-      Iterator<String> rightIter = rightTup.iterator();
-      while (rightIter.hasNext()) {
-        double rightVal = Double.parseDouble(rightIter.next());
-        if (leftVal > rightVal) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
deleted file mode 100644
index 74a4688..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/MultiplyOperator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class MultiplyOperator extends Operator {
-
-  public MultiplyOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = false;
-    numOutputTupleLists = 1;
-  }
-
-  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
-    List out = new ArrayList();
-    out.add(input.iterator());
-    return out;
-  }
-
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-    if (input == null || input.size() == 0) {
-      return singleSetToIter(output);
-    }
-    while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
-                   // progress)
-      Tuple<String> rowProduct = null;
-      for (Iterator<Tuple<String>> it : input) {
-        if (!it.hasNext()) { // when any iterator runs out, we are done
-          return singleSetToIter(output);
-        }
-        rowProduct = multiplyTuples(rowProduct, it.next());
-      }
-      output.add(rowProduct);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java b/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
deleted file mode 100644
index 0612cf3..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Operator.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-import java.util.List;
-
-public abstract class Operator {
-
-  public int minInputTupleLists;
-  public int maxInputTupleLists;
-  public int numOutputTupleLists = -1;
-  public boolean inputOutputTupleListsCountsEqual = false;
-
-  public Operator() {
-
-  }
-
-  public Tuple<String> multiplyTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    if (tup1 == null) {
-      return tup2;
-    }
-    if (tup2 == null) {
-      return tup1;
-    }
-    Tuple<String> outputTup = new Tuple<String>();
-
-    // sum staggers if the tuples are same length
-    // e.g. 1,2,3 + 4,5 = 1,6,8
-    // so this is a bit tricky
-    Tuple<String> largerTup;
-    Tuple<String> smallerTup;
-    if (tup1.size() >= tup2.size()) {
-      largerTup = tup1;
-      smallerTup = tup2;
-    } else {
-      largerTup = tup2;
-      smallerTup = tup1;
-    }
-    int gap = largerTup.size() - smallerTup.size();
-
-    for (int i = 0; i < largerTup.size(); i++) {
-      if (i < gap) {
-        outputTup.add(largerTup.getElement(i));
-      } else {
-        double elementProduct = 0;
-        elementProduct =
-            Double.parseDouble(largerTup.getElement(i))
-                * Double.parseDouble(smallerTup.getElement(i - gap));
-        outputTup.add(String.valueOf(elementProduct));
-      }
-    }
-    return outputTup;
-  }
-
-  public Tuple<String> sumTuples(Tuple<String> tup1, Tuple<String> tup2) {
-    if (tup1 == null) {
-      return tup2;
-    }
-    if (tup2 == null) {
-      return tup1;
-    }
-    Tuple<String> outputTup = new Tuple<String>();
-
-    // sum staggers if the tuples are same length
-    // e.g. 1,2,3 + 4,5 = 1,6,8
-    // so this is a bit tricky
-    Tuple<String> largerTup;
-    Tuple<String> smallerTup;
-    if (tup1.size() >= tup2.size()) {
-      largerTup = tup1;
-      smallerTup = tup2;
-    } else {
-      largerTup = tup2;
-      smallerTup = tup1;
-    }
-    int gap = largerTup.size() - smallerTup.size();
-
-    for (int i = 0; i < largerTup.size(); i++) {
-      if (i < gap) {
-        outputTup.add(largerTup.getElement(i));
-      } else {
-        double elementSum = 0;
-        elementSum =
-            Double.parseDouble(largerTup.getElement(i))
-                + Double.parseDouble(smallerTup.getElement(i - gap));
-        outputTup.add(String.valueOf(elementSum));
-      }
-    }
-    return outputTup;
-  }
-
-  public abstract List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input);
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java b/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
deleted file mode 100644
index 6895128..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Stat.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class Stat {
-  String _name;
-  Tuple<String> _value;
-  Tuple<String> _timestamp;
-
-  public Stat(String name, Tuple<String> value, Tuple<String> timestamp) {
-    _name = name;
-    _value = value;
-    _timestamp = timestamp;
-  }
-
-  public String getName() {
-    return _name;
-  }
-
-  public Tuple<String> getValue() {
-    return _value;
-  }
-
-  public Tuple<String> getTimestamp() {
-    return _timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java b/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
deleted file mode 100644
index 1538eb8..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/StatsHolder.java
+++ /dev/null
@@ -1,306 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.controller.stages.HealthDataCache;
-import org.apache.helix.model.PersistentStats;
-import org.apache.log4j.Logger;
-
-public class StatsHolder {
-  enum MatchResult {
-    WILDCARDMATCH,
-    EXACTMATCH,
-    NOMATCH
-  };
-
-  private static final Logger logger = Logger.getLogger(StatsHolder.class.getName());
-
-  public static final String VALUE_NAME = "value";
-  public static final String TIMESTAMP_NAME = "TimeStamp";
-
-  HelixDataAccessor _accessor;
-  HealthDataCache _cache;
-
-  Map<String, Map<String, String>> _statMap;
-  Map<String, Map<String, MatchResult>> _statAlertMatchResult;
-
-  private Builder _keyBuilder;
-
-  // PersistentStats _persistentStats;
-
-  public StatsHolder(HelixManager manager, HealthDataCache cache) {
-    _accessor = manager.getHelixDataAccessor();
-    _cache = cache;
-    _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
-    updateCache(_cache);
-    _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
-
-  }
-
-  public void refreshStats() {
-    logger.info("Refreshing cached stats");
-    _cache.refresh(_accessor);
-    updateCache(_cache);
-  }
-
-  public void persistStats() {
-    // XXX: Am I using _accessor too directly here?
-    // took around 35 ms from desktop to ESV4 machine
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-    if (stats == null) {
-      stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
-      // this record, if it
-      // matters
-    }
-    stats.getRecord().setMapFields(_statMap);
-    boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(), stats);
-  }
-
-  public void getStatsFromCache(boolean refresh) {
-    long refreshStartTime = System.currentTimeMillis();
-    if (refresh) {
-      _cache.refresh(_accessor);
-    }
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null) {
-      _statMap = persistentStatRecord.getMapFields();
-    } else {
-      _statMap = new HashMap<String, Map<String, String>>();
-    }
-    /*
-     * if (_cache.getPersistentStats() != null) {
-     * _statMap = _cache.getPersistentStats();
-     * }
-     */
-    // TODO: confirm this a good place to init the _statMap when null
-    /*
-     * if (_statMap == null) {
-     * _statMap = new HashMap<String, Map<String, String>>();
-     * }
-     */
-    System.out.println("Refresh stats done: " + (System.currentTimeMillis() - refreshStartTime));
-  }
-
-  public Iterator<String> getAllStats() {
-    return null;
-  }
-
-  /*
-   * TODO: figure out pre-conditions here. I think not allowing anything to be
-   * null on input
-   */
-  public Map<String, String> mergeStats(String statName, Map<String, String> existingStat,
-      Map<String, String> incomingStat) throws HelixException {
-    if (existingStat == null) {
-      throw new HelixException("existing stat for merge is null");
-    }
-    if (incomingStat == null) {
-      throw new HelixException("incoming stat for merge is null");
-    }
-    // get agg type and arguments, then get agg object
-    String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
-    String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
-    Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
-    // XXX: some of below lines might fail with null exceptions
-
-    // get timestamps, values out of zk maps
-    String existingTime = existingStat.get(TIMESTAMP_NAME);
-    String existingVal = existingStat.get(VALUE_NAME);
-    String incomingTime = incomingStat.get(TIMESTAMP_NAME);
-    String incomingVal = incomingStat.get(VALUE_NAME);
-    // parse values into tuples, if the values exist. else, tuples are null
-    Tuple<String> existingTimeTuple =
-        (existingTime != null) ? Tuple.fromString(existingTime) : null;
-    Tuple<String> existingValueTuple = (existingVal != null) ? Tuple.fromString(existingVal) : null;
-    Tuple<String> incomingTimeTuple =
-        (incomingTime != null) ? Tuple.fromString(incomingTime) : null;
-    Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple.fromString(incomingVal) : null;
-
-    // dp merge
-    agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple, incomingTimeTuple, aggArgs);
-    // put merged tuples back in map
-    Map<String, String> mergedMap = new HashMap<String, String>();
-    if (existingTimeTuple.size() == 0) {
-      throw new HelixException("merged time tuple has size zero");
-    }
-    if (existingValueTuple.size() == 0) {
-      throw new HelixException("merged value tuple has size zero");
-    }
-
-    mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
-    mergedMap.put(VALUE_NAME, existingValueTuple.toString());
-    return mergedMap;
-  }
-
-  /*
-   * Find all persisted stats this stat matches. Update those stats. An incoming
-   * stat can match multiple stats exactly (if that stat has multiple agg types)
-   * An incoming stat can match multiple wildcard stats
-   */
-
-  // need to do a time check here!
-
-  public void applyStat(String incomingStatName, Map<String, String> statFields) {
-    // TODO: consider locking stats here
-    // refreshStats(); //will have refreshed by now during stage
-
-    Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
-
-    if (!_statAlertMatchResult.containsKey(incomingStatName)) {
-      _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
-    }
-    Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
-    // traverse through all persistent stats
-    for (String key : _statMap.keySet()) {
-      if (resultMap.containsKey(key)) {
-        MatchResult cachedMatchResult = resultMap.get(key);
-        if (cachedMatchResult == MatchResult.EXACTMATCH) {
-          processExactMatch(key, statFields);
-        } else if (cachedMatchResult == MatchResult.WILDCARDMATCH) {
-          processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
-        }
-        // don't care about NOMATCH
-        continue;
-      }
-      // exact match on stat and stat portion of persisted stat, just update
-      if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName)) {
-        processExactMatch(key, statFields);
-        resultMap.put(key, MatchResult.EXACTMATCH);
-      }
-      // wildcard match
-      else if (ExpressionParser.isIncomingStatWildcardMatch(key, incomingStatName)) {
-        processWildcardMatch(incomingStatName, key, statFields, pendingAdds);
-        resultMap.put(key, MatchResult.WILDCARDMATCH);
-      } else {
-        resultMap.put(key, MatchResult.NOMATCH);
-      }
-    }
-    _statMap.putAll(pendingAdds);
-  }
-
-  void processExactMatch(String key, Map<String, String> statFields) {
-    Map<String, String> mergedStat = mergeStats(key, _statMap.get(key), statFields);
-    // update in place, no problem with hash map
-    _statMap.put(key, mergedStat);
-  }
-
-  void processWildcardMatch(String incomingStatName, String key, Map<String, String> statFields,
-      Map<String, Map<String, String>> pendingAdds) {
-
-    // make sure incoming stat doesn't already exist, either in previous
-    // round or this round
-    // form new key (incomingStatName with agg type from the wildcarded
-    // stat)
-    String statToAdd = ExpressionParser.getWildcardStatSubstitution(key, incomingStatName);
-    // if the stat already existed in _statMap, we have/will apply it as an
-    // exact match
-    // if the stat was added this round to pendingAdds, no need to recreate
-    // (it would have same value)
-    if (!_statMap.containsKey(statToAdd) && !pendingAdds.containsKey(statToAdd)) {
-      // add this stat to persisted stats
-      Map<String, String> mergedStat = mergeStats(statToAdd, getEmptyStat(), statFields);
-      // add to pendingAdds so we don't mess up ongoing traversal of
-      // _statMap
-      pendingAdds.put(statToAdd, mergedStat);
-    }
-  }
-
-  // add parsing of stat (or is that in expression holder?) at least add
-  // validate
-  public void addStat(String exp) throws HelixException {
-    refreshStats(); // get current stats
-
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-
-    for (String stat : parsedStats) {
-      if (_statMap.containsKey(stat)) {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      _statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-  }
-
-  public static Map<String, Map<String, String>> parseStat(String exp) throws HelixException {
-    String[] parsedStats = ExpressionParser.getBaseStats(exp);
-    Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
-
-    for (String stat : parsedStats) {
-      if (statMap.containsKey(stat)) {
-        logger.debug("Stat " + stat + " already exists; not adding");
-        continue;
-      }
-      statMap.put(stat, getEmptyStat()); // add new stat to map
-    }
-    return statMap;
-  }
-
-  public static Map<String, String> getEmptyStat() {
-    Map<String, String> statFields = new HashMap<String, String>();
-    statFields.put(TIMESTAMP_NAME, "");
-    statFields.put(VALUE_NAME, "");
-    return statFields;
-  }
-
-  public List<Stat> getStatsList() {
-    List<Stat> stats = new LinkedList<Stat>();
-    for (String stat : _statMap.keySet()) {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      Stat s = new Stat(stat, valTup, timeTup);
-      stats.add(s);
-    }
-    return stats;
-  }
-
-  public Map<String, Tuple<String>> getStatsMap() {
-    // refreshStats(); //don't refresh, stage will have refreshed by this time
-    HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
-    for (String stat : _statMap.keySet()) {
-      Map<String, String> statFields = _statMap.get(stat);
-      Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
-      Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
-      stats.put(stat, valTup);
-    }
-    return stats;
-  }
-
-  public void updateCache(HealthDataCache cache) {
-    _cache = cache;
-    PersistentStats persistentStatRecord = _cache.getPersistentStats();
-    if (persistentStatRecord != null) {
-      _statMap = persistentStatRecord.getMapFields();
-    } else {
-      _statMap = new HashMap<String, Map<String, String>>();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
deleted file mode 100644
index 2cc733f..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumEachOperator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumEachOperator extends Operator {
-
-  public SumEachOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = true;
-    numOutputTupleLists = -1;
-  }
-
-  // for each column, generate sum
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    List<Iterator<Tuple<String>>> out = new ArrayList<Iterator<Tuple<String>>>();
-    for (Iterator<Tuple<String>> currIt : input) {
-      Tuple<String> currSum = null;
-      while (currIt.hasNext()) {
-        currSum = sumTuples(currSum, currIt.next());
-      }
-      ArrayList<Tuple<String>> currOutList = new ArrayList<Tuple<String>>();
-      currOutList.add(currSum);
-      out.add(currOutList.iterator());
-    }
-    return out;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java b/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
deleted file mode 100644
index 90c9ab0..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/SumOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SumOperator extends Operator {
-
-  public SumOperator() {
-    minInputTupleLists = 1;
-    maxInputTupleLists = Integer.MAX_VALUE;
-    inputOutputTupleListsCountsEqual = false;
-    numOutputTupleLists = 1;
-  }
-
-  public List<Iterator<Tuple<String>>> singleSetToIter(ArrayList<Tuple<String>> input) {
-    List out = new ArrayList();
-    out.add(input.iterator());
-    return out;
-  }
-
-  @Override
-  public List<Iterator<Tuple<String>>> execute(List<Iterator<Tuple<String>>> input) {
-    ArrayList<Tuple<String>> output = new ArrayList<Tuple<String>>();
-    if (input == null || input.size() == 0) {
-      return singleSetToIter(output);
-    }
-    while (true) { // loop through set of iters, return when 1 runs out (not completing the row in
-                   // progress)
-      Tuple<String> rowSum = null;
-      for (Iterator<Tuple<String>> it : input) {
-        if (!it.hasNext()) { // when any iterator runs out, we are done
-          return singleSetToIter(output);
-        }
-        rowSum = sumTuples(rowSum, it.next());
-      }
-      output.add(rowSum);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java b/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
deleted file mode 100644
index e57f088..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/Tuple.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class Tuple<T> {
-  List<T> elements;
-
-  public Tuple() {
-    elements = new ArrayList<T>();
-  }
-
-  public int size() {
-    return elements.size();
-  }
-
-  public void add(T entry) {
-    elements.add(entry);
-  }
-
-  public void addAll(Tuple<T> incoming) {
-    elements.addAll(incoming.getElements());
-  }
-
-  public Iterator<T> iterator() {
-    return elements.listIterator();
-  }
-
-  public T getElement(int ind) {
-    return elements.get(ind);
-  }
-
-  public List<T> getElements() {
-    return elements;
-  }
-
-  public void clear() {
-    elements.clear();
-  }
-
-  public static Tuple<String> fromString(String in) {
-    Tuple<String> tup = new Tuple<String>();
-    if (in.length() > 0) {
-      String[] elements = in.split(",");
-      for (String element : elements) {
-        tup.add(element);
-      }
-    }
-    return tup;
-  }
-
-  public String toString() {
-    StringBuilder out = new StringBuilder();
-    Iterator<T> it = iterator();
-    boolean outEmpty = true;
-    while (it.hasNext()) {
-      if (!outEmpty) {
-        out.append(",");
-      }
-      out.append(it.next());
-      outEmpty = false;
-    }
-    return out.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java b/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
deleted file mode 100644
index 6ef4cfe..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/WindowAggregator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.helix.alerts;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Iterator;
-
-public class WindowAggregator extends Aggregator {
-
-  int _windowSize;
-
-  public WindowAggregator(String windowSize) {
-    _windowSize = Integer.parseInt(windowSize);
-    _numArgs = 1;
-  }
-
-  public WindowAggregator() {
-    this("1");
-  }
-
-  @Override
-  public void merge(Tuple<String> currValTup, Tuple<String> newValTup, Tuple<String> currTimeTup,
-      Tuple<String> newTimeTup, String... args) {
-
-    _windowSize = Integer.parseInt(args[0]);
-
-    // figure out how many curr tuple values we displace
-    Tuple<String> mergedTimeTuple = new Tuple<String>();
-    Tuple<String> mergedValTuple = new Tuple<String>();
-
-    Iterator<String> currTimeIter = currTimeTup.iterator();
-    Iterator<String> currValIter = currValTup.iterator();
-    Iterator<String> newTimeIter = newTimeTup.iterator();
-    Iterator<String> newValIter = newValTup.iterator();
-    int currCtr = 0;
-    // traverse current vals
-    double currTime = -1;
-    double currVal;
-    while (currTimeIter.hasNext()) {
-      currTime = Double.parseDouble(currTimeIter.next());
-      currVal = Double.parseDouble(currValIter.next());
-      currCtr++;
-      // number of evicted currVals equal to total size of both minus _windowSize
-      if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted
-                                                                              // element, just bump
-                                                                              // down
-        mergedTimeTuple.add(String.valueOf(currTime));
-        mergedValTuple.add(String.valueOf(currVal));
-      }
-    }
-
-    double newVal;
-    double newTime;
-    while (newTimeIter.hasNext()) {
-      newVal = Double.parseDouble(newValIter.next());
-      newTime = Double.parseDouble(newTimeIter.next());
-      if (newTime <= currTime) { // oldest new time older than newest curr time. we will not apply
-                                 // new tuple!
-        return; // curr tuples remain the same
-      }
-      currCtr++;
-      if (currCtr > (newTimeTup.size() + currTimeTup.size() - _windowSize)) { // non-evicted element
-        mergedTimeTuple.add(String.valueOf(newTime));
-        mergedValTuple.add(String.valueOf(newVal));
-      }
-    }
-    // set curr tuples to merged tuples
-    currTimeTup.clear();
-    currTimeTup.addAll(mergedTimeTuple);
-    currValTup.clear();
-    currValTup.addAll(mergedValTuple);
-    // TODO: see if we can do merger in place on curr
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java b/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
deleted file mode 100644
index bf1d9a6..0000000
--- a/helix-core/src/main/java/org/apache/helix/alerts/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Classes for Helix alerts
- */
-package org.apache.helix.alerts;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index fdeb879..856c09b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -32,10 +32,8 @@ import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SpectatorId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 
@@ -95,8 +93,8 @@ public class Cluster {
   public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
       Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
       ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean autoJoinAllowed) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused, boolean autoJoinAllowed) {
 
     // build the config
     // Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -118,8 +116,7 @@ public class Cluster {
         new ClusterConfig.Builder(id).addResources(resourceConfigMap.values())
             .addParticipants(participantConfigMap.values()).addConstraints(constraintMap.values())
             .addStateModelDefinitions(stateModelMap.values()).pausedStatus(isPaused)
-            .userConfig(userConfig).autoJoin(autoJoinAllowed).addStats(stats).addAlerts(alerts)
-            .build();
+            .userConfig(userConfig).autoJoin(autoJoinAllowed).build();
 
     _resourceMap = ImmutableMap.copyOf(resourceMap);
 
@@ -224,22 +221,6 @@ public class Cluster {
   }
 
   /**
-   * Get all the persisted stats for the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _config.getStats();
-  }
-
-  /**
-   * Get all the persisted alerts for the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _config.getAlerts();
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 1195ce2..f08d857 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -25,15 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Controller;
 import org.apache.helix.api.Participant;
@@ -53,7 +48,6 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.context.RebalancerContext;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -65,7 +59,6 @@ import org.apache.helix.model.Leader;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
@@ -125,9 +118,6 @@ public class ClusterAccessor {
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
-    if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
-      _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
-    }
     if (cluster.isPaused()) {
       pauseCluster();
     }
@@ -169,16 +159,6 @@ public class ClusterAccessor {
       ClusterConstraints constraint = constraints.get(type);
       _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraint);
     }
-    if (config.getStats() == null || config.getStats().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.persistantStat());
-    } else {
-      _accessor.setProperty(_keyBuilder.persistantStat(), config.getStats());
-    }
-    if (config.getAlerts() == null || config.getAlerts().getMapFields().isEmpty()) {
-      _accessor.removeProperty(_keyBuilder.alerts());
-    } else {
-      _accessor.setProperty(_keyBuilder.alerts(), config.getAlerts());
-    }
     return true;
   }
 
@@ -261,15 +241,9 @@ public class ClusterAccessor {
     // read the state model definitions
     Map<StateModelDefId, StateModelDefinition> stateModelMap = readStateModelDefinitions();
 
-    // read the stats
-    PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
-
-    // read the alerts
-    Alerts alerts = _accessor.getProperty(_keyBuilder.alerts());
-
     // create the cluster snapshot object
     return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
-        clusterConstraintMap, stateModelMap, stats, alerts, userConfig, isPaused, autoJoinAllowed);
+        clusterConstraintMap, stateModelMap, userConfig, isPaused, autoJoinAllowed);
   }
 
   /**
@@ -452,144 +426,6 @@ public class ClusterAccessor {
   }
 
   /**
-   * Get the stats persisted on this cluster
-   * @return PersistentStats, or null if none persisted
-   */
-  public PersistentStats readStats() {
-    return _accessor.getProperty(_keyBuilder.persistantStat());
-  }
-
-  /**
-   * Add a statistic specification to the cluster. Existing stat specifications will not be
-   * overwritten
-   * @param statName string representing a stat specification
-   * @return true if the stat spec was added, false otherwise
-   */
-  public boolean addStat(final String statName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord statsRec) {
-        if (statsRec == null) {
-          statsRec = new ZNRecord(PersistentStats.nodeName);
-        }
-        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
-        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-        for (String newStat : newStatMap.keySet()) {
-          if (!currStatMap.containsKey(newStat)) {
-            currStatMap.put(newStat, newStatMap.get(newStat));
-          }
-        }
-        statsRec.setMapFields(currStatMap);
-        return statsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Remove a statistic specification from the cluster
-   * @param statName string representing a statistic specification
-   * @return true if stats removed, false otherwise
-   */
-  public boolean dropStat(final String statName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String persistentStatsPath = _keyBuilder.persistantStat().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord statsRec) {
-        if (statsRec == null) {
-          throw new HelixException("No stats record in ZK, nothing to drop");
-        }
-        Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
-        Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
-        // delete each stat from stat map
-        for (String newStat : newStatMap.keySet()) {
-          if (currStatMap.containsKey(newStat)) {
-            currStatMap.remove(newStat);
-          }
-        }
-        statsRec.setMapFields(currStatMap);
-        return statsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Add an alert specification to the cluster
-   * @param alertName string representing the alert spec
-   * @return true if added, false otherwise
-   */
-  public boolean addAlert(final String alertName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    String alertsPath = _keyBuilder.alerts().getPath();
-    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec) {
-        if (alertsRec == null) {
-          alertsRec = new ZNRecord(Alerts.nodeName);
-        }
-        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
-        StringBuilder newStatName = new StringBuilder();
-        Map<String, String> newAlertMap = new HashMap<String, String>();
-
-        // use AlertsHolder to get map of new stats and map for this alert
-        AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
-
-        // add stat
-        addStat(newStatName.toString());
-
-        // add alert
-        currAlertMap.put(alertName, newAlertMap);
-        alertsRec.setMapFields(currAlertMap);
-        return alertsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
-   * Remove an alert specification from the cluster
-   * @param alertName string representing an alert specification
-   * @return true if removed, false otherwise
-   */
-  public boolean dropAlert(final String alertName) {
-    if (!isClusterStructureValid()) {
-      LOG.error("cluster " + _clusterId + " is not setup yet");
-      return false;
-    }
-
-    String alertsPath = _keyBuilder.alerts().getPath();
-    BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
-    return baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord alertsRec) {
-        if (alertsRec == null) {
-          throw new HelixException("No alerts record persisted, nothing to drop");
-        }
-        Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
-        currAlertMap.remove(alertName);
-        alertsRec.setMapFields(currAlertMap);
-        return alertsRec;
-      }
-    }, AccessOption.PERSISTENT);
-  }
-
-  /**
    * Add user configuration to the existing cluster user configuration. Overwrites properties with
    * the same key
    * @param userConfig the user config key-value pairs to add

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index 22a1528..4672280 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -5,8 +5,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.StatsHolder;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.State;
 import org.apache.helix.api.id.ClusterId;
@@ -14,14 +12,12 @@ import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Transition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
@@ -61,8 +57,6 @@ public class ClusterConfig {
   private final Map<ParticipantId, ParticipantConfig> _participantMap;
   private final Map<ConstraintType, ClusterConstraints> _constraintMap;
   private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
-  private final PersistentStats _stats;
-  private final Alerts _alerts;
   private final UserConfig _userConfig;
   private final boolean _isPaused;
   private final boolean _autoJoin;
@@ -83,15 +77,13 @@ public class ClusterConfig {
   private ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
       Map<ParticipantId, ParticipantConfig> participantMap,
       Map<ConstraintType, ClusterConstraints> constraintMap,
-      Map<StateModelDefId, StateModelDefinition> stateModelMap, PersistentStats stats,
-      Alerts alerts, UserConfig userConfig, boolean isPaused, boolean allowAutoJoin) {
+      Map<StateModelDefId, StateModelDefinition> stateModelMap, UserConfig userConfig,
+      boolean isPaused, boolean allowAutoJoin) {
     _id = id;
     _resourceMap = ImmutableMap.copyOf(resourceMap);
     _participantMap = ImmutableMap.copyOf(participantMap);
     _constraintMap = ImmutableMap.copyOf(constraintMap);
     _stateModelMap = ImmutableMap.copyOf(stateModelMap);
-    _stats = stats;
-    _alerts = alerts;
     _userConfig = userConfig;
     _isPaused = isPaused;
     _autoJoin = allowAutoJoin;
@@ -237,22 +229,6 @@ public class ClusterConfig {
   }
 
   /**
-   * Get all the statistics persisted on the cluster
-   * @return PersistentStats instance
-   */
-  public PersistentStats getStats() {
-    return _stats;
-  }
-
-  /**
-   * Get all the alerts persisted on the cluster
-   * @return Alerts instance
-   */
-  public Alerts getAlerts() {
-    return _alerts;
-  }
-
-  /**
    * Get user-specified configuration properties of this cluster
    * @return UserConfig properties
    */
@@ -287,8 +263,6 @@ public class ClusterConfig {
 
     private Set<Fields> _updateFields;
     private Map<ConstraintType, Set<ConstraintId>> _removedConstraints;
-    private PersistentStats _removedStats;
-    private Alerts _removedAlerts;
     private Builder _builder;
 
     /**
@@ -302,8 +276,6 @@ public class ClusterConfig {
         Set<ConstraintId> constraints = Sets.newHashSet();
         _removedConstraints.put(type, constraints);
       }
-      _removedStats = new PersistentStats(PersistentStats.nodeName);
-      _removedAlerts = new Alerts(Alerts.nodeName);
       _builder = new Builder(clusterId);
     }
 
@@ -431,57 +403,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat string specifying the stat specification
-     * @return Delta
-     */
-    public Delta addStat(String stat) {
-      _builder.addStat(stat);
-      return this;
-    }
-
-    /**
-     * Add an alert specification for the cluster. Existing specifications will not be overwritten
-     * @param alert string specifying the alert specification
-     * @return Delta
-     */
-    public Delta addAlert(String alert) {
-      _builder.addAlert(alert);
-      return this;
-    }
-
-    /**
-     * Remove a statistic specification from the cluster
-     * @param stat statistic specification
-     * @return Delta
-     */
-    public Delta removeStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _removedStats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        currentStats.put(statName, parsedStat.get(statName));
-      }
-      return this;
-    }
-
-    /**
-     * Remove an alert specification for the cluster
-     * @param alert alert specification
-     * @return Delta
-     */
-    public Delta removeAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _removedAlerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        removeStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
      * Create a ClusterConfig that is the combination of an existing ClusterConfig and this delta
      * @param orig the original ClusterConfig
      * @return updated ClusterConfig
@@ -494,8 +415,7 @@ public class ClusterConfig {
               .addParticipants(orig.getParticipantMap().values())
               .addStateModelDefinitions(orig.getStateModelMap().values())
               .userConfig(orig.getUserConfig()).pausedStatus(orig.isPaused())
-              .autoJoin(orig.autoJoinAllowed()).addStats(orig.getStats())
-              .addAlerts(orig.getAlerts());
+              .autoJoin(orig.autoJoinAllowed());
       for (Fields field : _updateFields) {
         switch (field) {
         case USER_CONFIG:
@@ -529,28 +449,9 @@ public class ClusterConfig {
         builder.addConstraint(constraints);
       }
 
-      // add stats and alerts
-      builder.addStats(deltaConfig.getStats());
-      builder.addAlerts(deltaConfig.getAlerts());
-
       // get the result
       ClusterConfig result = builder.build();
 
-      // remove stats
-      PersistentStats stats = result.getStats();
-      for (String removedStat : _removedStats.getMapFields().keySet()) {
-        if (stats.getMapFields().containsKey(removedStat)) {
-          stats.getMapFields().remove(removedStat);
-        }
-      }
-
-      // remove alerts
-      Alerts alerts = result.getAlerts();
-      for (String removedAlert : _removedAlerts.getMapFields().keySet()) {
-        if (alerts.getMapFields().containsKey(removedAlert)) {
-          alerts.getMapFields().remove(removedAlert);
-        }
-      }
       return result;
     }
   }
@@ -565,8 +466,6 @@ public class ClusterConfig {
     private final Map<ConstraintType, ClusterConstraints> _constraintMap;
     private final Map<StateModelDefId, StateModelDefinition> _stateModelMap;
     private UserConfig _userConfig;
-    private PersistentStats _stats;
-    private Alerts _alerts;
     private boolean _isPaused;
     private boolean _autoJoin;
 
@@ -583,8 +482,6 @@ public class ClusterConfig {
       _isPaused = false;
       _autoJoin = false;
       _userConfig = new UserConfig(Scope.cluster(id));
-      _stats = new PersistentStats(PersistentStats.nodeName);
-      _alerts = new Alerts(Alerts.nodeName);
     }
 
     /**
@@ -789,74 +686,6 @@ public class ClusterConfig {
     }
 
     /**
-     * Add a statistic specification to the cluster. Existing specifications will not be overwritten
-     * @param stat String specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStat(String stat) {
-      Map<String, Map<String, String>> parsedStat = StatsHolder.parseStat(stat);
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add statistic specifications to the cluster. Existing specifications will not be overwritten
-     * @param stats PersistentStats specifying the stat specification
-     * @return Builder
-     */
-    public Builder addStats(PersistentStats stats) {
-      if (stats == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> parsedStat = stats.getMapFields();
-      Map<String, Map<String, String>> currentStats = _stats.getMapFields();
-      for (String statName : parsedStat.keySet()) {
-        if (!currentStats.containsKey(statName)) {
-          currentStats.put(statName, parsedStat.get(statName));
-        }
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alert string representing alert specifications
-     * @return Builder
-     */
-    public Builder addAlert(String alert) {
-      Map<String, Map<String, String>> currAlertMap = _alerts.getMapFields();
-      if (!currAlertMap.containsKey(alert)) {
-        Map<String, String> parsedAlert = Maps.newHashMap();
-        StringBuilder statsName = new StringBuilder();
-        AlertsHolder.parseAlert(alert, statsName, parsedAlert);
-        addStat(statsName.toString());
-        currAlertMap.put(alert, parsedAlert);
-      }
-      return this;
-    }
-
-    /**
-     * Add alert specifications to the cluster. Existing specifications will not be overwritten
-     * @param alerts Alerts instance
-     * @return Builder
-     */
-    public Builder addAlerts(Alerts alerts) {
-      if (alerts == null) {
-        return this;
-      }
-      Map<String, Map<String, String>> alertMap = alerts.getMapFields();
-      for (String alert : alertMap.keySet()) {
-        addAlert(alert);
-      }
-      return this;
-    }
-
-    /**
      * Set the paused status of the cluster
      * @param isPaused true if paused, false otherwise
      * @return Builder
@@ -892,7 +721,7 @@ public class ClusterConfig {
      */
     public ClusterConfig build() {
       return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _stateModelMap,
-          _stats, _alerts, _userConfig, _isPaused, _autoJoin);
+          _userConfig, _isPaused, _autoJoin);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ab51670..b9527e6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -32,7 +32,6 @@ import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.CurrentStateChangeListener;
 import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.IdealStateChangeListener;
@@ -58,7 +57,6 @@ import org.apache.helix.controller.stages.TaskAssignmentStage;
 import org.apache.helix.controller.stages.PersistAssignmentStage;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.HealthStat;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Leader;
@@ -84,7 +82,7 @@ import org.apache.log4j.Logger;
  */
 public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ExternalViewChangeListener, ControllerChangeListener, HealthStateChangeListener {
+    ExternalViewChangeListener, ControllerChangeListener {
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   volatile boolean init = false;
   private final PipelineRegistry _registry;
@@ -311,16 +309,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
   }
 
   @Override
-  public void onHealthChange(String instanceName, List<HealthStat> reports,
-      NotificationContext changeContext) {
-    /**
-     * When there are more participant ( > 20, can be in hundreds), This callback can be
-     * called quite frequently as each participant reports health stat every minute. Thus
-     * we change the health check pipeline to run in a timer callback.
-     */
-  }
-
-  @Override
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onMessage()");

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
deleted file mode 100644
index 6b29e2d..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/HealthDataCache.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.model.AlertStatus;
-import org.apache.helix.model.Alerts;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-
-public class HealthDataCache {
-  Map<String, LiveInstance> _liveInstanceMap;
-
-  Map<String, Map<String, HealthStat>> _healthStatMap;
-  HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  PersistentStats _persistentStats;
-  Alerts _alerts;
-  AlertStatus _alertStatus;
-
-  public HealthStat getGlobalStats() {
-    return _globalStats;
-  }
-
-  public PersistentStats getPersistentStats() {
-    return _persistentStats;
-  }
-
-  public Alerts getAlerts() {
-    return _alerts;
-  }
-
-  public AlertStatus getAlertStatus() {
-    return _alertStatus;
-  }
-
-  public Map<String, HealthStat> getHealthStats(String instanceName) {
-    Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-    if (map != null) {
-      return map;
-    } else {
-      return Collections.emptyMap();
-    }
-  }
-
-  public Map<String, LiveInstance> getLiveInstances() {
-    return _liveInstanceMap;
-  }
-
-  public boolean refresh(HelixDataAccessor accessor) {
-    Builder keyBuilder = accessor.keyBuilder();
-    _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-
-    Map<String, Map<String, HealthStat>> hsMap = new HashMap<String, Map<String, HealthStat>>();
-
-    for (String instanceName : _liveInstanceMap.keySet()) {
-      // xxx clearly getting znodes for the instance here...so get the
-      // timestamp!
-
-      Map<String, HealthStat> childValuesMap =
-          accessor.getChildValuesMap(keyBuilder.healthReports(instanceName));
-      hsMap.put(instanceName, childValuesMap);
-    }
-    _healthStatMap = Collections.unmodifiableMap(hsMap);
-    _persistentStats = accessor.getProperty(keyBuilder.persistantStat());
-    _alerts = accessor.getProperty(keyBuilder.alerts());
-    _alertStatus = accessor.getProperty(keyBuilder.alertStatus());
-
-    return true;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
deleted file mode 100644
index 859c1d0..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadHealthDataStage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
-
-public class ReadHealthDataStage extends AbstractBaseStage {
-  HealthDataCache _cache;
-
-  public ReadHealthDataStage() {
-    _cache = new HealthDataCache();
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    if (manager == null) {
-      throw new StageException("HelixManager attribute value is null");
-    }
-    // DataAccessor dataAccessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    _cache.refresh(accessor);
-
-    event.addAttribute("HealthDataCache", _cache);
-
-    long processLatency = System.currentTimeMillis() - startTime;
-    addLatencyToMonitor(event, processLatency);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
deleted file mode 100644
index c48f156..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/StatsAggregationStage.java
+++ /dev/null
@@ -1,399 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.alerts.AlertParser;
-import org.apache.helix.alerts.AlertProcessor;
-import org.apache.helix.alerts.AlertValueAndStatus;
-import org.apache.helix.alerts.AlertsHolder;
-import org.apache.helix.alerts.ExpressionParser;
-import org.apache.helix.alerts.StatsHolder;
-import org.apache.helix.alerts.Tuple;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.healthcheck.StatHealthReportProvider;
-import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
-import org.apache.helix.model.AlertHistory;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.PersistentStats;
-import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
-import org.apache.log4j.Logger;
-
-/**
- * For each LiveInstances select currentState and message whose sessionId matches
- * sessionId from LiveInstance Get Partition,State for all the resources computed in
- * previous State [ResourceComputationStage]
- */
-public class StatsAggregationStage extends AbstractBaseStage {
-
-  public static final int ALERT_HISTORY_SIZE = 30;
-
-  private static final Logger logger = Logger.getLogger(StatsAggregationStage.class.getName());
-
-  StatsHolder _statsHolder = null;
-  AlertsHolder _alertsHolder = null;
-  Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
-  Map<String, Tuple<String>> _statStatus;
-  ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
-  Map<String, String> _alertActionTaken = new HashMap<String, String>();
-
-  public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
-  public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
-  public final String REPORT_NAME = "AggStats";
-  // public final String DEFAULT_AGG_TYPE = "decay";
-  // public final String DEFAULT_DECAY_PARAM = "0.1";
-  // public final String DEFAULT_AGG_TYPE = "window";
-  // public final String DEFAULT_DECAY_PARAM = "5";
-
-  public StatHealthReportProvider _aggStatsProvider;
-
-  // public AggregationType _defaultAggType;
-
-  public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus() {
-    return _alertStatus;
-  }
-
-  public Map<String, Tuple<String>> getStatStatus() {
-    return _statStatus;
-  }
-
-  public void persistAggStats(HelixManager manager) {
-    Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
-    Map<String, Map<String, String>> partitionReport =
-        _aggStatsProvider.getRecentPartitionHealthReport();
-    ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
-    if (report != null) {
-      record.setSimpleFields(report);
-    }
-    if (partitionReport != null) {
-      record.setMapFields(partitionReport);
-    }
-
-    // DataAccessor accessor = manager.getDataAccessor();
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    // boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
-    Builder keyBuilder = accessor.keyBuilder();
-    boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
-    if (retVal == false) {
-      logger.error("attempt to persist derived stats failed");
-    }
-  }
-
-  @Override
-  public void init(StageContext context) {
-  }
-
-  public String getAgeStatName(String instance) {
-    return instance + ExpressionParser.statFieldDelim + "reportingage";
-  }
-
-  // currTime in seconds
-  public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime) {
-    String statName = getAgeStatName(instance.getInstanceName());
-    long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
-                                                 // seconds
-    Map<String, String> ageStatMap = new HashMap<String, String>();
-    ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
-    ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
-    // note that applyStat will only work if alert already added
-    _statsHolder.applyStat(statName, ageStatMap);
-  }
-
-  @Override
-  public void process(ClusterEvent event) throws Exception {
-    long startTime = System.currentTimeMillis();
-    // String aggTypeName =
-    // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
-    // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
-
-    HelixManager manager = event.getAttribute("helixmanager");
-    HealthDataCache cache = event.getAttribute("HealthDataCache");
-
-    if (manager == null || cache == null) {
-      throw new StageException("helixmanager|HealthDataCache attribute value is null");
-    }
-    if (_alertsHolder == null) {
-      _statsHolder = new StatsHolder(manager, cache);
-      _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
-    } else {
-      _statsHolder.updateCache(cache);
-      _alertsHolder.updateCache(cache);
-    }
-    if (_statsHolder.getStatsList().size() == 0) {
-      if (logger.isTraceEnabled()) {
-        logger.trace("stat holder is empty");
-      }
-      return;
-    }
-
-    // init agg stats from cache
-    // initAggStats(cache);
-
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-
-    long currTime = System.currentTimeMillis();
-    // for each live node, read node's stats
-    long readInstancesStart = System.currentTimeMillis();
-    for (LiveInstance instance : liveInstances.values()) {
-      String instanceName = instance.getInstanceName();
-      logger.debug("instanceName: " + instanceName);
-      // XXX: now have map of HealthStats, so no need to traverse them...verify
-      // correctness
-      Map<String, HealthStat> stats;
-      stats = cache.getHealthStats(instanceName);
-      // find participants stats
-      long modTime = -1;
-      // TODO: get healthreport child node modified time and reportAgeStat based on that
-      boolean reportedAge = false;
-      for (HealthStat participantStat : stats.values()) {
-        if (participantStat != null && !reportedAge) {
-          // generate and report stats for how old this node's report is
-          modTime = participantStat.getLastModifiedTimeStamp();
-          reportAgeStat(instance, modTime, currTime);
-          reportedAge = true;
-        }
-        // System.out.println(modTime);
-        // XXX: need to convert participantStat to a better format
-        // need to get instanceName in here
-
-        if (participantStat != null) {
-          // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
-          // REPORT LEVEL TS
-          Map<String, Map<String, String>> statMap = participantStat.getHealthFields(instanceName);
-          for (String key : statMap.keySet()) {
-            _statsHolder.applyStat(key, statMap.get(key));
-          }
-        }
-      }
-    }
-    // Call _statsHolder.persistStats() once per pipeline. This will
-    // write the updated persisted stats into zookeeper
-    _statsHolder.persistStats();
-    logger.info("Done processing stats: " + (System.currentTimeMillis() - readInstancesStart));
-    // populate _statStatus
-    _statStatus = _statsHolder.getStatsMap();
-
-    for (String statKey : _statStatus.keySet()) {
-      logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
-    }
-
-    long alertExecuteStartTime = System.currentTimeMillis();
-    // execute alerts, populate _alertStatus
-    _alertStatus =
-        AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(), _statsHolder.getStatsList());
-    logger.info("done executing alerts: " + (System.currentTimeMillis() - alertExecuteStartTime));
-    for (String originAlertName : _alertStatus.keySet()) {
-      _alertBeanCollection.setAlerts(originAlertName, _alertStatus.get(originAlertName),
-          manager.getClusterName());
-    }
-
-    executeAlertActions(manager);
-    // Write alert fire history to zookeeper
-    updateAlertHistory(manager);
-    long writeAlertStartTime = System.currentTimeMillis();
-    // write out alert status (to zk)
-    _alertsHolder.addAlertStatusSet(_alertStatus);
-    logger.info("done writing alerts: " + (System.currentTimeMillis() - writeAlertStartTime));
-
-    // TODO: access the 2 status variables from somewhere to populate graphs
-
-    long logAlertStartTime = System.currentTimeMillis();
-    // logging alert status
-    for (String alertOuterKey : _alertStatus.keySet()) {
-      logger.debug("Alert Outer Key: " + alertOuterKey);
-      Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
-      if (alertInnerMap == null) {
-        logger.debug(alertOuterKey + " has no alerts to report.");
-        continue;
-      }
-      for (String alertInnerKey : alertInnerMap.keySet()) {
-        logger.debug("  " + alertInnerKey + " value: "
-            + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
-            + alertInnerMap.get(alertInnerKey).isFired());
-      }
-    }
-
-    logger.info("done logging alerts: " + (System.currentTimeMillis() - logAlertStartTime));
-
-    long processLatency = System.currentTimeMillis() - startTime;
-    addLatencyToMonitor(event, processLatency);
-    logger.info("process end: " + processLatency);
-  }
-
-  /**
-   * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
-   * has been fired
-   */
-
-  void executeAlertActions(HelixManager manager) {
-    _alertActionTaken.clear();
-    // Go through the original alert strings
-    for (String originAlertName : _alertStatus.keySet()) {
-      Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
-      if (alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME)) {
-        String actionValue = alertFields.get(AlertParser.ACTION_NAME);
-        Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
-        if (alertResultMap == null) {
-          logger.info("Alert " + originAlertName + " does not have alert status map");
-          continue;
-        }
-        // For each original alert, iterate all actual alerts that it expands into
-        for (String actualStatName : alertResultMap.keySet()) {
-          // if the actual alert is fired, execute the action
-          if (alertResultMap.get(actualStatName).isFired()) {
-            logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by "
-                + actualStatName);
-            _alertActionTaken.put(actualStatName, actionValue);
-            // move functionalities into a seperate class
-            executeAlertAction(actualStatName, actionValue, manager);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Execute the action if an alert is fired, and the alert has an action associated with it.
-   * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
-   */
-  void executeAlertAction(String actualStatName, String actionValue, HelixManager manager) {
-    if (actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString())) {
-      String instanceName = parseInstanceName(actualStatName, manager);
-      if (instanceName != null) {
-        logger.info("Disabling instance " + instanceName);
-        manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName,
-            false);
-      }
-    } else if (actionValue.equals(ActionOnError.DISABLE_PARTITION.toString())) {
-      String instanceName = parseInstanceName(actualStatName, manager);
-      String resourceName = parseResourceName(actualStatName, manager);
-      String partitionName = parsePartitionName(actualStatName, manager);
-      if (instanceName != null && resourceName != null && partitionName != null) {
-        logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
-        manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(),
-            instanceName, resourceName, Arrays.asList(partitionName));
-      }
-    } else if (actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString())) {
-      String instanceName = parseInstanceName(actualStatName, manager);
-      String resourceName = parseResourceName(actualStatName, manager);
-      logger.info("Disabling resource " + resourceName + " instanceName " + instanceName
-          + " not implemented");
-
-    }
-  }
-
-  public static String parseResourceName(String actualStatName, HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder kb = accessor.keyBuilder();
-    List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
-    for (IdealState idealState : idealStates) {
-      String resourceName = idealState.getResourceId().stringify();
-      if (actualStatName.contains("=" + resourceName + ".")
-          || actualStatName.contains("=" + resourceName + ";")) {
-        return resourceName;
-      }
-    }
-    return null;
-  }
-
-  public static String parsePartitionName(String actualStatName, HelixManager manager) {
-    String resourceName = parseResourceName(actualStatName, manager);
-    if (resourceName != null) {
-      String partitionKey = "=" + resourceName + "_";
-      if (actualStatName.contains(partitionKey)) {
-        int pos = actualStatName.indexOf(partitionKey);
-        int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
-        int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
-        if (nextCommaPos > 0 && nextCommaPos < nextDotPos) {
-          nextDotPos = nextCommaPos;
-        }
-
-        String partitionName = actualStatName.substring(pos + 1, nextDotPos);
-        return partitionName;
-      }
-    }
-    return null;
-  }
-
-  public static String parseInstanceName(String actualStatName, HelixManager manager) {
-    HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    Builder kb = accessor.keyBuilder();
-    List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
-    for (LiveInstance instance : liveInstances) {
-      String instanceName = instance.getInstanceName();
-      if (actualStatName.startsWith(instanceName)) {
-        return instanceName;
-      }
-    }
-    return null;
-  }
-
-  void updateAlertHistory(HelixManager manager) {
-    // Write alert fire history to zookeeper
-    _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
-    Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
-    // Update history only when some beans has changed
-    if (delta.size() > 0) {
-      delta.putAll(_alertActionTaken);
-      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
-      String date = dateFormat.format(new Date());
-
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
-      ZNRecord alertFiredHistory;
-      if (property == null) {
-        alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
-      } else {
-        alertFiredHistory = property.getRecord();
-      }
-      while (alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE) {
-        // ZNRecord uses TreeMap which is sorted ascending internally
-        String firstKey = (String) (alertFiredHistory.getMapFields().keySet().toArray()[0]);
-        alertFiredHistory.getMapFields().remove(firstKey);
-      }
-      alertFiredHistory.setMapField(date, delta);
-      // manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
-      accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
-      _alertBeanCollection.setAlertHistory(alertFiredHistory);
-    }
-  }
-
-  public ClusterAlertMBeanCollection getClusterAlertMBeanCollection() {
-    return _alertBeanCollection;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
deleted file mode 100644
index a3c443f..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AccumulateAggregationType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.log4j.Logger;
-
-public class AccumulateAggregationType implements AggregationType {
-
-  private static final Logger logger = Logger.getLogger(AccumulateAggregationType.class);
-
-  public final static String TYPE_NAME = "accumulate";
-
-  @Override
-  public String getName() {
-    return TYPE_NAME;
-  }
-
-  @Override
-  public String merge(String iv, String ev, long prevTimestamp) {
-    double inVal = Double.parseDouble(iv);
-    double existingVal = Double.parseDouble(ev);
-    return String.valueOf(inVal + existingVal);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/db4c10a2/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java b/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
deleted file mode 100644
index 29f5921..0000000
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/AggregationType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.helix.healthcheck;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public interface AggregationType {
-
-  // public abstract <T extends Object> T merge(T iv, T ev);
-
-  public final static String DELIM = "#";
-
-  public abstract String merge(String incomingVal, String existingVal, long prevTimestamp);
-
-  public abstract String getName();
-}


Mime
View raw message